問題描述:
生産者-消費者問題,也叫做緩存綁定問題(bounded-buffer),是一個多程序同步問題。
- 即有兩個程序:制造少和消費者,共享一個固定大小的緩存
- 制造商的工作是制造一段資料,放進緩存,如此重複。
- 消費者一次消費一段資料,從緩存中取出。
- 要保證不讓制造商在緩存還是滿的時候仍要向内寫資料,不讓消費者試圖從空的緩存中取出資料。
問題分析:
- 要避免多個生産商競争一個空位的情況。
- 要避免生産商和消費者同時睡覺,造成死鎖
- 要避免多個消費者競争同一段資料的情況
基本思路:
- 首先我們需要定義三個信号量:
- empty:用來表示空位的數量,初值為n(緩存區的規模)
- full:用來表示剩餘商品的數量,初值為0
- mutex:用來表示互斥量,同一時間共享的記憶體隻能由一個程序通路。
- 首先考慮生産商,在多個生産商和多個消費者同時運作的情況下:
- 1.檢查共享記憶體是否滿
- 如果滿,說明無法急需生産,那麼睡覺。
- 否則,p(empty),程序繼續
- 2.每一個生産商在執行操作前申請互斥鎖
- 如果被其他程序上鎖,說明目前共享記憶體區域被其他程序使用,那麼進入等待隊列等待互斥鎖被釋放
- 否則,對互斥鎖上鎖,程序繼續
- 3.生産一個新的商品放入空位,并且釋放互斥鎖,v(full)
- 1.檢查共享記憶體是否滿
- 然後考慮消費者,在多個消費者和多個生産商同時存在的情況下:
- 1.檢查共享的記憶體中資料是否為空
- 如果空,說明無法消費,那麼睡覺
- 否則,p(full),程序繼續
- 2.在執行操作前申請互斥鎖
- 如果被其他程序上鎖,說明目前共享記憶體區域被其他程序使用,那麼進入等待隊列等待互斥鎖被釋放
- 否則,對互斥鎖上鎖,程序繼續
- 3.消費一個商品,并且釋放互斥鎖,v(empty)
- 1.檢查共享的記憶體中資料是否為空
注釋完整的實作方案(C語言)
- init.c : 對共享記憶體的資料進行初始化和清理工作
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include "ipc.h"
int main ( int argc , char * argv[] )
{
int i,producer_pid,consumer_pid,item,shmid;
semaphore mutex,empty,full;//定義互斥量mutex,定義信号量empty,full;
union semun sem_union;
void* shared_memory = (void*);
struct shared_use_st * shared_stuff;
/*
int semget ( key_t key , int num_sems , int sem_flags );
1.第一個參數key是一個用來允許不相關的程序通路相同信号量的整數值,
所有的信号量信号量是為不同的程式提供一個key來簡介通路的,
對于每一個信号量系統生成一個信号量辨別符。符号量鍵值隻可以由semget
獲得,所有其他的信号量函數所用的信号量辨別符都是由semget所傳回的。
2.num_sems參數是所需要的信号量數目,這個值通常是1
3.sem_flags參數是一個标記集合
IPC_CREAT與IPC_EXCL的組合來保證我們可以的到一個新的唯一的信号量
如果得不到會報錯
4.傳回值如果是一個整數,這是用于其他信号量函數的辨別符,
如果失敗,則會傳回-1
*/
if ( ( mutex = semget ( (key_t) KEY_MUTEX , , |IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if (( empty = semget( (key_t) KEY_EMPTY , , |IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if (( full = semget((key_t) KEY_FULL , , |IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shmid = shmget ((key_t)KEY_SHM,sizeof (struct shared_use_st),|IPC_CREAT))==-)
{
fprintf ( stderr , "Failerd to create shared memory!");
exit(EXIT_FAILURE);
}
sem_union.val = ;
if ( semctl(mutex,,SETVAL,sem_union) == - )
{
fprintf ( stderr , "Failed to set semaphore!");
exit(EXIT_FAILURE);
}
sem_union.val = ;
if ( semctl(full,,SETVAL,sem_union) == - )
{
fprintf ( stderr , "Failed to set semaphore!");
exit(EXIT_FAILURE);
}
sem_union.val = BUFFER_SIZE;
if ( semctl(empty, , SETVAL , sem_union) == - )
{
fprintf ( stderr , "Failed to set semaphore!");
exit(EXIT_FAILURE);
}
/*
void* shmat ( int shmid , const void* shmaddr , int shmflg )
連結共享記憶體辨別符為shmid的共享記憶體,連結成功後把共享記憶體區對象
映射到調用程序的位址空間,随後可像本地空間一樣通路。
shmid 共享記憶體辨別符
shmaddr 指定共享記憶體出現在程序記憶體位址的什麼位置,直接指定為NULL讓核心自己決定
一個合适的位址位置.
SHM_RDONLY 為隻讀模式,其他為讀寫模式
*/
if (( shared_memory = shmat( shmid ,(void*) , )) == (void*)- )
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = ( struct shared_use_st * ) shared_memory;
for ( i = ; i < BUFFER_SIZE ; i ++ )
{
shared_stuff->buffer[i] = ;
}
shared_stuff->low = ;
shared_stuff->high = ;
shared_stuff->cur = ;
exit(EXIT_SUCCESS);
}
/*
int shmdt ( const void* shmaddr )
與shmat相反,是用來斷開與共享記憶體附加點的位址,進制本程序通路瓷片共享記憶體
shmaddr:連結共享記憶體的起始位址
成功傳回0,除錯傳回-1
*/
- producer.c: 生産商的程序代碼
#include "ipc.h"
int main ( int argc , char * argv[] )
{
int i,item,shmid;
semaphore mutex,empty,full;
union semun sem_union;
void* shared_memory = (void*);
struct shared_use_st *shared_stuff;
if ((mutex = semget((key_t)KEY_MUTEX,,|IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((empty = semget((key_t)KEY_EMPTY,,|IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((full = semget((key_t)KEY_FULL,,|IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shmid=shmget((key_t)KEY_SHM,sizeof(struct shared_use_st),|IPC_CREAT)) == - )
{
fprintf ( stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shared_memory = shmat(shmid,(void*),)) == (void*)-)
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = ( struct shared_use_st * ) shared_memory;
for ( i = ; i < ; i++ )
{
item = ++(shared_stuff->cur);
sleep();
printf ( "Producing item %d\n" , item );
sem_p ( empty );//減少一個空位
sem_p ( mutex );//鎖上互斥鎖
(shared_stuff->buffer)[(shared_stuff->high)] = item;
(shared_stuff->high) = ((shared_stuff->high)+) % BUFFER_SIZE;
printf ( "Inserting item %d\n" , item );
sem_v ( mutex );
sem_v ( full );
}
if ( shmdt(shared_memory) == - )
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
printf ( "Finish!\n" );
getchar();
exit(EXIT_SUCCESS);
}
- consumer.c: 消費者的程序代碼
#include "ipc.h"
int main ( int argc , char* argv[] )
{
int i,item,shmid;
semaphore mutex,empty,full;
void* shared_memory = ( void* ) ;
struct shared_use_st* shared_stuff;
if ((mutex=semget((key_t)KEY_MUTEX,,|IPC_CREAT)) == - )
{
fprintf (stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((empty=semget((key_t)KEY_EMPTY,,|IPC_CREAT)) == - )
{
fprintf (stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((full=semget((key_t)KEY_FULL,,|IPC_CREAT)) == - )
{
fprintf (stderr , "Failed to create semaphore!");
exit(EXIT_FAILURE);
}
if ((shared_memory = shmat(shmid,(void*),) ) ==(void*)- )
{
fprintf ( stderr , "shmat failed\n");
exit(EXIT_FAILURE);
}
shared_stuff = (struct shared_use_st*)shared_memory;
for(i=;i<;i++)
{
sem_p(full);
sem_p(mutex);
item = shared_stuff->buffer[shared_stuff->low];
(shared_stuff->buffer)[(shared_stuff->low)]=;
shared_stuff->low = ((shared_stuff->low)+)%BUFFER_SIZE;
printf ( "Removing item %d\n" , item );
sem_v(mutex);
sem_v(empty);
printf ( "Consuming item %d\n" ,item );
sleep();
}
if ( shmdt(shared_memory) == - )
{
fprintf ( stderr , "shmat failed\n" );
exit ( EXIT_FAILURE);
}
printf ( "Finish!\n" );
getchar();
exit(EXIT_SUCCESS);
}