第 52 章 POSIX 消息队列 (POSIX Message Queues)

      +

      核心结论

      • POSIX 消息队列特性:消息有优先级(严格优先级序);引用计数删除;异步通知(信号或线程);Linux 上 mqd_t 是 file descriptor(可 select/poll/epoll)。

      • 核心 API:mq_open / mq_close / mq_unlink / mq_send / mq_receive / mq_getattr / mq_setattr / mq_notify / mq_timedsend / mq_timedreceive。

      • 优先级 vs SysV type:POSIX 消息按优先级严格排序(数值越大越优先,相同优先级 FIFO);SysV 用 msgrcv 按 type 灵活选择(FIFO / 精确 type / 最大 type ≤ N)。

      • mq_attr 属性:mq_flags(描述符标志)、mq_maxmsg(队列最大消息数)、mq_msgsize(单消息最大字节)、mq_curmsgs(当前消息数);mq_maxmsg/mq_msgsize 创建时固定。

      • mq_notify 通知机制:空队列收到第一条消息时通知——方式有 SIGEV_NONE/SIGNAL/THREAD;同一队列同一时刻只有一个进程注册;触发后自动注销,需重新注册。

      • SysV vs POSIX 优势:POSIX 优点——引用计数、消息通知、可与 select/poll/epoll 集成(Linux);POSIX 缺点——可移植性差(Linux 2.6.6+)、type 字段灵活性不如 SysV。

      • Linux 默认值:mq_maxmsg=10、mq_msgsize=8192;可通过 /proc/sys/fs/mqueue/msg_max、msgsize_max、queues_max 调整。

      • 链接与挂载:编译 -lrt;mq 在虚拟文件系统——mount -t mqueue none /dev/mqueue 后可用 ls/rm 操作。

      本章主旨

      POSIX 消息队列是 SysV 消息队列的现代替代——核心改进是「引用计数删除」和「消息通知」。读者需要建立三组对比:(1) 与 SysV 消息队列对比——优先级 vs type、引用计数 vs 显式 IPC_RMID、通知 vs poll;(2) mq_open vs file open——同是「名字 + oflag + mode + attr」,但返回 mqd_t;(3) mq_send vs pipe write——保留消息边界、支持优先级、支持通知。Linux 把 mqd_t 实现为 fd——这是消息通知 + select/poll 集成的实现基础。

      一、核心概念

      本章围绕 6 个核心概念展开:API 模型、消息属性、优先级、引用计数、消息通知、Linux 特有特性。

      概念 定义 + 重要性 实现提示

      mq_open / mq_close / mq_unlink

      mq_open(name, oflag, mode, attr) 返回 mqd_t;mq_close 关闭;mq_unlink 删除名字(引用归零销毁);同 POSIX IPC 三件套语义

      §52.2;表 52-1:oflag 含 O_CREAT/O_EXCL/O_RDONLY/O_WRONLY/O_RDWR/O_NONBLOCK;attr 含 mq_maxmsg/mq_msgsize

      mq_attr 属性结构

      4 字段——mq_flags(描述符标志)、mq_maxmsg(队列容量)、mq_msgsize(单消息最大字节)、mq_curmsgs(当前消息数);mq_maxmsg/mq_msgsize 创建后固定

      §52.4;Linux 默认 10 条/8192 字节;mq_setattr 只能改 mq_flags;先 getattr 再 setattr 保留其他位

      优先级与 FIFO

      mq_send(mqd, buf, len, prio) 中 prio ≥ 0;接收始终是「最高优先级 + 同级 FIFO」;MQ_PRIO_MAX Linux=32768、Solaris=32、Tru64=256

      §52.5;零长度消息允许;满则阻塞或 EAGAIN;超时用 mq_timedsend + abs_timeout

      消息边界与管道对比

      POSIX mq 保留消息边界(每次 send 对应一次 receive);pipe 是字节流(无边界);无 EOF 语义(写者关闭后读不返回 0)

      §52.5.2;O_NONBLOCK 下空队列 mq_receive 返回 EAGAIN(不像 pipe 返回 0)

      mq_notify 异步通知

      mq_notify(mqd, sigevent*) 注册空队列收到消息时通知;SIGEV_NONE/SIGNAL/THREAD;同一队列同一时刻只允许一个注册者;触发后注销需重新注册

      §52.6;注册前已非空则需先排空再注册;与其他进程 mq_receive 阻塞互斥(接收者优先)

      Linux 特有:mqueuefs + fd 语义

      Linux 把 mq 放在 mqueuefs(mount -t mqueue);mqd_t 实为 file descriptor——可 select/poll/epoll;用 cat /dev/mqueue/X 看 QSIZE/NOTIFY/NOTIFY_PID

      §52.7;非标准特性;可移植代码不依赖 fd 语义;/proc/sys/fs/mqueue/ 调 limits

      二、详细笔记

      What:POSIX 消息队列的 open/close/unlink 三个生命周期调用——与文件 API 风格一致。

      Why:与 SysV msgget/msgrcv/msgctl 的「key + id」风格相比,POSIX 名字模型更直观。

      How

      // 摘自《The Linux Programming Interface》 第 52 章
      #include <fcntl.h>
      #include <sys/stat.h>
      #include <mqueue.h>
      
      mqd_t mq_open(const char *name, int oflag, ...);
      int mq_close(mqd_t mqdes);
      int mq_unlink(const char *name);
      
      /* 创建 */
      struct mq_attr attr = { .mq_maxmsg = 50, .mq_msgsize = 2048 };
      mqd_t mqd = mq_open("/myq", O_CREAT|O_RDWR, 0660, &attr);
      
      /* 关闭(不删除对象) */
      mq_close(mqd);
      
      /* 取消链接(引用归零才真销毁) */
      mq_unlink("/myq");

      oflag 取值(表 52-1):O_CREAT、O_EXCL、O_RDONLY、O_WRONLY、O_RDWR、O_NONBLOCK。

      When:(1) 进程启动时 mq_open(不存在则创建);(2) 用完 mq_close(防止 fd 耗尽);(3) 守护进程可立即 mq_unlink(避免名字残留)。

      Example:第 52 章 pmsg_create.c 命令行 -c -m 50 -s 2048 /mq 创建带属性的队列;pmsg_unlink.c 删除。

      52.2 mq_attr 属性结构

      Whatstruct mq_attr 4 字段——mq_flags、mq_maxmsg、mq_msgsize、mq_curmsgs。

      Why:可移植代码必须显式设置 mq_maxmsg/mq_msgsize——各实现默认值差异大。

      How

      // 摘自《The Linux Programming Interface》 第 52 章
      struct mq_attr {
          long mq_flags;       /* O_NONBLOCK(getattr/setattr) */
          long mq_maxmsg;      /* 队列最大消息数(open/getattr) */
          long mq_msgsize;     /* 单消息最大字节(open/getattr) */
          long mq_curmsgs;     /* 当前消息数(getattr) */
      };
      
      struct mq_attr attr = { .mq_maxmsg = 50, .mq_msgsize = 2048 };
      mqd_t mqd = mq_open("/q", O_CREAT|O_RDWR, 0660, &attr);
      
      mq_getattr(mqd, &attr);
      printf("curmsgs=%ld, maxmsg=%ld, msgsize=%ld\n",
             attr.mq_curmsgs, attr.mq_maxmsg, attr.mq_msgsize);

      mq_setattr 只能改 mq_flags;可移植改 O_NONBLOCK 的正确模式——getattr → 改 flags → setattr。

      When:(1) 创建时显式设 mq_maxmsg/mq_msgsize;(2) 读消息前 getattr 取 mq_msgsize 分配缓冲;(3) 切换阻塞/非阻塞用 setattr。

      Example:第 52 章 pmsg_create.c -m 50 -s 2048 设置容量 50 条、单消息最大 2 KB;pmsg_getattr.c 打印当前属性。

      52.3 优先级与 mq_send / mq_receive

      Whatmq_send(mqd, buf, len, prio)mq_receive(mqd, buf, len, &prio)——prio 越大越优先,同优先级 FIFO。

      Why:比 SysV mq 的 type 字段简单——按优先级排序,无需应用层排序。

      How

      // 摘自《The Linux Programming Interface》 第 52 章
      int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
                  unsigned int msg_prio);
      ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
                         unsigned int *msg_prio);
      
      /* 发送带优先级 */
      mq_send(mqd, "hello", 5, 10);    /* 高优先级 */
      mq_send(mqd, "world", 5, 0);     /* 低优先级 */
      
      /* 接收——先收到优先级 10,再收到 0 */
      unsigned int prio;
      char buf[8192];
      ssize_t n = mq_receive(mqd, buf, sizeof(buf), &prio);

      消息大小 ≤ mq_msgsize;零长度消息允许;满则阻塞或 EAGAIN;空则阻塞或 EAGAIN;超时用 mq_timedsend / mq_timedreceive。

      When:(1) 多任务调度——高优先级任务消息插队;(2) 同优先级维持 FIFO;(3) 不需 type 灵活性时首选 mq;(4) 需 type 灵活选择时仍用 SysV mq。

      Example:第 52 章示例——pmsg_send /mq msg-a 5pmsg_send /mq msg-b 0pmsg_send /mq msg-c 10pmsg_receive 依次得到 msg-c、msg-a、msg-b。

      52.4 mq_notify 异步通知

      Whatmq_notify(mqd, sigevent*) 注册——空队列收到第一条消息时通知调用进程。

      Why:避免「select/poll 忙等」或「阻塞 read 浪费 CPU」。

      How

      // 摘自《The Linux Programming Interface》 第 52 章
      struct sigevent sev;
      sev.sigev_notify = SIGEV_SIGNAL;       /* SIGEV_NONE/SIGNAL/THREAD */
      sev.sigev_signo = SIGUSR1;
      sev.sigev_value.sival_ptr = &my_data;  /* 信号 handler 可读 */
      mq_notify(mqd, &sev);
      
      /* 信号 handler 触发后需重新注册 */
      void handler(int sig) {
          mq_notify(mqd, &sev);              /* 重注册 */
          /* 排空队列(非阻塞模式) */
          while (mq_receive(mqd, buf, attr.mq_msgsize, NULL) >= 0)
              /* process */;
      }

      sigev_notify 三选项:

      • SIGEV_NONE — 注册但不真正通知(用于「探测」是否有人等你)。

      • SIGEV_SIGNAL — 发 sigev_signo 信号;可设实时信号携带 sival_ptr/sival_int。

      • SIGEV_THREAD — 在新线程调 sigev_notify_function(sigval)。

      When:(1) 单一消费者线程等待消息——注册一次,处理,重注册;(2) 多个消费者——不要用 mq_notify(互斥);(3) 与 mq_receive 阻塞互斥——有人阻塞 receive 时不发通知。

      Example:第 52 章 mq_notify_sig.c 用 sigsuspend 等 SIGUSR1 + O_NONBLOCK 排空;mq_notify_thread.c 用 SIGEV_THREAD 在线程中处理。

      52.5 Linux 特有:mqueuefs + fd 语义

      What:Linux 把 POSIX mq 实现为 mqueuefs 虚拟文件系统——mount -t mqueue none /dev/mqueue;mqd_t 实为 fd。

      Why:可与 select/poll/epoll 集成——解决 SysV mq 等待消息+fd 的难题。

      How

      挂载与查看

      mount -t mqueue none /dev/mqueue ls /dev/mqueue/ # 显示所有 mq cat /dev/mqueue/myq # QSIZE/NOTIFY/NOTIFY_PID/SIGNO rm /dev/mqueue/myq # 等价 mq_unlink

      /proc 接口调 limits

      /proc/sys/fs/mqueue/msg_max # 默认 10 /proc/sys/fs/mqueue/msgsize_max # 默认 8192 /proc/sys/fs/mqueue/queues_max # 默认 256

      mqd_t 是 fd——可 `select(mqd+1, &readfds, ...)` 等待消息;可 `poll()` / `epoll_wait()`。
      
      *When*:(1) 调试时 `ls /dev/mqueue` 看消息队列;(2) 同时等 mq + 普通 fd——用 select/poll/epoll;(3) 调整系统限制写 `/proc/sys/fs/mqueue/queues_max`(CAP_SYS_RESOURCE)。
      
      *Example*:第 52 章 §52.7 示例——`./pmsg_create -c /newq; ls /dev/mqueue` 显示 `newq`;`rm /dev/mqueue/newq` 删除。
      
      === 52.6 消息队列限制与 RLIMIT_MSGQUEUE
      
      *What*:SUSv3 定义 MQ_PRIO_MAX(≥32)与 MQ_OPEN_MAX(≥8);Linux 把后者替换为 fd 上限 + /proc/sys/fs/mqueue/ + RLIMIT_MSGQUEUE。
      
      *Why*:知道限制才能写出健壮的 mq 程序。
      
      *How*:
      
      [cols="1,2,2", options="header"]
      |===
      | 限制 | 默认值 | 说明
      
      | mq_maxmsg (per queue)
      | /proc/sys/fs/mqueue/msg_max = 10
      | 单队列最大消息数;HARD_MSGMAX = 32768(x86-32)
      
      | mq_msgsize (per message)
      | /proc/sys/fs/mqueue/msgsize_max = 8192
      | 单消息最大字节;最大 INT_MAX(2.6.28+)
      
      | queues_max (system-wide)
      | /proc/sys/fs/mqueue/queues_max = 256
      | 系统总消息队列数;达到后非特权进程无法创建
      
      | MQ_PRIO_MAX
      | Linux=32768
      | 消息最大优先级;通过 sysconf(_SC_MQ_PRIO_MAX) 查询
      
      | RLIMIT_MSGQUEUE
      | 默认 0(很多系统)
      | per-user 字节上限;用 setrlimit 调整
      |===
      
      *When*:(1) 调高队列容量需特权;(2) 单消息超过 8192 需 msgsize_max 调大;(3) 大量队列需 queues_max 调大;(4) 用 RLIMIT_MSGQUEUE 限制 per-user 总占用。
      
      *Example*:`cat /proc/sys/fs/mqueue/queues_max` 看上限;root 可 `echo 1024 > /proc/sys/fs/mqueue/queues_max`。
      
      == 三、关键图表
      
      [NOTE]
      .非可视化条目(API / 属性 / 限制)
      ====
      [cols="1,3", options="header"]
      |===
      | 类别 | 内容
      
      | 创建/打开
      | `mq_open(name, oflag, mode, attr)` 返回 mqd_t;oflag 含 O_CREAT/O_EXCL/O_RDONLY/WRONLY/RDWR/O_NONBLOCK
      
      | 关闭/删除
      | `mq_close(mqd)` 不删除;`mq_unlink(name)` 引用归零销毁
      
      | 收发
      | `mq_send(mqd, buf, len, prio)`;`mq_receive(mqd, buf, len, &prio)`;零长度允许
      
      | 超时
      | `mq_timedsend/timedreceive(mqd, buf, len, prio, abs_timeout)`;超时 ETIMEDOUT
      
      | 属性
      | `mq_getattr/setattr(mqd, *attr)`;mq_flags 可改;mq_maxmsg/mq_msgsize 固定
      
      | 通知
      | `mq_notify(mqd, *sigevent)`;SIGEV_NONE/SIGNAL/THREAD;触发后注销
      
      | 句柄
      | mqd_t;Linux 上是 int fd(可 select/poll/epoll)
      
      | 默认值
      | mq_maxmsg=10;mq_msgsize=8192;MQ_PRIO_MAX=32768
      
      | /proc 限制
      | /proc/sys/fs/mqueue/{msg_max, msgsize_max, queues_max}
      
      | RLIMIT
      | RLIMIT_MSGQUEUE per-user 字节上限
      
      | 挂载
      | `mount -t mqueue none /dev/mqueue`
      
      | 删除命令行
      | `rm /dev/mqueue/<name>`(非特权只能删自己的)
      
      | SysV vs POSIX
      | POSIX 优点:引用计数、通知、Linux 上可 select/poll
      | POSIX 缺点:可移植性差、type 不如 SysV 灵活
      |===
      ====
      
      == 四、思维导图
      
      [source,mermaid]

      mindmap root第 52 章 POSIX 消息队列 API 模型 mq_open close unlink 名字 oflag mode attr mqd_t 句柄 mq_attr 属性 属性结构 mq_flags mq_maxmsg 10 mq_msgsize 8192 mq_curmsgs 当前 create 固定 maxmsg msgsize 收发优先级 mq_send prio mq_receive prio 优先级严格 同级 FIFO 零长度消息 满 EAGAIN 异步通知 mq_notify SIGEV NONE SIGEV SIGNAL SIGEV THREAD 一次注册 需重新注册 Linux 特有 mqueuefs 虚拟文件 mqd_t 是 fd select poll epoll dev mqueue ls rm proc sys fs mqueue SysV 对比 POSIX 引用计数 POSIX 通知 POSIX 可 select SysV type 灵活 SysV 可移植

      五、重点与易错点

      1. mq_open attr 仅创建时生效——mq_maxmsg/mq_msgsize 创建后固定;mq_setattr 只能改 mq_flags。

      2. mqd_t 不是普通 fd——SUSv3 不保证是 fd;Linux 上是,但可移植代码不依赖此。

      3. 优先级 vs SysV type——POSIX 严格优先级序;SysV msgrcv 可按 type 灵活选择(FIFO/精确/最大≤N);需 type 灵活性时仍用 SysV。

      4. 消息边界 vs pipe——POSIX mq 保留消息边界;pipe 是字节流;mq 无 EOF 语义。

      5. mq_receive 不返回 0——空队列阻塞读永远阻塞;O_NONBLOCK 下返回 EAGAIN(不像 pipe 返回 0)。

      6. mq_notify 触发一次自动注销——必须重新注册;否则后续消息不再通知。

      7. mq_notify 与 mq_receive 阻塞互斥——有人阻塞 receive 时不发通知;多个消费者不要用 notify。

      8. 注册时队列已非空——需先排空再注册;否则永远等不到通知。

      9. 信号方式必须先阻塞信号 + sigsuspend 等——避免信号在 mq_notify 之前到达;pause() 不可靠。

      10. Linux MQ_PRIO_MAX=32768——比 SysV type 灵活范围大;SUSv3 只要求 ≥32。

      11. MQ_OPEN_MAX Linux 不限——因为 mqd_t 是 fd,受 RLIMIT_NOFILE 限制。

      12. RLIMIT_MSGQUEUE per-user 累计——所有队列总字节数受限于此;用 setrlimit 调整。

      13. Linux 默认 mq_maxmsg=10, msgsize=8192——可移植代码必须显式设 attr;不能依赖默认。

      14. CAP_SYS_RESOURCE 改 mqueuefs limits——非特权受 /proc/sys/fs/mqueue/ 限制;特权忽略 msg_max/msgsize_max,但仍受 HARD_MSGMAX 约束。

      15. 跨章衔接:第 46 章 SysV 消息队列(msgget/msgrcv 用 type);第 53 章 POSIX 信号量;第 54 章 POSIX 共享内存;第 51 章 POSIX IPC 概述(通用 API 模型)。