天天看點

Linux程序間通信 - 消息隊列

1.1.  什麼是消息隊列

消息隊列提供了一種從一個程序向另一個程序發送一個資料塊的方法。每個資料塊都被認為含有一個類型,接收程序可以獨立地接收含有不同類型的資料結構。我們可以通過發送消息來避免命名管道的同步和阻塞問題。但是消息隊列與命名管道一樣,每個資料塊都有一個最大長度的限制。

1.2.  相關的接口函數介紹

Linux提供了一系列消息隊列的函數接口來讓我們友善地使用它來實作程序間的通信。它的用法與其他兩個System V PIC機制,即信号量和共享記憶體相似。

1、    ftok函數: 動态擷取key值

功  能:把從path導出的資訊與id的低序8位組合成一個key_t值(也稱IPC鍵)
原  型:key_t ftok(const char  *path, int id);
傳回值:成功則傳回key, 否則傳回 -1
參  數:path:已存在的檔案或目錄(必須存在)
        id:項目ID,取低序8位值,故有效範圍為(1  - 255),
說  明:1)  如果使用同一個項目ID,那麼對于不同檔案的兩個路徑名可能産生相同的key值。
        2)  在通路同一共享記憶體的多個程序先後調用ftok函數的時間段中,如果 path指定的檔案(或目錄)
            被删除且重新建立,可能得到不一樣的鍵值
        3) 必須path檔案(或目錄)存在才可建立成功,和path的所屬權限無關,即目前使用者沒有該path的通路
            權限也可以建立成功
原  理:ftok的典型實作調用stat函數,然後組合以下三個值:
        1.path所在的檔案系統的資訊(stat結構的st_dev成員) 
        2.該檔案在本檔案系統内的索引節點号(stat結構的st_ino成員),也稱i節點
        3.id的低序8位(不能為0)
       key的32位值 = 0X + id的低序8位 + stat.st_dev的低序8位 + stat.st_ino的低序16位      

2、    msgget函數:建立消息

功  能:建立一個新隊列或打開一個存在的隊列
原  型:int msgget(key_t, key, int msgflg);
傳回值:成功則傳回與鍵值key相對應的消息隊列描述字qid,否則傳回 -1
參  數:key:鍵值,可由ftok擷取,或者直接使用宏定義的整數值,或者IPC_PRIVATE
        msgflg:消息标志:IPC_CREAT、IPC_EXCL
說  明:IPC_PRIVATE:建立一個該程序獨占的消息隊列,其它程序不能通路該消息隊列
       IPC_CREAT:若消息隊列不存在,建立一個新的消息隊列,否則,傳回存在的消息隊列
       IPC_CREAT | IPC_EXCL:IPC_EXCL标志本身沒有多大意義,與IPC_CREAT一起使用,保證隻建立新
            的消息隊列,若對應key的消息隊列已經存在,則傳回錯誤
       IPC_NOWAIT:小隊列以非阻塞的方式擷取(若不能擷取,立即傳回錯誤)
原  理:1) 如果key == IPC_PRIVATE,則申請一塊記憶體,建立一個新的消息隊列(資料結構msqid_ds),
            将其初始化後加入到msgque向量表中的某個空位置處,傳回标示符
        2) 在msgque向量表中找鍵值為key的消息隊列,如果沒有找到,結果有二:
            msgflag表示不建立新的隊列,則錯誤傳回
            msgflag表示要建立新的隊列(IPC_CREAT),則建立新消息隊列,建立過程如1)
        3) 如果在msgque向量表中找到了鍵值為key的消息隊列,則有以下情況:
            如果msgflg表示一定要建立新的消息隊列而且不允許有相同鍵值的隊列存在,則錯誤傳回。
            如果找到的隊列是不能用的或已經損壞的隊列,則錯誤傳回
            認證和存取權限檢查,如果該隊列不允許msgflg要求的存取,則錯誤傳回
            正常,傳回隊列的辨別符      

3、    msgsnd函數:發送消息

功  能:發送一個消息到消息對列
原  型:int msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
傳回值:0:成功
        -1:非阻塞方式通路滿消息隊列傳回
        EACCES:沒有該消息隊列寫權限
        EAGAIN:msgflg = IPC_NOWAIT,并且已達到整個消息的最大長度(msg_qbytes)
        EFAULT:消息隊列位址msgp無法擷取
        EIDRM:消息隊列已經被删除
        EINTR:消息隊列等待寫入的時候被中斷
        EINVAL:無效msqid、沒有活動的消息類型值、無效msgsz值(小于0或大于系統消息隊列最大值MSGMAX)
        ENOMEM:記憶體不夠
參  數:msqid:消息隊列qid
        msgp:消息内容結構體指針
        msgsz:消息消息内容結構體中,除了消息類型msg_type之後的大小
        msgflg:消息标志:
            0(忽略該标志位,以阻塞的方式發送消息到消息隊列)、
            IPC_NOWAIT(以非阻塞的方式發送消息,若消息隊列滿,函數立即傳回EAGAIN)
說  明:msgsz可以優化成實際資料的長度,相比配置設定空間的長度,可以增加單個隊列中的消息個數
原  理:1)  計算id = (unsigned int) msqid % MSGMNI,然後根據id在linux系統消息隊列向量msgque[MSGMNI]中
            查找對應的消息隊列,并進行認證檢查,合法性檢查
        2)  如果隊列已滿,以可中斷等待狀态(TASK_INTERRUPTIBLE)将目前程序挂起在wwait等待隊列(發送
            消息等待隊列)上(msgflag==0)
        3)  否則,根據msgbuf的大小申請一塊空間,并在其上建立一個消息資料結構struct msg(核心空間),
            将消息緩沖區中的消息内容拷貝到該記憶體塊中消息頭的後面(從使用者空間拷貝到核心空間)
        4)  将消息資料結構加入到消息隊列的隊尾,修改隊列msqid_ds的相應參數
        5)  喚醒在該消息隊列的rwait程序隊列(讀等待程序隊列)上等待讀的程序,并傳回      

4、    msgrcv函數:接收消息

功  能:從消息隊列接收一個消息到msgbuf*
原  型:int msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz,long msgtyp, int msgflg)
傳回值:成功傳回讀取消息的實際位元組數
        -1:消息長度大于msgsz
        E2BIG:msgflg ≠ MSG_NOERROR,并且實際消息的長度大于msgsz
        EACCES:沒有該消息隊列讀權限
        EAGAIN:msgflg = IPC_NOWAIT,并且消息隊列不可用
        EFAULT:消息隊列位址無法擷取
        EIDRM:消息隊列已經被删除
        EINTR:消息隊列等待寫入的時候被中斷
        EINVAL:msqid無效值、msgsz小于0
        ENOMSG:msgflg = IPC_NOWAIT,并且在消息隊列中不存在請求的消息類型msgtyp
參  數:msqid:消息隊列qid
        msgp:接收到的消息将要存放的緩沖區
        msgsz:消息資料配置設定空間的大小
        msgtype:期望接收的消息類型:
            0(擷取隊列中的第一個消息)、
            大于0(擷取具有相同消息類型的第一個資訊)、
            小于0(擷取類型等于或小于msgtype的絕對值的第一個消息,從類型值最小的消息開始)
        msgflg:消息标志:
            0(忽略)、
            IPC_NOWAIT(如果消息隊列為空,不阻塞等待,傳回一個EAGAIN)、
            MSG_EXCEPT(取出第一個消息類型大于0,并且消息類型不等于msgtyp的消息)、
            MSG_NOERROR(如果實際消息的長度大于msgsz,對該消息進行截斷)
說  明:
原  理:1)  計算id = (unsigned int) msqid % MSGMNI,然後根據id在linux系統消息隊列向量msgque[MSGMNI]
            中查找對應的消息隊列,并進行認證檢查,合法性檢查
        2)  根據msgtyp搜尋消息隊列,情況有二:
            如果找不到所要的消息,則以可中斷等待狀态(TASK_INTERRUPTIBLE),将目前程序挂起在rwait等
            待隊列上;
            如果找到所要的消息,則将消息從隊列中摘下,調整隊列msqid_ds參數,喚醒該消息隊列的wwait進
            程隊列上等待寫的程序,将消息内容拷貝到使用者空間的消息緩沖區msgp中,釋放核心中該消息所占用
            的空間,傳回      

5、    msgctl函數:控制消息

功  能:對消息隊列進行設定以及相關操作,具體操作由cmd指定
原  型:int msgctl (int msqid, int cmd, struct msqid_ds *buf)
傳回值:0:成功
        -1:失敗
        EACCES:沒有讀的權限同時,cmd 是IPC_STAT
        EFAULT:buf 指向的位址無效
        EIDRM:在讀取中隊列被删除
        EINVAL:msgqid無效, 或者msgsz 小于0
        EPERM:IPC_SET或者IPC_RMID 指令被使用,但調用程式沒有寫的權限
參  數:msqid:消息隊列qid
        cmd:消息隊列執行的操作:
            IPC_STAT(取取消息隊列的結構體msqid_ds,并将其存儲在buf指定的位址中)、 
            IPC_SET(設定消息隊列的資料結構msqid_ds中的msg_perm 成員的值。這個值取自buf參數)
            IPC_EMID(從系統核心中移走消息隊列)
        buf:消息隊列msqid_ds結構體指針
說  明:1) 消息隊列中的資料結構中唯一可以改動的元素就是ipc_perm。它包括隊列的存取權限和關于隊列創
            建者和擁有者的資訊。你可以改變使用者的uid、使用者的組gid以及消息隊列的存取權限mode      

1.3.  核心限制

消息隊列是IPC資源資訊中的一種,是以可以通過ipcs确定系統的目前IPC限制及已使用的資源狀況。

檢視IPC:ipcs  [-u](檢視目前的IPC資源狀況)、[-l](可以檢視IPC限制值)

建立IPC:ipcmk  [ [-q msqid] [-m shmid] [-s semid] [-Qmsgkey] [-M shmkey] [-S semkey] ... ]

删除IPC:ipcrm  [[-Q] [-S <size>] [-M <nsems>]]

# ipcs -l
 
------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 67108864
max total shared memory (kbytes) = 17179869184
min seg size (bytes) = 1
 
------ Semaphore Limits --------
max number of arrays = 128
max semaphores per array = 250
max semaphores system wide = 32000
max ops per semop call = 32
semaphore max value = 32767
 
------ Messages: Limits --------
max queues system wide = 10                 【系統最多的消息隊列數量(最多支援同時10個消息隊列)】
max size of message (bytes) = 2048          【單個消息的最大位元組數2048】
default max size of queue (bytes) = 65536   【預設的單個隊列的大小65536】      

修改消息隊列參數如下:

1、  永久修改:

在root使用者下修改/etc/sysctl.conf配置檔案

具體方法為:在sysctl.conf中加上 kernel.msgmni=10kernel.msgmax=2048 kernel.msgmnb=65535,然後運作sysctl –p即可修改。

在Red Hat中,rc.sysinit初始化腳本将自動讀取/etc/sysctl.conf檔案

而在SUSE Linux中,還需要激活 boot.sysctl服務(chkconfig boot.sysctl on)

2、  臨時修改:

root使用者下sysctl -w kernel.msgmni=10 kernel.msgmax=2048 kernel.msgmnb=65535

1.4.  執行個體

發送資訊的程式MsgSend.c的源代碼如下:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/msg.h>
#include <errno.h>

#define MSGKEY 1024
#define MAX_TEXT 512  
struct msg_st
{
    long int msg_type;      //第一個參數必須是long的消息類型
    char text[MAX_TEXT];    //此後可以定義多個數組和其他值
};

#define SPACE "                "
#define PRINT_X(var1, var2) printf(""#var2"%.*s: 0X%08X\n", \
        strlen(SPACE) - strlen(#var2), SPACE, var1.var2)
#define PRINT_D(var1, var2) printf(""#var2"%.*s: %d\n", \
        strlen(SPACE) - strlen(#var2), SPACE, var1.var2)

void Show(int msqid)
{
    int iRet = 0;
    struct msqid_ds buf;
    
    iRet = msgctl(msqid, IPC_STAT, &buf);
    if(0 != iRet)
    {
        printf("msgctl error, errno=%d [%s]\n", errno, strerror(errno));
        return ;
    }
    
    printf("\n");
    PRINT_X(buf.msg_perm, __key);
    PRINT_D(buf.msg_perm, uid);
    PRINT_D(buf.msg_perm, gid);
    PRINT_D(buf.msg_perm, cuid);
    PRINT_D(buf.msg_perm, cgid);
    PRINT_X(buf.msg_perm, mode);
    PRINT_X(buf.msg_perm, __seq);
    
    PRINT_X(buf, msg_stime);
    PRINT_X(buf, msg_rtime);
    PRINT_X(buf, msg_ctime);
    PRINT_D(buf, __msg_cbytes);
    PRINT_D(buf, msg_qnum);
    PRINT_D(buf, msg_qbytes);
    PRINT_X(buf, msg_lspid);
    PRINT_X(buf, msg_lrpid);
    printf("\n");
}


int main()  
{
    int running = 1;
    struct msg_st data;
    char buffer[MAX_TEXT];
    int msgid = -1;

    //建立消息隊列
    msgid = msgget(MSGKEY, 0666 | IPC_CREAT);
    if(msgid == -1)  
    {
        fprintf(stderr, "msgget failed, error=%d [%s]\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    
    Show(msgid);

    //向消息隊列中寫消息,直到寫入end
    while(running)
    {
        //輸入資料
        printf("Enter some text: ");
        fgets(buffer, MAX_TEXT, stdin);
        data.msg_type = 1;    //注意此處設定消息類型
        strcpy(data.text, buffer);
        
        /* 向隊列發送資料, 如果是字元串,則參數3可以修改為字元串的長度,可以節省記憶體
           msgsnd(msgid, (void*)&data, strlen(data.text), 0) */
        if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)
        {
            fprintf(stderr, "msgsnd failed, error=%d [%s]\n", errno, strerror(errno));
            exit(EXIT_FAILURE);
        }

        Show(msgid);
        
        //輸入end結束輸入
        if(strncmp(buffer, "end", 3) == 0)
        {
            running = 0;
        }
        sleep(1);
    }
    
    exit(EXIT_SUCCESS);
}
           

接收資訊的程式MsgReceive.c的源代碼如下:

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>

#define MSGKEY 1024
#define MAX_TEXT 512
struct msg_st
{
    long int msg_type;
    char text[MAX_TEXT];
};

int main()
{
    int running = 1;
    int msgid = -1;
    struct msg_st data;
    long int msgtype = 0; //此處設定為0,表示接收所有消息

    //建立消息隊列
    msgid = msgget(MSGKEY, 0666 | IPC_CREAT);
    if(msgid == -1)
    {  
        fprintf(stderr, "msgget failed, error=%d [%s]\n", errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    
    //從隊列中擷取消息,直到遇到end消息為止
    while(running)
    {
        if(msgrcv(msgid, (void*)&data, MAX_TEXT, msgtype, 0) == -1)
        {
            fprintf(stderr, "msgrcv failed, error=%d [%s]\n", errno, strerror(errno));
            exit(EXIT_FAILURE);
        }
        
        printf("You wrote: %s\n",data.text);
        
        //遇到end結束
        if(strncmp(data.text, "end", 3) == 0)
        {
            running = 0;
        }
    }
    
    //删除消息隊列
    if(msgctl(msgid, IPC_RMID, 0) == -1)
    {
        fprintf(stderr, "msgctl(IPC_RMID) failed, error=%d [%s]\n", 
                errno, strerror(errno));
        exit(EXIT_FAILURE);
    }
    
    exit(EXIT_SUCCESS);
}
           

1.5.  注意事項

一、每次msgrcv一個消息時:
1、那個消息會在核心中移除 
2、每次msgrcv都隻會給一個消息出來,不管你用多大的buf來接收,都是可以的。如果msgrcv的bufSize小于實際的該消息的大小,那麼可以設定一個标志(MSG_NOERROR):表示截斷。 如果不設定,那麼會報錯,取不出來
   消息滿了,則預設0為阻塞,直到有了空間位置,才能snd消息進入到核心
   消息空了,則預設0為阻塞,直到有了一個消息位置,才能 rcv消息進入到程序記憶體
二、如果指定msgflg = MSG_NOERROR,如果函數取得的消息長度大于msgsz,将隻傳回msgsz 長度的資訊,剩下的部分被丢棄了。如果不指定這個參數,E2BIG 将被傳回,而消息則留在隊列中不被取出
三、如果使用msgctl控制單個隊列的大小(msg_qbytes),當設定的值小于系統設定的值時,可以生效;當設定的值大于系統設定的值時,設定會失敗,此時還是原來系統設定的值      

繼續閱讀