天天看點

生産者消費者模型(條件變量)賣蘋果的模型生産者消費者完整代碼

三種關系:互斥,同步,互斥和同步

兩類角色:生産者,消費者(線程)

一個交易場所:生産者消費者共享的區域

賣蘋果的模型

  • dish上面隻有一個蘋果
  • 買家必須要等賣家把蘋果放到dish上才可以去買蘋果。
  • 賣家必須等待買家把蘋果買走才可以生産蘋果
  • pthread_mutex_lock(&mutex); 和pthread_mutex_unlock(&mutex); 成對出現,裡面的操作為一個原子操作
  • pthread_cond_wait(&empty,&mutex);内部有一個加鎖解鎖操作
  • 過程詳解
    • 蘋果為0
    • 初始化時蘋果為0,買家在pthread_cond_wait(&empty,&mutex); 開始等待,賣家不進入while循環,生産蘋果後發出pthread_cond_signal(&empty);讓正在因dish上蘋果為0的買家停止等待pthread_cond_wait(&empty,&mutex);
    • 蘋果為1
    • 當一個賣家進入pthread_mutex_lock(&mutex); 但是發現蘋果為1時,就在while循環處等待pthread_cond_wait(&empty,&mutex);,直到買家發出信号 pthread_cond_signal(&full);賣家停止等待,此時蘋果為0,跳出while循環,開始生産蘋果。生産蘋果後發出pthread_cond_signal(&empty);讓正在因dish上蘋果為0的買家停止等待pthread_cond_wait(&empty,&mutex); 這樣這個過程就結束了

      *

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

int dish = ;
pthread_cond_t full;
pthread_cond_t empty;
pthread_mutex_t mutex;


void * consumer(void * arg)
{
    int id = (int)arg;
    while()
    {
        pthread_mutex_lock(&mutex);//上鎖
        while(dish == )
        {
            //買家發現沒有蘋果開始等待
            pthread_cond_wait(&empty,&mutex);
        }
    usleep();

    dish = ;
    printf("consumer %d get an apple!!\n", id);
    //給賣家發送信号
    pthread_cond_signal(&full);
    pthread_mutex_unlock(&mutex);//解鎖

    }
    return NULL;
}




void * product(void *arg)
{
    int id = (int)arg;
    while()
    {
        pthread_mutex_lock(&mutex);    
        while(dish == )
            {
                pthread_cond_wait(&full,&mutex);
            }
            dish =;
             printf("producer %d put an apple!!\n", id);      
        pthread_cond_signal(&empty);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}


int main()
{
    pthread_t tid;
    int set ;
    //初始化
    pthread_cond_init(&full,NULL);
    pthread_cond_init(&empty,NULL);
    pthread_mutex_init(&mutex,NULL);

    //四個賣家生産蘋果
    for(int i = ;i<;i++)
    {
        pthread_create(&tid,NULL,product,(void *)i);
    }
    //四個買家買蘋果
    for(int i = ;i<;i++)
    {
        pthread_create(&tid,NULL,consumer,(void *)i);
    }

    //等待thread辨別的線程終止,防止僵屍程序産生
    pthread_join(tid,NULL);

    //銷毀所有資源
    pthread_cond_destroy(&full);
    pthread_cond_destroy(&empty);
    pthread_mutex_destroy(&mutex);
}

           

如果對于互斥量含義尚不清楚,建議閱讀Linux/UNIX系統程式設計手冊

pthread_mutex_t
pthread_mutex_init
pthread_mutex_lock
pthread_mutex_unlock
pthread_mutex_destroy
           

模型采取Linux/UNIX系統程式設計手冊

這個模型很好,也把概念講清楚了,加了中文注釋

生産者

s = pthread_mutex_lock(&mtx);
if (s != )
    errExitEN(s, "pthread_mutex_lock");

avail++;        /*這是一個原子操作 */

s = pthread_mutex_unlock(&mtx);
if (s != )
    errExitEN(s, "pthread_mutex_unlock");

s = pthread_cond_signal(&cond);         /* 喚醒消費者 */
if (s != )
    errExitEN(s, "pthread_cond_signal");
           

消費者

s = pthread_mutex_lock(&mtx);
if (s != )
    errExitEN(s, "pthread_mutex_lock");

while (avail == )
 {           
    s = pthread_cond_wait(&cond, &mtx);
    if (s != )
        errExitEN(s, "pthread_cond_wait");
}
           

完整代碼

#include <time.h>
#include <pthread.h>
#include "tlpi_hdr.h"

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static int avail = ;

static void *
threadFunc(void *arg)
{
    int cnt = atoi((char *) arg);
    int s, j;

    for (j = ; j < cnt; j++) {
        sleep();

        /* Code to produce a unit omitted */

        s = pthread_mutex_lock(&mtx);
        if (s != )
            errExitEN(s, "pthread_mutex_lock");

        avail++;        /* Let consumer know another unit is available */

        s = pthread_mutex_unlock(&mtx);
        if (s != )
            errExitEN(s, "pthread_mutex_unlock");

        s = pthread_cond_signal(&cond);         /* Wake sleeping consumer */
        if (s != )
            errExitEN(s, "pthread_cond_signal");
    }

    return NULL;
}

int
main(int argc, char *argv[])
{
    pthread_t tid;
    int s, j;
    int totRequired;            /* Total number of units that all threads
                                   will produce */
    int numConsumed;            /* Total units so far consumed */
    Boolean done;
    time_t t;

    t = time(NULL);

    /* Create all threads */

    totRequired = ;
    for (j = ; j < argc; j++) {
        totRequired += atoi(argv[j]);

        s = pthread_create(&tid, NULL, threadFunc, argv[j]);
        if (s != )
            errExitEN(s, "pthread_create");
    }

    /* Loop to consume available units */

    numConsumed = ;
    done = FALSE;

    for (;;) {
        s = pthread_mutex_lock(&mtx);
        if (s != )
            errExitEN(s, "pthread_mutex_lock");

        while (avail == ) {            /* Wait for something to consume */
            s = pthread_cond_wait(&cond, &mtx);
            if (s != )
                errExitEN(s, "pthread_cond_wait");
        }

        /* At this point, 'mtx' is locked... */

        while (avail > ) {             /* Consume all available units */

            /* Do something with produced unit */

            numConsumed ++;
            avail--;
            printf("T=%ld: numConsumed=%d\n", (long) (time(NULL) - t),
                    numConsumed);

            done = numConsumed >= totRequired;
        }

        s = pthread_mutex_unlock(&mtx);
        if (s != )
            errExitEN(s, "pthread_mutex_unlock");

        if (done)
            break;

        /* Perhaps do other work here that does not require mutex lock */

    }

    exit(EXIT_SUCCESS);
}
           

繼續閱讀