生産者消費者問題:該問題描述了兩個共享固定大小緩沖區的程序——即所謂的“生産者”和“消費者”——在實際運作時會發生的問題。生産者的主要作用是生成一定量的資料放到緩沖區中,然後重複此過程。與此同時,消費者也在緩沖區消耗這些資料。該問題的關鍵就是要保證生産者不會在緩沖區滿時加入資料,消費者也不會在緩沖區中空時消耗資料。
我們可以用信号量解決生産者消費者問題,如下圖:

定義3個信号量,sem_full 和 sem_empty 用于生産者程序和消費者程序之間同步,即緩沖區為空才能生産,緩沖區不為空才能消費。由于共享同一塊緩沖區,在生産一個産品過程中不能消費産品,在消費一個産品的過程中不能生産産品,故再使用一個 sem_mutex 信号量來限制行為,即程序間互斥。
下面基于生産者消費者模型,來實作一個先進先出的共享記憶體段:
如上圖所示,定義兩個結構體,shmhead 是共享記憶體段的頭部,儲存了塊大小,塊數,讀寫索引。shmfifo 儲存了共享記憶體頭部的指針,有效負載的起始位址,建立的共享記憶體段的shmid,以及3個信号量。
下面來封裝幾個函數:
C++ Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | #include "shmfifo.h" #include <assert.h> shmfifo_t *shmfifo_init( int key, int blksize, int blocks) { shmfifo_t *fifo = (shmfifo_t *)malloc( sizeof(shmfifo_t)); assert(fifo != NULL); memset(fifo, 0, int shmid; shmid = shmget(key, 0); int size = sizeof(shmhead_t) + blksize * blocks; if (shmid == - 1) { fifo->shmid = shmget(key, size, IPC_CREAT | 0666); if (fifo->shmid == - ERR_EXIT( "shmget"); fifo->p_shm = (shmhead_t *)shmat(fifo->shmid, NULL, if (fifo->p_shm == (shmhead_t *) - "shmat"); fifo->p_payload = ( char *)(fifo->p_shm + 1); fifo->p_shm->blksize = blksize; fifo->p_shm->blocks = blocks; fifo->p_shm->rd_index = 0; fifo->p_shm->wr_index = fifo->sem_mutex = sem_create(key); fifo->sem_full = sem_create(key + fifo->sem_empty = sem_create(key + 2); sem_setval(fifo->sem_mutex, sem_setval(fifo->sem_full, blocks); sem_setval(fifo->sem_empty, } else fifo->shmid = shmid; fifo->sem_mutex = sem_open(key); fifo->sem_full = sem_open(key + fifo->sem_empty = sem_open(key + return fifo; } void shmfifo_put(shmfifo_t *fifo, const void *buf) sem_p(fifo->sem_full); sem_p(fifo->sem_mutex); memcpy(fifo->p_payload + fifo->p_shm->blksize * fifo->p_shm->wr_index, buf, fifo->p_shm->blksize); fifo->p_shm->wr_index = (fifo->p_shm->wr_index + 1) % fifo->p_shm->blocks; sem_v(fifo->sem_mutex); sem_v(fifo->sem_empty); void shmfifo_get(shmfifo_t *fifo, sem_p(fifo->sem_empty); memcpy(buf, fifo->p_payload + fifo->p_shm->blksize * fifo->p_shm->rd_index, fifo->p_shm->blksize); fifo->p_shm->rd_index = (fifo->p_shm->rd_index + sem_v(fifo->sem_full); void shmfifo_destroy(shmfifo_t *fifo) sem_d(fifo->sem_mutex); sem_d(fifo->sem_full); sem_d(fifo->sem_empty); shmdt(fifo->p_shm); shmctl(fifo->shmid, IPC_RMID, free(fifo); |
1、shmfifo_init:先配置設定shmfifo 結構體的記憶體,如果嘗試打開共享記憶體失敗則建立,建立的共享記憶體段大小 = shmhead大小 + 塊大小×塊數目,然後shmat将此共享記憶體段映射到程序位址空間,然後使用sem_create 建立3個信号量集,每個信号集隻有一個信号量,即上面提到的3個信号量,設定每個信号量的資源初始值。如果共享記憶體已經存在,則直接sem_open 打開即可。sem_xxx 系列封裝函數參考這裡。
2、shmfifo_put:參照第一個生産者消費者的圖,除去sem_p,sem_v 操作之外,中間就将buf 的内容memcpy 到對應緩沖區塊,然後移動wr_index。
3、shmfifo_get:與shmfifo_put 類似,執行的是相反的操作。
4、shmfifo_destroy:删除3個信号量集,将共享記憶體段從程序位址空間剝離,删除共享記憶體段,釋放shmfifo 結構體的記憶體。
下面是生産者程式和消費者程式:
shmfifo_send.c
typedef struct stu char name[ 32]; int age; } STU; int main( void) shmfifo_t *fifo = shmfifo_init( 1234, sizeof(STU), 3); STU s; memset(&s, sizeof(STU)); s.name[ 0] = 'A'; int i; for (i = 0; i < 5; i++) s.age = 20 + i; shmfifo_put(fifo, &s); s.name[ 0] = s.name[ 0] + 1; printf( "send ok\n"); return |
shmfifo_recv.c
shmfifo_get(fifo, &s); "name = %s age = %d\n", s.name, s.age); shmfifo_destroy(fifo); |
先運作生産者程序,輸出如下:
simba@ubuntu:~/Documents/code/linux_programming/UNP/system_v/shmfifo$ ./shmfifo_send
send ok
因為共享記憶體隻有3塊block,故發送了3次後再次P(semfull)就阻塞了,等待消費者讀取資料,現在運作消費者程序
simba@ubuntu:~/Documents/code/linux_programming/UNP/system_v/shmfifo$ ./shmfifo_recv
name = A age = 20
name = B age = 21
name = C age = 22
name = D age = 23
name = E age = 24
因為生産者已經建立了一塊共享記憶體,故消費者隻是打開而已,當讀取了第一塊資料之後,生産者會再次寫入,依次輸出後兩個 send ok,可以推論的是D是重新寫到共享記憶體開始的第一塊,E是第二塊,類似環形隊列。
從輸出可以看出,的确實作了資料的先進先出。
參考:《UNP》