天天看點

程序間通信(7) - 消息隊列(System V)1.前言2.介紹3.消息結構模闆4.建立消息5.發送消息6.接收消息7.控制消息8.消息隊列的限制

目錄

1.前言

2.介紹

3.消息結構模闆

4.建立消息

5.發送消息

6.接收消息

7.控制消息

8.消息隊列的限制

1.前言

本篇文章的所有例子,基于RHEL6.5平台(linux kernal: 2.6.32-431.el6.i686)。

2.介紹

System V消息隊列是Open Group定義的XSI,不屬于POSIX标準。System V IPC的曆史相對很早,在上個世70年代後期有貝爾實驗室的分支機構開發,80年代加入System V的系統核心中,後來商用UNIX系統基本都加入了System V IPC的功能。

System V消息隊列相對于POSIX消息隊列的差別主要是:

    --POSIX消息隊列的讀操作總是傳回消息隊列中優先級最高的最早消息,而對于System V消息隊列可以傳回任意指定優先級(通過消息類型)的消息。

    --當向一個空消息隊列中寫入一個消息時,POSIX消息隊列允許産生一個信号或啟動一個線程,System V消息隊列不提供類似的機制。

系統核心都會為每一個System V消息隊列維護一個資訊結構,在Linux 2.6.32中的定義如下:

#include<bits/msq.h>
/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
        struct ipc_perm msg_perm;         /*IPC對象的屬性資訊和通路權限 */
        struct msg *msg_first;            /* first message on queue,unused  */
        struct msg *msg_last;             /* last message in queue,unused */
        __kernel_time_t msg_stime;        /* last msgsnd time */
        __kernel_time_t msg_rtime;        /* last msgrcv time */
        __kernel_time_t msg_ctime;        /* last change time */
        unsigned long  msg_lcbytes;       /* Reuse junk fields for 32 bit */
        unsigned long  msg_lqbytes;       /* ditto */
        unsigned short msg_cbytes;        /* 目前隊列中消息的位元組數 */
        unsigned short msg_qnum;          /* number of messages in queue */
        unsigned short msg_qbytes;        /* 隊列允許存放的最大位元組數 */
        __kernel_ipc_pid_t msg_lspid;     /* pid of last msgsnd */
        __kernel_ipc_pid_t msg_lrpid;     /* last receive pid */
};
           

3.消息結構模闆

/* Include the definition of msqid64_ds */
#include <asm/msgbuf.h>
/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
        long mtype;         /* type of message */
        char mtext[1];      /* message text */
};
           

4.建立消息

#include <sys/msg.h>
//成功傳回非負消息隊列描述符,失敗傳回-1  
int msgget(key_t key, int flag);
           

key:消息隊列的鍵,用來建立一個消息隊列。System V IPC都有一個key,作為IPC的外部辨別符,建立成功後傳回的描述符作為IPC的内部辨別符使用。key的主要目的就是使不同程序在同一IPC彙合。key具體說可以有三種方式生成:

    · 不同的程序約定好的一個值;

    · 通過相同的路徑名和項目ID,調用ftok()函數,生成一個鍵;該函數為key_t ftok(const char *pathname, int proj_id);該函數把從pathname導出的資訊與id低8位組合成一個整數IPC鍵, 調用時pathname必須存在,若不存在ftok調用失敗,傳回-1,成功傳回該整數IPC鍵值;

    · 還可以設定為IPC_PRIVATE,這樣就會建立一個新的,唯一的IPC對象;然後将傳回的描述符通過某種方式傳遞給其他程序;

flag:為該消息隊列的讀寫權限組合,可以與IPC_CREAT 或IPC_EXCL相與,其中建立對列時都要使用IPC_CREAT。指定建立或打開消息隊列的标志和讀寫權限(ipc_perm中的mode成員)。我們知道System V IPC定義了自己的操作标志和權限設定标志,而且都是通過該參數傳遞,這和open函數存在差别,open函數第三個參數mode用于傳遞檔案的權限标志。System V IPC的操作标志包含:IPC_CREAT,IPC_EXCL,權限設定标志如下圖:

程式間通信(7) - 消息隊列(System V)1.前言2.介紹3.消息結構模闆4.建立消息5.發送消息6.接收消息7.控制消息8.消息隊列的限制

測試代碼1:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main() {
    key_t lKey;
    int nMsgId;
    if ((lKey = ftok("/etc/profile", 1)) == -1) {
        perror("ftok");
        exit(1);
    }
    //帶參數IPC_CREAT和IPC_EXCL,如果隊列不存在則建立隊列,已存在則傳回EEXIST
    if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1) {
        if (errno != EEXIST)//建立失敗且不是由于隊列已存在
        {
            perror("msgget");
            exit(2);
        }
        if ((nMsgId = msgget(lKey, 0)) == -1)//已存在
        {
            perror("msgget");
            exit(3);
        }
    }
    printf("MsgID=%d\n", nMsgId);
    return 0;
}
           

輸出:

[[email protected] csdnblog]# ./a.out 

MsgID=0

[[email protected] csdnblog]# ipcs -q

------ Message Queues --------

key                 msqid      owner      perms      used-bytes   messages    

0x0100002a      0             root        666            0                  0 

測試代碼2:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main()
{
	key_t lKey;
	int nMsgId;
	if ((lKey = ftok("/etc/profile", 1)) == -1)
	{
		perror("ftok");
		exit(1);
	}
	//帶參數IPC_CREAT和IPC_EXCL,如果隊列不存在則建立隊列,已存在則傳回EEXIST
	if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1)
	{
		if (errno != EEXIST)//建立失敗且不是由于隊列已存在
		{
			perror("msgget");
			exit(2);
		}
		if ((nMsgId = msgget(lKey, 0)) == -1)//已存在
		{
			perror("msgget");
			exit(3);
		}
	}
	printf("MsgID=%d\n", nMsgId);
	return 0;
}
           

輸出:

key:0x2

descriptor id:32769

實作System V IPC的任何系統都提供兩個特殊的程式ipcs和ipcrm。ipcs輸出IPC的各種資訊,ipcrm則用于删除各種System V IPC。由于System V IPC不屬于POSIX标準,是以這兩個指令也未被标準化。下面是通過ipcs指令來檢視剛剛建立的消息隊列。

[[email protected] csdnblog]# ipcs -q -i 32769

Message Queue msqid=32769

uid=0   gid=0   cuid=0  cgid=0  mode=0666

cbytes=0        qbytes=65536    qnum=0  lspid=0 lrpid=0

send_time=Not set

rcv_time=Not set

change_time=Wed Jun 17 22:49:00 2015 

5.發送消息

#include <sys/msg.h>
//成功傳回0,失敗傳回-1
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
           

此函數發送消息到指定的消息對列

msqid:消息隊列的描述符;

msgp:指向存放消息的緩沖區,該緩沖區中包含消息類型和消息體兩部分内容。該緩沖區的結構是由使用者定義的,在<sys/msg.h>中有關于該緩沖區結構定義的參考模版:

struct msgbuf  
{  
    long int mtype;             /* type of received/sent message */  
    char mtext[1];              /* text of the message */  
};  
           

緩沖區的開頭是一個long型的消息類型,該消息類型必須是一個非負數。緊跟在消息類型後面的是消息體部分(如果消息長度大于0),參考模版中定義的mtext隻是說明消息體,該部分可以自定義長度。我們自己的應用都會定義特定的消息結構。

msgsz:緩沖區中消息體部分的長度;

msgflg:設定操作标志。可以為0,IPC_NOWAIT;用于在消息隊列中沒有可用的空間時,調用線程采用何種操作方式。

    标志為IPC_NOWAIT,表示msgsnd操作以非阻塞的方式進行,在消息隊列中沒有可用的空間時,msgsnd操作會立刻傳回。并指定EAGAIN錯誤;

    标志為0,表示msgsnd操作以阻塞的方式進行,這種情況下在消息隊列中沒有可用的空間時調用線程會被阻塞,直到下面的情況發生:

        --等到有存放消息的空間;

        --消息隊列從系統中删除,這種情況下回傳回一個EIDRM錯誤;

        --調用線程被某個捕捉到的信号中斷,這種情況下傳回一個EINTR錯誤;

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
typedef struct
{
	long int nType;
	char szText[256];
}MSG;

int main()
{
	key_t lKey;
	int nMsgId;
	MSG msg;
	if ((lKey = ftok("/etc/profile", 1)) == -1)//生成鍵值
	{
		perror("ftok");
		exit(1);
	}
	if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1)//建立消息隊列
	{
		if (errno != EEXIST)
		{
			perror("msgget");
			exit(2);
		}
		if ((nMsgId = msgget(lKey, 0)) == -1)
		{
			perror("msgget");
			exit(3);
		}
	}
	memset(&msg, 0x00, sizeof(MSG));//清空隊列
	msg.nType = 2;//指定消息類型為2
	memcpy(msg.szText, "123456", 6);//指定消息内容
	if (msgsnd(nMsgId, (const void *)&msg, strlen(msg.szText), IPC_NOWAIT) < 0)//非阻塞發送消息
	{
		perror("msgsnd");
	}
	return 0;
}
           

輸出:

[[email protected] csdnblog]# ipcs -q

------ Message Queues --------

key               msqid      owner      perms      used-bytes   messages    

0x0100002a    0               root           666          6                    1  

可以看到,隊列中已經有一條消息,長度6位元組。

6.接收消息

System V消息隊列的讀取消息使用下面的函數接口:

#include <sys/msg.h>  
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); 
                      //成功傳回接收到的消息的消息體的位元組數,失敗傳回-1
           

msqid:消息隊列的描述符;

msgp:指向待存放消息的緩沖區,該緩沖區中将會存放接收到的消息的消息類型和消息體兩部分内容。該緩沖區的結構是由使用者定義的,和msgsnd相對應。

msgsz:緩沖區中能存放消息體部分的最大長度,這也是該函數能傳回的最大資料量;該字段的大小應該是sizeof(msg buffer) - sizeof(long);

msgtyp:希望從消息隊列中擷取的消息類型。

    --msgtyp為0,傳回消息隊列中的第一個消息;

    --msgtyp > 0,傳回該消息類型的第一個消息;

    --msgtyp < 0,傳回小于或等于msgtyp絕對值的消息中類型最小的第一個消息;

msgflg:設定操作标志。可以為0,IPC_NOWAIT,MSG_NOERROR;用于在消息隊列中沒有可用的指定消息時,調用線程采用何種操作方式。

标志為IPC_NOWAIT,表示msgrcv操作以非阻塞的方式進行,在消息隊列中沒有可用的指定消息時,msgrcv操作會立刻傳回,并設定errno為ENOMSG。

标志為0,表示msgrcv操作是阻塞操作,直到下面的情況發生:

    --消息隊列中有一個所請求的消息類型可以擷取;

    --消息隊列從系統中删除,這種情況下回傳回一個EIDRM錯誤;

    --調用線程被某個捕捉到的信号中斷,這種情況下傳回一個EINTR錯誤;

标志為MSG_NOERROR,表示接收到的消息的消息體的長度大于msgsz長度時,msgrcv采取的操作。如果設定了該标志msgrcv在這種情況下回截斷資料部分,而不傳回錯誤,否則傳回一個E2BIG錯誤。

測試代碼:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
typedef struct
{
    long int nType;
    char szText[256];
}MSG;
main()
{
    key_t lKey;
    int n,nMsgId;
    MSG msg;
    if((lKey = ftok("/etc/profile",1)) == -1)
    {
        perror("ftok");
        exit(1);
    }
    if((nMsgId = msgget(lKey,0)) == -1)
    {
        perror("ftok");
        exit(2);
    }
    memset(&msg,0x00,sizeof(MSG));
    if((n = msgrcv(nMsgId,(void *)&msg,sizeof(msg.szText),2L,0)) < 0)//從隊列接收消息,讀出以後就不存在了
    {
        perror("msgrcv");
    }
    else
    {
        printf("msgrcv return length=[%d] text=[%s]\n",n,msg.szText);//輸出
    }
    return 0;
}
           

輸出:

[[email protected] csdnblog]# ./a.out 

msgrcv return length=[6] text=[123456]

7.控制消息

對System V消息隊列的删除,屬性的設定和擷取等控制操作要使用下面的函數接口: 

#include <sys/msg.h>  
int msgctl(int msqid, int cmd, struct msqid_ds *buf);  
                        //成功傳回0,失敗傳回-1  
           

msqid:消息隊列的描述符;

cmd:控制操作的指令,SUS标準提供以下三個指令:

    · IPC_RMID,删除msgid指定的消息隊列。執行該指令系統會立刻把該消息隊列從核心中删除,該消息隊列中的所有消息将會被丢棄。對于該指令,buf參數可忽略。

這和已經讨論過的POSIX消息隊列有很大差别,POSIX消息隊列通過調用mq_unlink來從核心中删除一個消息隊列,但消息隊列的真正析構會在最後一個mq_close結束後發生。

    · IPC_SET,根據buf的所指的值來設定消息隊列msqid_ds結構中的msg_perm.uid,msg_perm.gid,msg_perm.mode,msg_qbytes四個成員。  

    · IPC_STAT,通過buf傳回目前消息隊列的msqid_ds結構。

    · 在Linux下還有例如IPC_INFO,MSG_INFO等指令,具體可以參考Linux手冊;

buf:指向msqid_ds結構的指針;

測試代碼1:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
typedef struct
{
    long int nType;
    char szText[256];
}MSG;
main()
{
    key_t lKey;
    int n,nMsgId;
    MSG msg;
    struct msqid_ds qds;
    if((lKey = ftok("/etc/profile",1)) == -1)
    {
        perror("ftok");
        exit(1);
    }
    if((nMsgId = msgget(lKey,0)) == -1)
    {
        perror("ftok");
        exit(2);
    }
    memset(&qds,0x00,sizeof(struct msqid_ds));
    if(msgctl(nMsgId,IPC_STAT,&qds) < 0)//擷取消息隊列屬性,擷取狀态放pds中
    {
        perror("msgctl IPC_STAT");
        exit(3);
    }
    printf("msg_perm.mode=%d\n",qds.msg_perm.mode);
    qds.msg_perm.mode &= (~0222);//去除消息隊列的寫權限
    if(msgctl(nMsgId,IPC_SET,&qds) < 0)//設定消息隊列權限
    {
        perror("msgctl IPC_SET");
        exit(4);
    }
    memset(&msg,0x00,sizeof(MSG));
    msg.nType = 2;
    memcpy(msg.szText,"12345",5);
    if(msgsnd(nMsgId,(void *)&msg,5,0) < 0)//發送消息
    {
        perror("msgsnd");
    }
    if(msgctl(nMsgId,IPC_RMID,NULL) < 0)//删除消息
    {
        perror("msgctl IPC_RMID");
        exit(5);
    }
    return 0;
}
           

說明: (~0222)取反後做與實際上就是去除其他使用者的寫權限,在C語言中,八進制常用用字首表示。

測試代碼二:

#include <iostream>  
#include <cstring>  
#include <errno.h>  
  
#include <unistd.h>  
#include <fcntl.h>  
#include <sys/msg.h>  
  
using namespace std;  
  
#define  PATH_NAME "/tmp/anonymQueue"  
  
key_t CreateKey(const char *pathName)  
{  
    int fd;  
    if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)  
    {  
        cout<<"open file "<<PATH_NAME<<"failed.";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    close(fd);  
  
    return ftok(PATH_NAME, 0);  
}  
  
int main(int argc, char **argv)  
{  
    key_t key;  
    key = CreateKey(PATH_NAME);  
  
    int msgID;  
    if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)  
    {  
        cout<<"open message queue failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    msqid_ds msgInfo;  
    msgctl(msgID, IPC_STAT, &msgInfo);  
  
    cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;  
    cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;  
    cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;  
    return 0;  
}
           

輸出:

msg_qbytes:65536

msg_qnum:0

msg_cbytes:0

關于消息隊列中允許存放最大的位元組數可以通過IPC_SET指令進行修改,該修改隻能針對本消息隊列生效。如下測試代碼:

int main(int argc, char **argv)  
{  
    key_t key;  
    key = CreateKey(PATH_NAME);  
  
    int msgID;  
    if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)  
    {  
        cout<<"open message queue failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    msqid_ds msgInfo;  
    msgctl(msgID, IPC_STAT, &msgInfo);  
  
    cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;  
    cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;  
    cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;  
  
    msgInfo.msg_qbytes = 6553600;  
    if (msgctl(msgID, IPC_SET, &msgInfo) < 0)  
    {  
        cout<<"set message queue failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    msgctl(msgID, IPC_STAT, &msgInfo);  
    cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;  
    cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;  
    cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;  
    return 0;  
}
           

輸出:

msg_qbytes:65536

msg_qnum:0

msg_cbytes:0

msg_qbytes:6553600

msg_qnum:0

msg_cbytes:0

8.消息隊列的限制

對System V IPC,系統往往會存在一些限制,對于消息隊列,在Linux2.6.32中,系統核心存在以下限制:

[[email protected] csdnblog]# sysctl -a|grep msg

kernel.msgmax = 65536  //每個消息的最大位元組數

kernel.msgmni = 1736   //系統範圍内允許存在的最大消息隊列數

kernel.msgmnb = 65536  //一個消息隊列上允許的最大位元組數

kernel.auto_msgmni = 1

fs.mqueue.msg_max = 10

fs.mqueue.msgsize_max = 8192

fs.mqueue.msg_default = 10

fs.mqueue.msgsize_default = 8192

對于System V消息隊列一般核心還有一個限制:系統範圍内的最大消息數,在Linux下這個限制由msgmnb*msgmni決定。

上面已經說過可以通過IPC_SET來設定使用中的消息隊列的最大位元組數。但是要在系統範圍内對核心限制進行修改,在Linux下面可以通過修改/etc/sysctl.conf核心參數配置檔案,然後配合sysctl指令來對核心參數進行設定。

繼續閱讀