生産者–消費者模型簡述
對于生産者–消費者模型,相信我們都不陌生,因為生活中,我們無時無刻不在扮演生産者或消費者。但是對于Linux中的生産者–消費者模型,大家又了解了一個什麼程度?
其實,說白了就是一個生産,一個消費的關系,而且保證生産者在不生産的時候,消費者無法消費(這跟生活中一樣,隻不過生活中我們是确實消費不到任何東西,但在計算機中,我們就有可能消費到無效資料,、亂資料、過期資料等等);消費者的消費欲望飽和之後,生産者不生産(同上,如果市場中大量湧入某個商品,導緻市場急速飽和,那麼廠家戶很果斷的暫時放棄生産這個商品)。但是生活中的又跟計算機中的有差別,在于:一般情況下,消費者之間并無任何關系,但是計算機中,消費者是存在競争關系的,即A消費者拿走了這個資料,那麼剩餘的B,C,D,E…..等等都無法使用這個資料,因為已經被消費了。生産者之間當然是競争關系了,兩個供銷商為了業績,會競争成為一個大型超市的供貨對象,很相似的是,計算機中,多個生産者都是為了為這塊空間生産資料(寫資料),為了空間的使用權。而生産者與消費者之間的關系為互斥與同步,互斥就是競争關系,因為鄰界資源的通路一次隻能允許一個執行分支通路,是以:如果我在寫(生産),你就不能讀(消費);同步關系表示為:我在寫完之後,就會調用函數給你信号,喚醒你來消費,解決二者之間可能出現的饑餓問題。
生産者–消費者間的關系
是以生産者與消費者之間的關系可以總結如下幾點:
生産者與生産者:互斥關系;
消費者與消費者:互斥關系;
生産者與消費者:互斥與同步關系。
生産者與消費者模型的實作
實作生産者與消費者模型時我們需要用到互斥鎖,與條件變量。
互斥鎖
互斥鎖即為保護鄰界資源的一個保護“裝置”,這個我之前解釋過,跟二進制信号量有類似的地方。當鄰界資源未有執行分支使用時,鎖是開着的,隻要一有執行分支取通路鄰界資源,那麼鎖就會上鎖,向外表明:這塊鄰界資源已經有“人”使用,别“人”來之隻能等待(可以使阻塞和非阻塞),當通路鄰界資源的執行分支操作完畢,告訴互斥鎖,我已經通路完畢,互斥鎖開鎖,向外界表明:此時鄰界資源無“人”使用,可以申請。互斥鎖還會保證高優先級的執行分支不能一直持有鎖,這樣了解:每當一個執行分支操作完畢,就将他的優先級降低,讓他去等待隊列的後面排隊。
相關函數:
#include <pthread.h>
pthread_t lock;//定義一個鎖,如果這個鎖是全局的,就使用宏來初始化,如果是局部的,就使用函數pthread_mutex_init來初始化
int pthread_mutex_lock(pthread_mutex_t *mutex);//上鎖函數
int pthread_mutex_trylock(pthread_mutex_t *mutex);//試上鎖函數,即非阻塞等待
int pthread_mutex_unlock(pthread_mutex_t *mutex);//解鎖函數
條件變量
條件變量是用來描述鄰界資源内部狀态的一種方法,假如我一直在等,那麼我怎麼知道等待什麼時候,條件變量就是用來跟這個的,當鄰界資源中沒有可以用來供目前執行分支通路的時候,那麼我就挂起等待,并且,注意!注意!我還要解開我的鎖,不然我持有鎖并挂起等待,那麼别的執行分支申請不到鎖資源,也會陷入挂起等待,形成上一篇描述的死鎖問題。當鄰界資源中滿足我通路的條件時,你再回來喚醒我,并且此時我再次持有鎖資源,以此保證我的正常操作不被别的執行分支打擾。
相關函數:
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
//計時等待方式如果在給定時刻前條件沒有滿足,則傳回ETIMEDOUT,結束等待
//即目前條件不滿足我的通路條件,是以是目前執行分支挂起等待
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);//同上,沒有計時,直到滿足條件為止
int pthread_cond_broadcast(pthread_cond_t *cond);//喚醒所有的條件變量導緻進入挂起等待的執行分支
int pthread_cond_signal(pthread_cond_t *cond);//喚醒單個
基于單連結清單的生産者–消費者模型
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
typedef struct list{
struct list* next;
int data;
}Node,*PNode;
Node* head;//定義一個全局的單連結清單,是兩個的兩個線程公共資源
pthread_cond_t procon=PTHREAD_COND_INITIALIZER;//定義條件變量并初始化
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//定義互斥鎖并初始化
void initlist(PNode* head)
{
*head=NULL;
}
void* consumer(void* val)
{
int i=;
while(i)
{
Node* tmp=NULL;
pthread_mutex_lock(&lock);//上鎖,保證通路安全
while(NULL == head)//用while原因是pthread_cond_wait可能調用失敗
{
printf("list empty,consunistd.humer just waitting...\n");
pthread_cond_wait(&procon,&lock);//如果此時連結清單為空,即無法通路,是以consumer挂起等待product函數插入節點後通路,
//當product函數插入節點時,會喚醒consumer
}
tmp=head;
head=tmp->next;
tmp->next=NULL;
printf("----------------pthread tid: %u\n",pthread_self());//目前線程的tid
printf("consumer success, total: %d\n",i++);
pthread_mutex_unlock(&lock);//解鎖
free(tmp);
tmp=NULL;
}
return NULL;
}
void* product(void* val)
{
initlist(&head);
int i=;
while(i)
{
pthread_mutex_lock(&lock);
Node* node=(Node*)malloc(sizeof(Node));
node->data=i;
node->next=head;
head=node;
printf("product success,total: %d\n",i++);
pthread_mutex_unlock(&lock);
sleep();
pthread_cond_signal(&procon);//取喚醒consumer
}
return NULL;
}
int main()
{
void *ret;
pthread_t pro,con,con1;
pthread_create(&pro,NULL,product,NULL);
pthread_create(&con,NULL,consumer,NULL);
pthread_create(&con1,NULL,consumer,NULL);
pthread_join(pro,&ret);
pthread_join(con,&ret);
return ;
}
程式結果截圖:
注意事項:
提醒一點,create線程後必須join線程,不然線程不會運作,别問我為什麼。我去哭會/(ㄒoㄒ)/~
基于環形隊列的多線程的生産者–消費者模型
相關函數:
初始化函數,*sem表示信号量,pshared為0時表示可供同一程序中的多個線程同步,value為信号量的數量,即鄰界資源的數量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_wait函數相當于二進制信号量的P操作,給目前信号量表示的鄰界資源的數量減一
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
sem_post函數相當于二進制信号量的V操作,給目前信号量表示的鄰界資源的數量加一
#include <semaphore.h>
int sem_post(sem_t *sem);
sem_destroy函數,銷毀*sem表示的信号量。
#include <semaphore.h>
int sem_destroy(sem_t *sem);
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <semaphore.h>
#define SIZE 10
sem_t product_sem;
sem_t consumer_sem;
static int pro=SIZE;//一開始生産者可以生産
static int con=;//一開始沒有産品可供消費
static int p=;//p,c,d變量均是為了列印定義的
static int c=;
static int d=;
static int table[SIZE];//用數組表示一個環形隊列
void* product_fun(void* val)
{
int i=;
int index=;
while(i)
{
sem_wait(&product_sem);//申請一個可生産的空間,即申請成功表示鄰界資源可寫,否則表示鄰界資源已滿不可寫入。
table[p]=i;
pro--;
con++;
printf("-----------------------------------------\n");
printf("which product:%u\n",pthread_self());//哪個生産者生産的
printf("product# product success,total:%d\n",i++);//一共生産了多少個
printf("profuct# you have %d blank source!\n",pro);//生産者可用的生産空間
printf("product# you have %d data source!\n",con);//消費可供消費的産品的數量
for(index=;index<SIZE;index++)//輸出目前生産的所有産品
{
if(table[index]!=)
printf("array[%d]:%d ",index,table[index]);
}
printf("\n");
printf("-----------------------------------------\n");
printf("\n");
sem_post(&consumer_sem);//給消費者可供消費的産品數量加一,即将鄰界資源中的可使用的資料加一
p=(p+)%SIZE;//下标調整
sleep();
}
return NULL;
}
void* consumer_fun(void* val)
{
int i=;
int temp=;
while(i)
{
sem_wait(&consumer_sem);// 申請一個消費者可消費的産品,即此時鄰界資源中有可使用的資料。
if(c!=)
d=c-;
temp=table[c];
table[c]=;
pro++;
con--;
printf("##########################################\n");
printf("which consumer:%u\n",pthread_self());//哪個消費者消費
printf("consume: %d\n",temp);//消費的資料為temp
printf("consumer# you have %d data source!\n",con);//可供消費的資料量
printf("consumer# you have %d blank source!\n",pro);//可供生産的空間
printf("##########################################\n");
printf("\n");
sem_post(&product_sem);//給鄰界資源可寫的資源加一,表示消費一個資料,拿走資料後,空出一個位置可寫
c=(c+)%SIZE;
}
}
void destroy()
{
sem_destroy(&product_sem);
sem_destroy(&consumer_sem);
exit();
}
void initsem()
{
signal(,destroy);//信号捕捉函數,自定義2号信号量為銷毀兩個信号量
int i=;
for(i=;i<SIZE;++i)
table[i]=;
sem_init(&product_sem,,SIZE);
sem_init(&consumer_sem,,);
}
int main()
{
initsem();
pthread_t product1,product2,consumer1,consumer2;
pthread_create(&product1,NULL,product_fun,NULL);
pthread_create(&product2,NULL,product_fun,NULL);
pthread_create(&consumer1,NULL,consumer_fun,NULL);
pthread_create(&consumer2,NULL,consumer_fun,NULL);
pthread_join(product1,NULL);
pthread_join(product2,NULL);
pthread_join(consumer1,NULL);
pthread_join(consumer2,NULL);
return ;
}
程式結果截圖:
如有錯誤,請指出!