天天看點

程序通信之消息隊列執行個體

在前面,我們了解程序通信的概念以及常見的程序通信方式,我做個整理如下圖:

程式通信之消息隊列執行個體

我們繼續通過執行個體來學習程序通信,今天主要是講消息隊列是如何實作程序通信的,代碼均在Ubuntu16.04下測試。

程式通信之消息隊列執行個體

如上圖所示,消息隊列是一種間接的通信方式,它提供了一種從一個程序向另一個程序發送一個資料塊的方法。 每個資料塊都被認為含有一個類型,接收程序可以獨立地接收含有不同類型的資料結構。我們可以通過發送消息來避免命名管道的同步和阻塞問題。但是消息隊列與命名管道一樣,每個資料塊都有一個最大長度的限制。

注意消息隊列和管道的不同點:

  1. 消息隊列是雙向通信,管道一般都是單向通信
  2. 消息隊列是基于消息(是以有類型這麼一說)的,而管道是基于位元組流的
  3. 消息隊列的生命周期是随核心存在而存在,而非管道随程序存在

消息隊列一般會涉及到這麼些函數:

程式通信之消息隊列執行個體

對于這些,我們可以在Linux下同man指令擷取資訊,我們簡要介紹一下這些函數。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
           

msgget用于建立一個新的消息隊列或者打開一個現存的消息隊列。

其中key用于差別不同的消息隊列,這樣兩個不相關程序可以通過事先約定的key值通過消息隊列進行消息收發。例如程序A向key消息隊列發送消息,程序B從Key消息隊列讀取消息。一般來說,key可通過ftok函數獲得。

而msgflg則是一個權限标志,表示消息隊列的通路權限,它與檔案的通路權限一樣。它一般指定兩個參數:IPC _ CREAT和IPC _ EXCL,如果單獨用IPC _ CREAT時候,它就建立一個消息隊列,如果該消息隊列已存在則打開,如果指定IPC _ CREAT和IPC _ EXCL,如果該消息隊列存在的話則出錯。

注意IPC _ EXCL單獨使用無意義,他隻有和IPC _ CREAT一起使用才有意義,保證消息隊列是建立的而非已有的!

該函數傳回一個以key命名的消息隊列的辨別符(非零整數),失敗時傳回-1。

是以建立create _ msg _ queue或者擷取get _ msg _ queue一個消息隊列可以這麼寫:

int msg_queue(int msgflag)
{
    key_t key = ftok(PATH, PROJ_ID);
    if (key == -)
    {
        printf("ftok failure\n");
        exit(-);
    }

    int msg_queue_id = msgget(key, msgflag | ); //消息隊列的權限為
    if (msg_queue_id == -)
    {
        printf("queue_id failure\n");
        exit(-);
    }
    return msg_queue_id;
}

int get_msg_queue()
{
    return msg_queue(IPC_CREAT);
}

int create_msg_queue()
{
    return msg_queue(IPC_CREAT | IPC_EXCL);
}
           

對于這個消息隊列,我采用是server端建立消息隊列,client擷取消息隊列,這樣子,他們兩就可以進行程序通信了。我們可以用ipcs -q指令檢視消息隊列,如下圖:

程式通信之消息隊列執行個體

消息隊列的id為32768,而client和server端列印也符合要求。

注意一點,當我們執行完程式後,用ipcs -q依舊能檢視消息隊列,此時client和server程序已經結束,是以也驗證我們前面所說的,消息隊列的生命周期是随核心存在的。

那麼,這樣一來問題就來了,如果我們程序中不去顯式銷毀消息隊列,顯然消息隊列會越來越多占用資源,是以,我們需要删除消息隊列,這裡有兩個辦法,一是使用ipcrm -q msqid指令删除,二是采用函數調用删除。我們分别說一下。

程式通信之消息隊列執行個體

而更多的,我們采用調用函數銷毀消息隊列,而在這裡,需要調用下面這個函數:

int msgctl (int msqid, int cmd, struct msqid_ds *buf); //消息隊列屬性控制 
參數:
    msqid:消息隊列的辨別符。
    cmd:執行的控制指令,即要執行的操作。包括以下選項:
        IPC_STAT:讀取消息隊列屬性。取得此隊列的msqid_ds 結構,并将其存放在buf指向的結構中。 
        IPC_SET :設定消息隊列屬性。
        IPC_RMID:删除消息隊列。
        IPC_INFO:讀取消息隊列基本情況。此指令等同于 ipcs 指令。
        這  條指令(IPC_STAT、IPC_SET、IPC_INFO 和 IPC_RMID)也可用于信号量和共享存儲。
    buf:臨時的 msqid_ds 結構體類型的變量。用于存儲讀取的消息隊列屬性或需要修改的消息隊列屬性。

舉例:msgctl(qid, IPC_RMID, NULL)  //删除消息隊列
           

是以删除代碼就很好寫了:

void destroy_msg_queue(int msqid)
{
    int ret = msgctl(msqid, IPC_RMID, NULL);
    if (ret == -)
    {
        fprintf(stderr, "destroy_msg_queue failure\n");
        exit(-);
    }
}
           

建立好一個消息隊列,client和server兩個也都得到消息隊列ID,下面就開始我們的程序通信,也就是發送資訊,接收資訊。

下面介紹msgsnd,msgrcv函數。

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); //将消息送入消息隊列
參數:  
    msqid:消息隊列的辨別符。
    msgp:指向消息緩沖區的指針,此位置用來暫時存儲發送和接收的消息,是一個使用者可定義的通用結構,形态如下

struct msgbuf {
    long mtype;     /* 消息類型,必須 > 0 */
    char mtext[size];  /* 消息文本 */
};
    msgsz:消息的大小。
    msgflg:用來指明程序在隊列資料滿(msgsnd)或空(msgrcv)的情況下所應采取的行動。
        如果設定為 IPC_NOWAIT,則在消息隊列已滿時不發送消息并且調用程序立即傳回錯誤資訊EAGAIN。
        如果設定為 ,則調用程序阻塞直至消息隊列不為滿。
    size:使用者可指定大小  
傳回說明:  
成功執行時,msgsnd()傳回, 失敗傳回-

舉例:msgsnd(g_msg_id,&msg_snd,sizeof(msg_snd.msg_item),IPC_NOWAIT); //非阻塞發送消息    



ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); //從消息隊列讀取資訊
參數:
    msgtyp:
    msgtyp = :收取隊列中的第一條消息,任意類型。
    msgtyp > :收取第一條 msgtyp 類型的消息。  這個能判斷自己該不該收這個消息是不是想要的發信人發來的
    msgtyp < :收取第一條最低類型(小于或等于 msgtyp 的絕對值)的消息。 
    其他參數參考msgsnd函數。

傳回說明:  
成功執行時,msgrcv()傳回, 失敗傳回-

舉例:msgrcv(msgid,&msg_rbuf,sizeof(msg_rbuf.msg_item),,); //阻塞接收
           

執行個體

介紹完以上函數特性,我們寫一個程序通信的簡單執行個體,類似一個小型聊天程式,基于消息隊列實作。

如下圖:

程式通信之消息隊列執行個體

它有用戶端client和伺服器端server,初始化,我們讓server端建立消息隊列,client擷取該消息隊列,然後兩者進行互相通信,比如client發出資訊,server接受資訊,然後server發送資訊,client接受資訊。

注意幾點:

  1. 消息隊列是由server端建立和銷毀的,client端擷取改消息隊列,是以,必須先運作./ server然後運作./client
  2. 預設是client先發資訊,server端擷取到消息,然後server發送資訊,client接受這樣子展現互動性

代碼如下:

Com.h聲明

#ifndef __MSG_QUEUE__
#define __MSG_QUEUE__

#include<unistd.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<string.h>

#define CLIENT_TYPE 1
#define SERVER_TYPE 2

#define PATH "./"
#define PROJ_ID 256 
#define TEXT_SIZE 512  



struct MsgBuf
{
    long mtype;
    char mtext[TEXT_SIZE];
};

typedef struct MsgBuf MsgBuf;



int msg_queue();

int get_msg_queue();

int create_msg_queue();

int send_msg(int msg_queue_id, MsgBuf* msgbuf, long type);

int receive_msg(int msg_queue_id, MsgBuf* msgbuf, long type);

void destroy_msg_queue(int msqid);

#endif 
           

Com.c對Com.h實作

#include "com.h"


int msg_queue(int msgflag)
{
    key_t key = ftok(PATH, PROJ_ID);
    if (key == -)
    {
        fprintf(stderr, "ftok failure\n");
        exit(-);
    }

    int msg_queue_id = msgget(key, msgflag | );
    if (msg_queue_id == -)
    {
        fprintf(stderr, "queue_id failure\n");
        exit(-);
    }
    return msg_queue_id;
}

int get_msg_queue()
{
    return msg_queue(IPC_CREAT);
}

int create_msg_queue()
{
    return msg_queue(IPC_CREAT | IPC_EXCL);
}

void destroy_msg_queue(int msqid)
{
    int ret = msgctl(msqid, IPC_RMID, NULL);
    if (ret == -)
    {
        fprintf(stderr, "destroy_msg_queue failure\n");
        exit(-);
    }
}

int send_msg(int msg_queue_id, MsgBuf* msgbuf, long type)
{
    msgbuf->mtype = type;
    msgsnd(msg_queue_id, (const void*)msgbuf, sizeof(msgbuf->mtext), ); //以阻塞方式發送
}

int receive_msg(int msg_queue_id, MsgBuf* msgbuf, long type)
{
    msgbuf->mtype = type;
    msgrcv(msg_queue_id, (void*)msgbuf, sizeof(msgbuf->mtext), type, );
}
           

Server.c

#include "com.h"

int main()
{
    int msg_queue_id = create_msg_queue();
    printf("server succeeds in connecting the client\n\n"); 

    MsgBuf msgbuf;
    //memset(msgbuf.mtext, '\0', TEXT_SIZE);


    while ()
    {
        receive_msg(msg_queue_id, &msgbuf, CLIENT_TYPE);
        printf("client # %s\n", msgbuf.mtext);
        msgbuf.mtext[] = '\0';//清空緩沖區

        printf("please input the message # ");
        scanf("%s", msgbuf.mtext);
        send_msg(msg_queue_id, &msgbuf, SERVER_TYPE);
        msgbuf.mtext[] = '\0';//清空緩沖區
    }


    destroy_msg_queue(msg_queue_id);
    return ;
}
           

Client.c

#include"com.h"

int main()
{
    int msg_queue_id = get_msg_queue();
    printf("client succeeds in connecting with the server!\n\n");

    MsgBuf msgbuf;
    //memset(msgbuf.mtext, '\0', TEXT_SIZE);


    while ()
    {
        printf("please input the message # ");
        scanf("%s", msgbuf.mtext);
        send_msg(msg_queue_id, &msgbuf, CLIENT_TYPE);
        msgbuf.mtext[] = '\0';//清空緩沖區

        receive_msg(msg_queue_id, &msgbuf, SERVER_TYPE); //client希望接收server端的資訊
        printf("server # %s\n", msgbuf.mtext);
        msgbuf.mtext[] = '\0';//清空緩沖區
    }

    return ;
}
           

makefile

.PHONY:all
all:client server
client:client.c com.c
    gcc -o $@ $^
server:server.c com.c
    gcc -o $@ $^
.PHONY:clean
clean:
    rm -f client server
           

繼續閱讀