天天看点

进程间通信(7) - 消息队列(System V)1.前言2.介绍3.消息结构模板4.创建消息5.发送消息6.接收消息7.控制消息8.消息队列的限制

目录

1.前言

2.介绍

3.消息结构模板

4.创建消息

5.发送消息

6.接收消息

7.控制消息

8.消息队列的限制

1.前言

本篇文章的所有例子,基于RHEL6.5平台(linux kernal: 2.6.32-431.el6.i686)。

2.介绍

System V消息队列是Open Group定义的XSI,不属于POSIX标准。System V IPC的历史相对很早,在上个世70年代后期有贝尔实验室的分支机构开发,80年代加入System V的系统内核中,后来商用UNIX系统基本都加入了System V IPC的功能。

System V消息队列相对于POSIX消息队列的区别主要是:

    --POSIX消息队列的读操作总是返回消息队列中优先级最高的最早消息,而对于System V消息队列可以返回任意指定优先级(通过消息类型)的消息。

    --当向一个空消息队列中写入一个消息时,POSIX消息队列允许产生一个信号或启动一个线程,System V消息队列不提供类似的机制。

系统内核都会为每一个System V消息队列维护一个信息结构,在Linux 2.6.32中的定义如下:

#include<bits/msq.h>
/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
        struct ipc_perm msg_perm;         /*IPC对象的属性信息和访问权限 */
        struct msg *msg_first;            /* first message on queue,unused  */
        struct msg *msg_last;             /* last message in queue,unused */
        __kernel_time_t msg_stime;        /* last msgsnd time */
        __kernel_time_t msg_rtime;        /* last msgrcv time */
        __kernel_time_t msg_ctime;        /* last change time */
        unsigned long  msg_lcbytes;       /* Reuse junk fields for 32 bit */
        unsigned long  msg_lqbytes;       /* ditto */
        unsigned short msg_cbytes;        /* 当前队列中消息的字节数 */
        unsigned short msg_qnum;          /* number of messages in queue */
        unsigned short msg_qbytes;        /* 队列允许存放的最大字节数 */
        __kernel_ipc_pid_t msg_lspid;     /* pid of last msgsnd */
        __kernel_ipc_pid_t msg_lrpid;     /* last receive pid */
};
           

3.消息结构模板

/* Include the definition of msqid64_ds */
#include <asm/msgbuf.h>
/* message buffer for msgsnd and msgrcv calls */
struct msgbuf {
        long mtype;         /* type of message */
        char mtext[1];      /* message text */
};
           

4.创建消息

#include <sys/msg.h>
//成功返回非负消息队列描述符,失败返回-1  
int msgget(key_t key, int flag);
           

key:消息队列的键,用来创建一个消息队列。System V IPC都有一个key,作为IPC的外部标识符,创建成功后返回的描述符作为IPC的内部标识符使用。key的主要目的就是使不同进程在同一IPC汇合。key具体说可以有三种方式生成:

    · 不同的进程约定好的一个值;

    · 通过相同的路径名和项目ID,调用ftok()函数,生成一个键;该函数为key_t ftok(const char *pathname, int proj_id);该函数把从pathname导出的信息与id低8位组合成一个整数IPC键, 调用时pathname必须存在,若不存在ftok调用失败,返回-1,成功返回该整数IPC键值;

    · 还可以设置为IPC_PRIVATE,这样就会创建一个新的,唯一的IPC对象;然后将返回的描述符通过某种方式传递给其他进程;

flag:为该消息队列的读写权限组合,可以与IPC_CREAT 或IPC_EXCL相与,其中创建对列时都要使用IPC_CREAT。指定创建或打开消息队列的标志和读写权限(ipc_perm中的mode成员)。我们知道System V IPC定义了自己的操作标志和权限设置标志,而且都是通过该参数传递,这和open函数存在差别,open函数第三个参数mode用于传递文件的权限标志。System V IPC的操作标志包含:IPC_CREAT,IPC_EXCL,权限设置标志如下图:

进程间通信(7) - 消息队列(System V)1.前言2.介绍3.消息结构模板4.创建消息5.发送消息6.接收消息7.控制消息8.消息队列的限制

测试代码1:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main() {
    key_t lKey;
    int nMsgId;
    if ((lKey = ftok("/etc/profile", 1)) == -1) {
        perror("ftok");
        exit(1);
    }
    //带参数IPC_CREAT和IPC_EXCL,如果队列不存在则创建队列,已存在则返回EEXIST
    if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1) {
        if (errno != EEXIST)//创建失败且不是由于队列已存在
        {
            perror("msgget");
            exit(2);
        }
        if ((nMsgId = msgget(lKey, 0)) == -1)//已存在
        {
            perror("msgget");
            exit(3);
        }
    }
    printf("MsgID=%d\n", nMsgId);
    return 0;
}
           

输出:

[[email protected] csdnblog]# ./a.out 

MsgID=0

[[email protected] csdnblog]# ipcs -q

------ Message Queues --------

key                 msqid      owner      perms      used-bytes   messages    

0x0100002a      0             root        666            0                  0 

测试代码2:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int main()
{
	key_t lKey;
	int nMsgId;
	if ((lKey = ftok("/etc/profile", 1)) == -1)
	{
		perror("ftok");
		exit(1);
	}
	//带参数IPC_CREAT和IPC_EXCL,如果队列不存在则创建队列,已存在则返回EEXIST
	if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1)
	{
		if (errno != EEXIST)//创建失败且不是由于队列已存在
		{
			perror("msgget");
			exit(2);
		}
		if ((nMsgId = msgget(lKey, 0)) == -1)//已存在
		{
			perror("msgget");
			exit(3);
		}
	}
	printf("MsgID=%d\n", nMsgId);
	return 0;
}
           

输出:

key:0x2

descriptor id:32769

实现System V IPC的任何系统都提供两个特殊的程序ipcs和ipcrm。ipcs输出IPC的各种信息,ipcrm则用于删除各种System V IPC。由于System V IPC不属于POSIX标准,所以这两个命令也未被标准化。下面是通过ipcs命令来查看刚刚创建的消息队列。

[[email protected] csdnblog]# ipcs -q -i 32769

Message Queue msqid=32769

uid=0   gid=0   cuid=0  cgid=0  mode=0666

cbytes=0        qbytes=65536    qnum=0  lspid=0 lrpid=0

send_time=Not set

rcv_time=Not set

change_time=Wed Jun 17 22:49:00 2015 

5.发送消息

#include <sys/msg.h>
//成功返回0,失败返回-1
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
           

此函数发送消息到指定的消息对列

msqid:消息队列的描述符;

msgp:指向存放消息的缓冲区,该缓冲区中包含消息类型和消息体两部分内容。该缓冲区的结构是由用户定义的,在<sys/msg.h>中有关于该缓冲区结构定义的参考模版:

struct msgbuf  
{  
    long int mtype;             /* type of received/sent message */  
    char mtext[1];              /* text of the message */  
};  
           

缓冲区的开头是一个long型的消息类型,该消息类型必须是一个非负数。紧跟在消息类型后面的是消息体部分(如果消息长度大于0),参考模版中定义的mtext只是说明消息体,该部分可以自定义长度。我们自己的应用都会定义特定的消息结构。

msgsz:缓冲区中消息体部分的长度;

msgflg:设置操作标志。可以为0,IPC_NOWAIT;用于在消息队列中没有可用的空间时,调用线程采用何种操作方式。

    标志为IPC_NOWAIT,表示msgsnd操作以非阻塞的方式进行,在消息队列中没有可用的空间时,msgsnd操作会立刻返回。并指定EAGAIN错误;

    标志为0,表示msgsnd操作以阻塞的方式进行,这种情况下在消息队列中没有可用的空间时调用线程会被阻塞,直到下面的情况发生:

        --等到有存放消息的空间;

        --消息队列从系统中删除,这种情况下回返回一个EIDRM错误;

        --调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
typedef struct
{
	long int nType;
	char szText[256];
}MSG;

int main()
{
	key_t lKey;
	int nMsgId;
	MSG msg;
	if ((lKey = ftok("/etc/profile", 1)) == -1)//生成键值
	{
		perror("ftok");
		exit(1);
	}
	if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1)//创建消息队列
	{
		if (errno != EEXIST)
		{
			perror("msgget");
			exit(2);
		}
		if ((nMsgId = msgget(lKey, 0)) == -1)
		{
			perror("msgget");
			exit(3);
		}
	}
	memset(&msg, 0x00, sizeof(MSG));//清空队列
	msg.nType = 2;//指定消息类型为2
	memcpy(msg.szText, "123456", 6);//指定消息内容
	if (msgsnd(nMsgId, (const void *)&msg, strlen(msg.szText), IPC_NOWAIT) < 0)//非阻塞发送消息
	{
		perror("msgsnd");
	}
	return 0;
}
           

输出:

[[email protected] csdnblog]# ipcs -q

------ Message Queues --------

key               msqid      owner      perms      used-bytes   messages    

0x0100002a    0               root           666          6                    1  

可以看到,队列中已经有一条消息,长度6字节。

6.接收消息

System V消息队列的读取消息使用下面的函数接口:

#include <sys/msg.h>  
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); 
                      //成功返回接收到的消息的消息体的字节数,失败返回-1
           

msqid:消息队列的描述符;

msgp:指向待存放消息的缓冲区,该缓冲区中将会存放接收到的消息的消息类型和消息体两部分内容。该缓冲区的结构是由用户定义的,和msgsnd相对应。

msgsz:缓冲区中能存放消息体部分的最大长度,这也是该函数能返回的最大数据量;该字段的大小应该是sizeof(msg buffer) - sizeof(long);

msgtyp:希望从消息队列中获取的消息类型。

    --msgtyp为0,返回消息队列中的第一个消息;

    --msgtyp > 0,返回该消息类型的第一个消息;

    --msgtyp < 0,返回小于或等于msgtyp绝对值的消息中类型最小的第一个消息;

msgflg:设置操作标志。可以为0,IPC_NOWAIT,MSG_NOERROR;用于在消息队列中没有可用的指定消息时,调用线程采用何种操作方式。

标志为IPC_NOWAIT,表示msgrcv操作以非阻塞的方式进行,在消息队列中没有可用的指定消息时,msgrcv操作会立刻返回,并设定errno为ENOMSG。

标志为0,表示msgrcv操作是阻塞操作,直到下面的情况发生:

    --消息队列中有一个所请求的消息类型可以获取;

    --消息队列从系统中删除,这种情况下回返回一个EIDRM错误;

    --调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;

标志为MSG_NOERROR,表示接收到的消息的消息体的长度大于msgsz长度时,msgrcv采取的操作。如果设置了该标志msgrcv在这种情况下回截断数据部分,而不返回错误,否则返回一个E2BIG错误。

测试代码:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
typedef struct
{
    long int nType;
    char szText[256];
}MSG;
main()
{
    key_t lKey;
    int n,nMsgId;
    MSG msg;
    if((lKey = ftok("/etc/profile",1)) == -1)
    {
        perror("ftok");
        exit(1);
    }
    if((nMsgId = msgget(lKey,0)) == -1)
    {
        perror("ftok");
        exit(2);
    }
    memset(&msg,0x00,sizeof(MSG));
    if((n = msgrcv(nMsgId,(void *)&msg,sizeof(msg.szText),2L,0)) < 0)//从队列接收消息,读出以后就不存在了
    {
        perror("msgrcv");
    }
    else
    {
        printf("msgrcv return length=[%d] text=[%s]\n",n,msg.szText);//输出
    }
    return 0;
}
           

输出:

[[email protected] csdnblog]# ./a.out 

msgrcv return length=[6] text=[123456]

7.控制消息

对System V消息队列的删除,属性的设置和获取等控制操作要使用下面的函数接口: 

#include <sys/msg.h>  
int msgctl(int msqid, int cmd, struct msqid_ds *buf);  
                        //成功返回0,失败返回-1  
           

msqid:消息队列的描述符;

cmd:控制操作的命令,SUS标准提供以下三个命令:

    · IPC_RMID,删除msgid指定的消息队列。执行该命令系统会立刻把该消息队列从内核中删除,该消息队列中的所有消息将会被丢弃。对于该命令,buf参数可忽略。

这和已经讨论过的POSIX消息队列有很大差别,POSIX消息队列通过调用mq_unlink来从内核中删除一个消息队列,但消息队列的真正析构会在最后一个mq_close结束后发生。

    · IPC_SET,根据buf的所指的值来设置消息队列msqid_ds结构中的msg_perm.uid,msg_perm.gid,msg_perm.mode,msg_qbytes四个成员。  

    · IPC_STAT,通过buf返回当前消息队列的msqid_ds结构。

    · 在Linux下还有例如IPC_INFO,MSG_INFO等命令,具体可以参考Linux手册;

buf:指向msqid_ds结构的指针;

测试代码1:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
typedef struct
{
    long int nType;
    char szText[256];
}MSG;
main()
{
    key_t lKey;
    int n,nMsgId;
    MSG msg;
    struct msqid_ds qds;
    if((lKey = ftok("/etc/profile",1)) == -1)
    {
        perror("ftok");
        exit(1);
    }
    if((nMsgId = msgget(lKey,0)) == -1)
    {
        perror("ftok");
        exit(2);
    }
    memset(&qds,0x00,sizeof(struct msqid_ds));
    if(msgctl(nMsgId,IPC_STAT,&qds) < 0)//获取消息队列属性,获取状态放pds中
    {
        perror("msgctl IPC_STAT");
        exit(3);
    }
    printf("msg_perm.mode=%d\n",qds.msg_perm.mode);
    qds.msg_perm.mode &= (~0222);//去除消息队列的写权限
    if(msgctl(nMsgId,IPC_SET,&qds) < 0)//设置消息队列权限
    {
        perror("msgctl IPC_SET");
        exit(4);
    }
    memset(&msg,0x00,sizeof(MSG));
    msg.nType = 2;
    memcpy(msg.szText,"12345",5);
    if(msgsnd(nMsgId,(void *)&msg,5,0) < 0)//发送消息
    {
        perror("msgsnd");
    }
    if(msgctl(nMsgId,IPC_RMID,NULL) < 0)//删除消息
    {
        perror("msgctl IPC_RMID");
        exit(5);
    }
    return 0;
}
           

说明: (~0222)取反后做与实际上就是去除其他用户的写权限,在C语言中,八进制常用用前缀表示。

测试代码二:

#include <iostream>  
#include <cstring>  
#include <errno.h>  
  
#include <unistd.h>  
#include <fcntl.h>  
#include <sys/msg.h>  
  
using namespace std;  
  
#define  PATH_NAME "/tmp/anonymQueue"  
  
key_t CreateKey(const char *pathName)  
{  
    int fd;  
    if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)  
    {  
        cout<<"open file "<<PATH_NAME<<"failed.";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    close(fd);  
  
    return ftok(PATH_NAME, 0);  
}  
  
int main(int argc, char **argv)  
{  
    key_t key;  
    key = CreateKey(PATH_NAME);  
  
    int msgID;  
    if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)  
    {  
        cout<<"open message queue failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    msqid_ds msgInfo;  
    msgctl(msgID, IPC_STAT, &msgInfo);  
  
    cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;  
    cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;  
    cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;  
    return 0;  
}
           

输出:

msg_qbytes:65536

msg_qnum:0

msg_cbytes:0

关于消息队列中允许存放最大的字节数可以通过IPC_SET命令进行修改,该修改只能针对本消息队列生效。如下测试代码:

int main(int argc, char **argv)  
{  
    key_t key;  
    key = CreateKey(PATH_NAME);  
  
    int msgID;  
    if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)  
    {  
        cout<<"open message queue failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    msqid_ds msgInfo;  
    msgctl(msgID, IPC_STAT, &msgInfo);  
  
    cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;  
    cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;  
    cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;  
  
    msgInfo.msg_qbytes = 6553600;  
    if (msgctl(msgID, IPC_SET, &msgInfo) < 0)  
    {  
        cout<<"set message queue failed...";  
        cout<<strerror(errno)<<endl;  
        return -1;  
    }  
  
    msgctl(msgID, IPC_STAT, &msgInfo);  
    cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;  
    cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;  
    cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;  
    return 0;  
}
           

输出:

msg_qbytes:65536

msg_qnum:0

msg_cbytes:0

msg_qbytes:6553600

msg_qnum:0

msg_cbytes:0

8.消息队列的限制

对System V IPC,系统往往会存在一些限制,对于消息队列,在Linux2.6.32中,系统内核存在以下限制:

[[email protected] csdnblog]# sysctl -a|grep msg

kernel.msgmax = 65536  //每个消息的最大字节数

kernel.msgmni = 1736   //系统范围内允许存在的最大消息队列数

kernel.msgmnb = 65536  //一个消息队列上允许的最大字节数

kernel.auto_msgmni = 1

fs.mqueue.msg_max = 10

fs.mqueue.msgsize_max = 8192

fs.mqueue.msg_default = 10

fs.mqueue.msgsize_default = 8192

对于System V消息队列一般内核还有一个限制:系统范围内的最大消息数,在Linux下这个限制由msgmnb*msgmni决定。

上面已经说过可以通过IPC_SET来设置使用中的消息队列的最大字节数。但是要在系统范围内对内核限制进行修改,在Linux下面可以通过修改/etc/sysctl.conf内核参数配置文件,然后配合sysctl命令来对内核参数进行设置。

继续阅读