天天看點

Linux基于單連結清單&環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作

生産者–消費者模型簡述

  對于生産者–消費者模型,相信我們都不陌生,因為生活中,我們無時無刻不在扮演生産者或消費者。但是對于Linux中的生産者–消費者模型,大家又了解了一個什麼程度?

  其實,說白了就是一個生産,一個消費的關系,而且保證生産者在不生産的時候,消費者無法消費(這跟生活中一樣,隻不過生活中我們是确實消費不到任何東西,但在計算機中,我們就有可能消費到無效資料,、亂資料、過期資料等等);消費者的消費欲望飽和之後,生産者不生産(同上,如果市場中大量湧入某個商品,導緻市場急速飽和,那麼廠家戶很果斷的暫時放棄生産這個商品)。但是生活中的又跟計算機中的有差別,在于:一般情況下,消費者之間并無任何關系,但是計算機中,消費者是存在競争關系的,即A消費者拿走了這個資料,那麼剩餘的B,C,D,E…..等等都無法使用這個資料,因為已經被消費了。生産者之間當然是競争關系了,兩個供銷商為了業績,會競争成為一個大型超市的供貨對象,很相似的是,計算機中,多個生産者都是為了為這塊空間生産資料(寫資料),為了空間的使用權。而生産者與消費者之間的關系為互斥與同步,互斥就是競争關系,因為鄰界資源的通路一次隻能允許一個執行分支通路,是以:如果我在寫(生産),你就不能讀(消費);同步關系表示為:我在寫完之後,就會調用函數給你信号,喚醒你來消費,解決二者之間可能出現的饑餓問題。

  

生産者–消費者間的關系

  是以生産者與消費者之間的關系可以總結如下幾點:

  生産者與生産者:互斥關系;

  消費者與消費者:互斥關系;

  生産者與消費者:互斥與同步關系。

生産者與消費者模型的實作

  實作生産者與消費者模型時我們需要用到互斥鎖,與條件變量。

互斥鎖

   互斥鎖即為保護鄰界資源的一個保護“裝置”,這個我之前解釋過,跟二進制信号量有類似的地方。當鄰界資源未有執行分支使用時,鎖是開着的,隻要一有執行分支取通路鄰界資源,那麼鎖就會上鎖,向外表明:這塊鄰界資源已經有“人”使用,别“人”來之隻能等待(可以使阻塞和非阻塞),當通路鄰界資源的執行分支操作完畢,告訴互斥鎖,我已經通路完畢,互斥鎖開鎖,向外界表明:此時鄰界資源無“人”使用,可以申請。互斥鎖還會保證高優先級的執行分支不能一直持有鎖,這樣了解:每當一個執行分支操作完畢,就将他的優先級降低,讓他去等待隊列的後面排隊。

  

   相關函數:

  

#include <pthread.h>
       pthread_t lock;//定義一個鎖,如果這個鎖是全局的,就使用宏來初始化,如果是局部的,就使用函數pthread_mutex_init來初始化

       int pthread_mutex_lock(pthread_mutex_t *mutex);//上鎖函數
       int pthread_mutex_trylock(pthread_mutex_t *mutex);//試上鎖函數,即非阻塞等待
       int pthread_mutex_unlock(pthread_mutex_t *mutex);//解鎖函數

           

條件變量

  條件變量是用來描述鄰界資源内部狀态的一種方法,假如我一直在等,那麼我怎麼知道等待什麼時候,條件變量就是用來跟這個的,當鄰界資源中沒有可以用來供目前執行分支通路的時候,那麼我就挂起等待,并且,注意!注意!我還要解開我的鎖,不然我持有鎖并挂起等待,那麼别的執行分支申請不到鎖資源,也會陷入挂起等待,形成上一篇描述的死鎖問題。當鄰界資源中滿足我通路的條件時,你再回來喚醒我,并且此時我再次持有鎖資源,以此保證我的正常操作不被别的執行分支打擾。

  相關函數:

  

#include <pthread.h>

       int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, 
       const struct timespec *restrict abstime);
       //計時等待方式如果在給定時刻前條件沒有滿足,則傳回ETIMEDOUT,結束等待
       //即目前條件不滿足我的通路條件,是以是目前執行分支挂起等待
       int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);//同上,沒有計時,直到滿足條件為止

       int pthread_cond_broadcast(pthread_cond_t *cond);//喚醒所有的條件變量導緻進入挂起等待的執行分支
       int pthread_cond_signal(pthread_cond_t *cond);//喚醒單個
           

基于單連結清單的生産者–消費者模型

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

typedef struct list{
    struct list* next;
    int data;
}Node,*PNode;

Node* head;//定義一個全局的單連結清單,是兩個的兩個線程公共資源
pthread_cond_t procon=PTHREAD_COND_INITIALIZER;//定義條件變量并初始化
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//定義互斥鎖并初始化

void initlist(PNode* head)
{
    *head=NULL;
}

void* consumer(void* val)
{
    int i=;
    while(i)
    {
        Node* tmp=NULL;
        pthread_mutex_lock(&lock);//上鎖,保證通路安全
        while(NULL == head)//用while原因是pthread_cond_wait可能調用失敗
        {
            printf("list empty,consunistd.humer just waitting...\n");
            pthread_cond_wait(&procon,&lock);//如果此時連結清單為空,即無法通路,是以consumer挂起等待product函數插入節點後通路,
                                             //當product函數插入節點時,會喚醒consumer
        }
        tmp=head;
        head=tmp->next;
        tmp->next=NULL;
        printf("----------------pthread tid: %u\n",pthread_self());//目前線程的tid
        printf("consumer success, total: %d\n",i++);
        pthread_mutex_unlock(&lock);//解鎖
        free(tmp);
        tmp=NULL;
    }
    return NULL;
}

void* product(void* val)
{
    initlist(&head);
    int i=;
    while(i)
    {
        pthread_mutex_lock(&lock);
        Node* node=(Node*)malloc(sizeof(Node));
        node->data=i;
        node->next=head;
        head=node;
        printf("product success,total: %d\n",i++);
        pthread_mutex_unlock(&lock);
        sleep();
        pthread_cond_signal(&procon);//取喚醒consumer
    }
    return NULL;
}
int main()
{
    void *ret;
    pthread_t pro,con,con1;
    pthread_create(&pro,NULL,product,NULL);
    pthread_create(&con,NULL,consumer,NULL);
    pthread_create(&con1,NULL,consumer,NULL);

    pthread_join(pro,&ret);
    pthread_join(con,&ret);
    return ;
}
           

程式結果截圖:

Linux基于單連結清單&amp;環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作
Linux基于單連結清單&amp;環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作

注意事項:

  提醒一點,create線程後必須join線程,不然線程不會運作,别問我為什麼。我去哭會/(ㄒoㄒ)/~

基于環形隊列的多線程的生産者–消費者模型

相關函數:

  初始化函數,*sem表示信号量,pshared為0時表示可供同一程序中的多個線程同步,value為信号量的數量,即鄰界資源的數量

#include <semaphore.h>

       int sem_init(sem_t *sem, int pshared, unsigned int value);
           

  sem_wait函數相當于二進制信号量的P操作,給目前信号量表示的鄰界資源的數量減一

#include <semaphore.h>

       int sem_wait(sem_t *sem);

       int sem_trywait(sem_t *sem);

       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
           

  sem_post函數相當于二進制信号量的V操作,給目前信号量表示的鄰界資源的數量加一

#include <semaphore.h>

       int sem_post(sem_t *sem);
           

  sem_destroy函數,銷毀*sem表示的信号量。

#include <semaphore.h>

       int sem_destroy(sem_t *sem);
           
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <semaphore.h>
#define SIZE 10

sem_t product_sem;
sem_t consumer_sem;
static int pro=SIZE;//一開始生産者可以生産
static int con=;//一開始沒有産品可供消費
static int p=;//p,c,d變量均是為了列印定義的
static int c=;
static int d=;
static int table[SIZE];//用數組表示一個環形隊列

void* product_fun(void* val)
{
    int i=;
    int index=;
    while(i)
    {
        sem_wait(&product_sem);//申請一個可生産的空間,即申請成功表示鄰界資源可寫,否則表示鄰界資源已滿不可寫入。

        table[p]=i;
        pro--;
        con++;
        printf("-----------------------------------------\n");
        printf("which product:%u\n",pthread_self());//哪個生産者生産的
        printf("product#  product success,total:%d\n",i++);//一共生産了多少個
        printf("profuct#  you have %d blank source!\n",pro);//生産者可用的生産空間
        printf("product#  you have %d data source!\n",con);//消費可供消費的産品的數量
        for(index=;index<SIZE;index++)//輸出目前生産的所有産品
        {
            if(table[index]!=)
                printf("array[%d]:%d ",index,table[index]);
        }
        printf("\n");
        printf("-----------------------------------------\n");
        printf("\n");

        sem_post(&consumer_sem);//給消費者可供消費的産品數量加一,即将鄰界資源中的可使用的資料加一
        p=(p+)%SIZE;//下标調整
        sleep();
    }
    return NULL;
}

void* consumer_fun(void* val)
{
    int i=;
    int temp=;
    while(i)
    {
        sem_wait(&consumer_sem);// 申請一個消費者可消費的産品,即此時鄰界資源中有可使用的資料。

        if(c!=)
            d=c-;
        temp=table[c];
        table[c]=;
        pro++;
        con--;
        printf("##########################################\n");
        printf("which consumer:%u\n",pthread_self());//哪個消費者消費
        printf("consume: %d\n",temp);//消費的資料為temp
        printf("consumer#  you have %d data source!\n",con);//可供消費的資料量
        printf("consumer#  you have %d blank source!\n",pro);//可供生産的空間
        printf("##########################################\n");
        printf("\n");

        sem_post(&product_sem);//給鄰界資源可寫的資源加一,表示消費一個資料,拿走資料後,空出一個位置可寫
        c=(c+)%SIZE;
    }
}

void destroy()
{
    sem_destroy(&product_sem);
    sem_destroy(&consumer_sem);
    exit();
}

void initsem()
{
    signal(,destroy);//信号捕捉函數,自定義2号信号量為銷毀兩個信号量
    int i=;
    for(i=;i<SIZE;++i)
        table[i]=;

    sem_init(&product_sem,,SIZE);
    sem_init(&consumer_sem,,);
}



int main()
{
    initsem();
    pthread_t product1,product2,consumer1,consumer2;
    pthread_create(&product1,NULL,product_fun,NULL);
    pthread_create(&product2,NULL,product_fun,NULL);
    pthread_create(&consumer1,NULL,consumer_fun,NULL);
    pthread_create(&consumer2,NULL,consumer_fun,NULL);
    pthread_join(product1,NULL);
    pthread_join(product2,NULL);
    pthread_join(consumer1,NULL);
    pthread_join(consumer2,NULL);
    return ;
}
           

程式結果截圖:

Linux基于單連結清單&amp;環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作
Linux基于單連結清單&amp;環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作
Linux基于單連結清單&amp;環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作
Linux基于單連結清單&amp;環形隊列的多線程生産者消費者模型生産者–消費者模型簡述生産者–消費者間的關系生産者與消費者模型的實作

如有錯誤,請指出!

繼續閱讀