天天看點

程序間同步互斥經典問題(一)生産者-消費者問題

問題描述:

生産者-消費者問題,也叫做緩存綁定問題(bounded-buffer),是一個多程序同步問題。

  • 即有兩個程序:制造少和消費者,共享一個固定大小的緩存
  • 制造商的工作是制造一段資料,放進緩存,如此重複。
  • 消費者一次消費一段資料,從緩存中取出。
  • 要保證不讓制造商在緩存還是滿的時候仍要向内寫資料,不讓消費者試圖從空的緩存中取出資料。

問題分析:

  1. 要避免多個生産商競争一個空位的情況。
  2. 要避免生産商和消費者同時睡覺,造成死鎖
  3. 要避免多個消費者競争同一段資料的情況

基本思路:

  • 首先我們需要定義三個信号量:
    • empty:用來表示空位的數量,初值為n(緩存區的規模)
    • full:用來表示剩餘商品的數量,初值為0
    • mutex:用來表示互斥量,同一時間共享的記憶體隻能由一個程序通路。
  • 首先考慮生産商,在多個生産商和多個消費者同時運作的情況下:
    • 1.檢查共享記憶體是否滿
      • 如果滿,說明無法急需生産,那麼睡覺。
      • 否則,p(empty),程序繼續
    • 2.每一個生産商在執行操作前申請互斥鎖
      • 如果被其他程序上鎖,說明目前共享記憶體區域被其他程序使用,那麼進入等待隊列等待互斥鎖被釋放
      • 否則,對互斥鎖上鎖,程序繼續
    • 3.生産一個新的商品放入空位,并且釋放互斥鎖,v(full)
  • 然後考慮消費者,在多個消費者和多個生産商同時存在的情況下:
    • 1.檢查共享的記憶體中資料是否為空
      • 如果空,說明無法消費,那麼睡覺
      • 否則,p(full),程序繼續
    • 2.在執行操作前申請互斥鎖
      • 如果被其他程序上鎖,說明目前共享記憶體區域被其他程序使用,那麼進入等待隊列等待互斥鎖被釋放
      • 否則,對互斥鎖上鎖,程序繼續
    • 3.消費一個商品,并且釋放互斥鎖,v(empty)

注釋完整的實作方案(C語言)

  • init.c : 對共享記憶體的資料進行初始化和清理工作
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/msg.h>
#include "ipc.h"

int main ( int argc , char * argv[] )
{
    int i,producer_pid,consumer_pid,item,shmid;
    semaphore mutex,empty,full;//定義互斥量mutex,定義信号量empty,full;
    union semun sem_union;
    void* shared_memory = (void*);
    struct shared_use_st * shared_stuff;
    /*
        int semget ( key_t key , int num_sems , int sem_flags );
        1.第一個參數key是一個用來允許不相關的程序通路相同信号量的整數值,
        所有的信号量信号量是為不同的程式提供一個key來簡介通路的,
        對于每一個信号量系統生成一個信号量辨別符。符号量鍵值隻可以由semget
        獲得,所有其他的信号量函數所用的信号量辨別符都是由semget所傳回的。

        2.num_sems參數是所需要的信号量數目,這個值通常是1

        3.sem_flags參數是一個标記集合
        IPC_CREAT與IPC_EXCL的組合來保證我們可以的到一個新的唯一的信号量
        如果得不到會報錯

        4.傳回值如果是一個整數,這是用于其他信号量函數的辨別符,
        如果失敗,則會傳回-1
    */
        if ( ( mutex = semget ( (key_t) KEY_MUTEX ,  , |IPC_CREAT)) == - )
        {
            fprintf ( stderr , "Failed to create semaphore!");
            exit(EXIT_FAILURE);
        }
        if (( empty = semget( (key_t) KEY_EMPTY ,  , |IPC_CREAT)) == - )
        {
            fprintf ( stderr , "Failed to create semaphore!");
            exit(EXIT_FAILURE);
        }
        if (( full = semget((key_t) KEY_FULL ,  , |IPC_CREAT)) == - )
        {
            fprintf ( stderr , "Failed to create semaphore!");
            exit(EXIT_FAILURE);
        }
        if ((shmid = shmget ((key_t)KEY_SHM,sizeof (struct shared_use_st),|IPC_CREAT))==-)
        {
            fprintf ( stderr , "Failerd to create shared memory!");
            exit(EXIT_FAILURE);           
        }

        sem_union.val = ;
        if ( semctl(mutex,,SETVAL,sem_union) == - )
        {
            fprintf ( stderr , "Failed to set semaphore!");
            exit(EXIT_FAILURE);
        }

        sem_union.val = ;
        if ( semctl(full,,SETVAL,sem_union) == - )
        {
            fprintf ( stderr , "Failed to set semaphore!");
            exit(EXIT_FAILURE);
        }


        sem_union.val = BUFFER_SIZE;
        if ( semctl(empty,  , SETVAL , sem_union) == - )
        {
            fprintf ( stderr , "Failed to set semaphore!");
            exit(EXIT_FAILURE);
        }
        /*
            void* shmat ( int shmid , const void* shmaddr , int shmflg )
            連結共享記憶體辨別符為shmid的共享記憶體,連結成功後把共享記憶體區對象
            映射到調用程序的位址空間,随後可像本地空間一樣通路。
            shmid  共享記憶體辨別符
            shmaddr 指定共享記憶體出現在程序記憶體位址的什麼位置,直接指定為NULL讓核心自己決定
            一個合适的位址位置.
            SHM_RDONLY 為隻讀模式,其他為讀寫模式
        */
        if (( shared_memory = shmat( shmid ,(void*) , )) == (void*)- )
        {
            fprintf ( stderr , "shmat failed\n");
            exit(EXIT_FAILURE);
        }

        shared_stuff = ( struct shared_use_st * ) shared_memory;

        for ( i = ; i < BUFFER_SIZE ; i ++ )
        {
            shared_stuff->buffer[i] = ;
        }
        shared_stuff->low = ;
        shared_stuff->high = ;
        shared_stuff->cur = ;

        exit(EXIT_SUCCESS);
}

    /*
       int shmdt ( const void* shmaddr )
       與shmat相反,是用來斷開與共享記憶體附加點的位址,進制本程序通路瓷片共享記憶體
       shmaddr:連結共享記憶體的起始位址
       成功傳回0,除錯傳回-1
    */
           
  • producer.c: 生産商的程序代碼
#include "ipc.h"


int main ( int argc , char * argv[] )
{
    int i,item,shmid;
    semaphore  mutex,empty,full;
    union semun sem_union;
    void* shared_memory = (void*);
    struct shared_use_st *shared_stuff;

    if ((mutex = semget((key_t)KEY_MUTEX,,|IPC_CREAT)) == - )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((empty = semget((key_t)KEY_EMPTY,,|IPC_CREAT)) == - )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((full = semget((key_t)KEY_FULL,,|IPC_CREAT)) == - )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((shmid=shmget((key_t)KEY_SHM,sizeof(struct shared_use_st),|IPC_CREAT)) == - )
    {
        fprintf ( stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((shared_memory = shmat(shmid,(void*),)) == (void*)-)
    {
        fprintf ( stderr , "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    shared_stuff = ( struct shared_use_st * ) shared_memory;
    for ( i =  ; i <  ; i++ )
    {
        item = ++(shared_stuff->cur);
        sleep();
        printf ( "Producing item %d\n" , item );
        sem_p ( empty );//減少一個空位
        sem_p ( mutex );//鎖上互斥鎖
        (shared_stuff->buffer)[(shared_stuff->high)] = item;
        (shared_stuff->high) = ((shared_stuff->high)+) % BUFFER_SIZE;
        printf ( "Inserting item %d\n" , item );
        sem_v ( mutex );
        sem_v ( full );
    }

    if ( shmdt(shared_memory) == - )
    {
        fprintf ( stderr , "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    printf ( "Finish!\n" );
    getchar();
    exit(EXIT_SUCCESS);
}
           
  • consumer.c: 消費者的程序代碼
#include "ipc.h"

int main ( int argc , char* argv[] )
{
    int i,item,shmid;
    semaphore mutex,empty,full;
    void* shared_memory = ( void* ) ;
    struct shared_use_st* shared_stuff;

    if ((mutex=semget((key_t)KEY_MUTEX,,|IPC_CREAT)) == - )
    {
        fprintf (stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((empty=semget((key_t)KEY_EMPTY,,|IPC_CREAT)) == - )
    {
        fprintf (stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((full=semget((key_t)KEY_FULL,,|IPC_CREAT)) == - )
    {
        fprintf (stderr , "Failed to create semaphore!");
        exit(EXIT_FAILURE);
    }
    if ((shared_memory = shmat(shmid,(void*),) ) ==(void*)- )
    {
        fprintf ( stderr , "shmat failed\n");
        exit(EXIT_FAILURE);
    }
    shared_stuff = (struct shared_use_st*)shared_memory;
    for(i=;i<;i++)
    {
        sem_p(full);
        sem_p(mutex);
        item = shared_stuff->buffer[shared_stuff->low];
        (shared_stuff->buffer)[(shared_stuff->low)]=;
        shared_stuff->low = ((shared_stuff->low)+)%BUFFER_SIZE;
        printf ( "Removing item %d\n" , item );
        sem_v(mutex);
        sem_v(empty);
        printf ( "Consuming item %d\n" ,item );
        sleep();
    }

    if ( shmdt(shared_memory) == - ) 
    {
        fprintf ( stderr , "shmat failed\n" );
        exit ( EXIT_FAILURE);
    }
    printf ( "Finish!\n" );
    getchar();
    exit(EXIT_SUCCESS);
}
           

繼續閱讀