-
消息隊列介紹
在應用開發中,生産者,消費者的模型非常常見,一方産生資料并把資料放入隊列中,而另一方從隊列中取資料,先進先出。
同樣,在作業系統核心中,也實作了類似的功能,隊列中存放的是“消息”。稱之為消息隊列,消息也可了解為資料。
主要用途是程序間通信(IPC),所謂通信,就是進行資料互動。

消息隊列中的每條消息通常具有以下屬性:
- 一個表示優先級的整數;
- 消息的資料部分的長度;
- 消息資料本身
posix相關接口函數
Library interface | System call |
mq_close(3) | close(2) |
mq_getattr(3) | mq_getsetattr(2) |
mq_notify(3) | mq_notify(2) |
mq_open(3) | mq_open(2) |
mq_receive(3) | mq_timedreceive(2) |
mq_send(3) | mq_timedsend(2) |
mq_setattr(3) | mq_getsetattr(2) |
mq_timedreceive(3) | mq_timedreceive(2) |
mq_timedsend(3) | mq_timedsend(2) |
mq_unlink(3) | mq_unlink(2) |
-
Headers file
//Link with -lrt.
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <time.h>
-
Open/Close mqueue
//打開/建立 消息隊列,消息隊列名稱前面必須加上斜杆。
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,struct mq_attr *attr);
//關閉消息隊列
int mq_close(mqd_t mqdes);
//删除消息隊列
int mq_unlink(const char *name); //成功傳回0,失敗傳回-1
Parameter Description:
name:表示消息隊列的名字,它符合POSIX IPC的名字規則。
oflag:表示打開的方式,和open函數的類似。有必須的選項:O_RDONLY,O_WRONLY,O_RDWR,還有可選的選項:O_NONBLOCK,O_CREAT,O_EXCL。
mode:是一個可選參數,在oflag中含有O_CREAT标志且消息隊列不存在時,才需要提供該參數。表示預設通路權限。可以參考open。
attr:也是一個可選參數,在oflag中含有O_CREAT标志且消息隊列不存在時才需要。該參數用于給新隊列設定某些屬性,如果是空指針,那麼就采用預設屬性。
-
Get/Set mqueue Attributes
//擷取/設定消息隊列屬性
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,struct mq_attr *oldattr);
struct mq_attr { longmq_flags; /* Flags: 0 or O_NONBLOCK */ longmq_maxmsg; /* Max. # of messages onqueue,can only be initialized when mq_open is created*/ longmq_msgsize; /* Max. message size (bytes),can only be initialized when mq_open is created*/ longmq_curmsgs; /* # of messages currently inqueue */ }
Description:
mq_getattr用于擷取目前消息隊列的屬性,mq_setattr用于設定目前消息隊列的屬性。
其中mq_setattr中的oldattr用于儲存修改前的消息隊列的屬性,可以為空。
mq_setattr可以設定的屬性隻有mq_flags,用來設定或清除消息隊列的非阻塞标志。newattr結構的其他屬性被忽略。
mq_maxmsg和mq_msgsize屬性隻能在建立消息隊列時通過mq_open來設定。
mq_open隻會設定該兩個屬性,忽略另外兩個屬性。mq_curmsgs屬性隻能被擷取而不能被設定。
Note:
- attr.mq_maxmsg 不能超過檔案 /proc/sys/fs/mqueue/msg_max 中的數值,我的機器上面是10。
- attr.mq_msgsize不能超過 /proc/sys/fs/mqueue/msgsize_max 的數值。
-
Send/Receive messages through the message queue
//發送消息
int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned int msg_prio);
//接收消息
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,size_t msg_len, unsigned int *msg_prio);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned int msg_prio,
const struct timespec *abs_timeout);
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned int *msg_prio,
const struct timespec *abs_timeout);
Parameter Description:
mqdes:消息隊列描述符;
msg_ptr:指向消息體緩沖區的指針;
msg_len:消息體的長度,其中mq_receive的該參數不能小于能寫入隊列中消息的最大大小,即一定要大于等于該隊列的mq_attr結構中mq_msgsize的大小。
如果mq_receive中的msg_len小于該值,就會傳回EMSGSIZE錯誤。POXIS消息隊列發送的消息長度可以為0。
msg_prio:消息的優先級;它是一個小于MQ_PRIO_MAX的數,數值越大,優先級越高。
POSIX消息隊列在調用mq_receive時總是傳回隊列中最高優先級的最早消息。
如果消息不需要設定優先級,那麼可以在mq_send是置msg_prio為0,mq_receive的msg_prio置為NULL。
const struct timespec:設定消息發送/接收的逾時時間。
abs_timeout指向一個結構,該結構指定了呼叫将被阻止的時間上限。
此上限是自1970年1月1日00:00:00 +0000(UTC)以來的絕對逾時(以秒和納秒為機關)
如果隊列已滿,預設情況下mq_send會阻塞,可以使用mq_timedsend函數設定阻塞逾時時間。
如果隊列為空,預設情況下mq_receive會阻塞,可以使用mq_timedreceive函數設定阻塞逾時時間。
由于預設情況下mq_send和mq_receive是阻塞進行調用,也可以通過mq_setattr來設定為O_NONBLOCK.
-
Asynchronous notification of message queue
//建立或者删除消息到達通知事件
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
//sigevent 結構
#include <signal.h>
union sigval { /* Data passed with notification */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* Notification method */
int sigev_signo; /* Notification signal */
union sigval sigev_value; /* Data passed with notification */
void (*sigev_notify_function) (union sigval);/* Function used for thread notification (SIGEV_THREAD) */
void *sigev_notify_attributes;/* Attributes for notification thread (SIGEV_THREAD) */
pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */
};
Parameter Description:
- 如果sevp參數為非空,那麼目前程序希望在有一個消息到達所指定的先前為空的對列時得到通知。
- 如果sevp參數為空,而且目前程序被注冊為接收指定隊列的通知,那麼已存在的注冊将被撤銷。
sigev_notify :
SIGEV_NONE:空的提醒,事件發生時不做任何事情
SIGEV_SIGNAL:向程序發送sigev_signo中指定的信号,具體詳細的狀況參照上面的文檔,這涉及到sigaction的使用
SIGEV_THREAD:通知程序在一個新的線程中啟動sigev_notify_function函數,函數的實參是sigev_value,系統API自動啟動一個線程,我們不用顯式啟動。
Note:
- 任意時刻隻有一個程序可以被注冊為接收某個給定隊列的通知。
- 當有一個消息到達先前為空的消息隊列,而且已有一個程序被注冊為接收該隊列的通知時,隻有在沒有任何線程阻塞在該隊列的mq_receive調用中的前提下,通知才會發出。即說明,在mq_receive調用中的阻塞比任何通知的注冊都優先。
- 當通知被發送給它的注冊程序時,其注冊被撤消。程序必須再次調用mq_notify以重新注冊,注意:重新注冊要放在從消息隊列讀出消息之前而不是之後。
Posix消息隊列容許 異步事件通知,以告知何時有一個消息放置到某個空消息隊列中,這種通知有兩種方式可以選擇:
- 産生一個信号,通過信号處理函數進行處理消息.
struct sigevent sev = { 0 };
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = (void *)p; //線程入口函數
sev.sigev_notify_attributes = NULL;//線程屬性,根據需求設定
sev.sigev_value.sival_ptr = (void *)mqdes; // 傳遞給線程的參數
- 建立一個線程來執行一個指定的函數,在新的線程中處理消息.
struct sigevent sev = { 0 };
signal(signo,p); //設定信号處理函數
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = signo ; //信号值
-
消息隊列相關限制
mq_maxmsg: 隊列中最大的消息數
mq_msgsize: 給定消息的最大位元組數
MQ_OPEN_MAX: 一個程序能夠同時擁有的打開着消息隊列的最大數目
MQ_PRIO_MAX: 任意消息的最大優先級值+1
POSIX消息隊列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize
這兩個參數可以在調用mq_open建立一個消息隊列的時候設定。當這個設定是受到系統核心限制的。
MQ_OPEN_MAX和MQ_PRIO_MAX 定義在<unistd.h>中,可以在運作的時候聽過調用sysconf函數擷取.
ulimit -a |grep message
# POSIX message queues (bytes, -q) 819200
# 限制大小為800KB,該大小是整個消息隊列的大小,不僅僅是最大消息數*消息的最大大小;還包括消息隊列的額外開銷。
# POSIX消息隊列預設的最大消息數和消息的最大大小分别為:
# mq_maxmsg = 10
# mq_msgsize = 8192