天天看點

Linux IPC System V Message Queue

Linux IPC:

1. pipe   ---- 匿名管道

2. fifo     ----- 有名管道

3. signal   ---- 異步信号

4. semaphore  --- 信号燈,用于同步

5. message queue ---- posix message queue,   system-v message queue

6. shared memory  ---- 共享記憶體, 最快的程序間通信

System-V Message Queue

API:

1. key_t ftok(const char* pathname, int proj_id);   //convert a pathname and a project idendifier  to  a System V IPC key.

2. int msgget(key_t key, int msgflag);   //get a message queue idendifier

3. int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

4. ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,  int msgflg);

5. int msgctl(int msqid, int cmd, struct msqid_ds *buf);

Example:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <sys/wait.h>

#define BUFFER_SIZE 256
struct msg_buf
{
	long mtype;
	char mtext[BUFFER_SIZE];
};

typedef enum
{
	MSG_TYPE_NOTIFY = 1,
	MSG_TYPE_OVER,
	MSG_TYPE_OVER_ACK,
	MSG_TYPE_UNDEFINE
}SYSTEM_V_MSG_TYPE;

static int main_msq, child_msq;

int child_loop()
{
	int reval;
	struct msg_buf childBuf;
	int msgflag = IPC_NOWAIT|MSG_NOERROR;
	//sleep(2);
	while(1)
	{	
		reval = msgrcv(child_msq, &childBuf, sizeof(msg_buf), /*MSG_TYPE_NOTIFY*/0,msgflag);
		if(reval < 0)
		{
			//printf("child_msg msgrcv error, errno %d\n", errno);
			//loop
			sleep(1);
		}
		else
		{
			switch(childBuf.mtype)
			{
				case MSG_TYPE_NOTIFY:
					printf("child_loop msgrcv: MSG_TYPE_NOTIFY=> %s\n", childBuf.mtext);
					break;
				case MSG_TYPE_OVER:
					printf("child_loop msgrcv: MSG_TYPE_OVER\n");
					childBuf.mtype = MSG_TYPE_OVER_ACK;
					childBuf.mtext[0] = '\0';
					reval = msgsnd(main_msq, &childBuf, sizeof(msg_buf), IPC_NOWAIT);
					if(reval < 0)
					{
						printf("child_loop msgsnd error, errno = %d\n", errno);
					}
					printf("child_loop msgsnd MSG_TYPE_OVER_ACK successfully!\n");
					break;
				default:
					//default
					break;
			}
		}
	}
	return 0;
}

int main_loop(pid_t cpid)
{	
	int msgflag = IPC_NOWAIT;
	struct msg_buf mainBuf;
	int reval, status=0;
	
	mainBuf.mtype = MSG_TYPE_NOTIFY;
	strcpy(mainBuf.mtext, "hello, child!");
	//printf("child_msq = %d\n", child_msq);
	//printf("mainBuf = 0x%p, mtype = %ld, mtext = %s\n", &mainBuf, mainBuf.mtype, mainBuf.mtext);
	//printf("IPC_NOWAIT = %d\n", IPC_NOWAIT);
	for(int i=0; i<10; i++)
	{
		reval = msgsnd(child_msq, &mainBuf, sizeof(msg_buf), IPC_NOWAIT);
		if(reval < 0)
		{
			printf("main_loop msgsnd error, reval = %d, errno = %d\n", reval, errno);
			sleep(1);
		}
		else
		{
			printf("main_loop msgsnd MSG_TYPE_NOTIFY successfully!\n");
			sleep(1);
		}
	}
	
	mainBuf.mtype = MSG_TYPE_OVER;
	mainBuf.mtext[0] = '\0';
	reval = msgsnd(child_msq, &mainBuf, sizeof(msg_buf), IPC_NOWAIT);
	if(reval < 0)
	{
		printf("main_loop msgsnd error, errno = %d\n", errno);
	}

	printf("main_loop msgsnd MSG_TYPE_OVER successfully!\n");
	
	int break_flag = 0;
	while(1)
	{
		reval = msgrcv(main_msq, &mainBuf, sizeof(msg_buf), /*MSG_TYPE_OVER_ACK*/0,  IPC_NOWAIT|MSG_NOERROR);
		if(reval < 0)
		{
			printf("main_loop msgrcv error, errno = %d\n", errno);
			sleep(1);
			continue;
		}
		//printf("msgrcv %ld => %s\n", mainBuf.mtype, mainBuf.mtext);
		//printf("msgrcv MSG_TYPE_OVER_ACK\n");
		switch(mainBuf.mtype)
		{
			case MSG_TYPE_OVER_ACK:
				printf("main_loop msg recv MSG_TYPE_OVER_ACK successfully!\n");
				kill(cpid, SIGKILL);
				wait(&status);
				printf("main_loop: child process is killed\n");
				printf("status = %d\n", status);
				break_flag = 1;
				break;
			default:
				//sleep(1);
				//default
				break;
		}
		if(break_flag == 1)
		{
			break;
		}
		sleep(1);

	}
	return 0;
}
int main(int argc, char* argv[])
{
	pid_t cpid;
	key_t key;
	int gflag, reval;
	// main_msq, child_msq;
	
	key = ftok("./msgqueue1", 1);
	if(key ==-1)
	{
		printf("ftok error\n");
		printf("errno = %d\n", errno);
		exit(EXIT_FAILURE);
	}
	gflag = IPC_CREAT | IPC_EXCL;
	main_msq = msgget(key, gflag|00666);
	if(main_msq < 0)
	{
		printf("msgget error\n");
		printf("errno = %d\n", errno);
		exit(EXIT_FAILURE);
	}
	printf("./msgqueue1 create successfully! msgid = %d\n", main_msq);
	key = ftok("./msgqueue2", 2);
	if(key ==-1)
	{
		printf("ftok error\n");
		printf("errno = %d\n", errno);
		exit(EXIT_FAILURE);
	}
	gflag = IPC_CREAT | IPC_EXCL;
	child_msq = msgget(key, gflag|00666);
	if(child_msq < 0)
	{
		printf("msgget error\n");
		printf("errno = %d\n", errno);
		exit(EXIT_FAILURE);
	}
	printf("./msgqueue2 create successfully! msgid = %d\n", child_msq);
	
	cpid = fork();

	if(cpid < 0)
	{
		printf("fork error\n");
		exit(EXIT_FAILURE);
	}
	
	if(cpid == 0)	/*child process*/
	{
		child_loop();
	}
	else		/*main process*/
	{
		main_loop(cpid);
	}
	
	reval = msgctl(main_msq, IPC_RMID, NULL);
	if(reval < 0)
	{
		printf("main msg remove error\n");
	}
	printf("main msg remove successfully!\n");
	
	reval = msgctl(child_msq, IPC_RMID, NULL);
	if(reval < 0)
	{
		printf("child msg remove error\n");
	}
	printf("child msg remove successfully!\n");

	return 0;
}
           

2012. 8. 23

繼續閱讀