天天看點

信号量與管程實作生産者消費者模型

生産者消費者模式是實際應用中常見的場景,一個或多個生産者線程往緩沖區添加資料,一個或多個消費者線程從緩沖區取資料,生産者消費者模型有這樣幾個正确性要求:

1.緩沖區是共享變量,一般由數組或者連結清單實作,是線程不安全的,同一時刻,隻能有一個線程操作緩沖區(要求互斥);

2.當發現緩沖區滿後,生産者必須等待消費者(要求同步);

3.緩沖區空後,消費者必須等待生産者(要求同步)

基于信号量解決這個模型,設計思路如下:

1.buffer緩沖區,大小n

2.buffer是線程不安全的共享變量,需要在臨界區進行,mutex=new Semaphore(1)

3.控制buffer滿了生産者等待消費者的計數信号量,初始值表明目前生産者可以往buffer中添加n個資料

       fullSemp=new Semaphore(n),生産者生産一個元素該值-1,消費者添加元素該值+1

4.控制buffer空了消費者等待生産者的計數信号量,初始值表明目前消費者可以從buffer中取出0個資料

       emptySemp=new Semaphore(0),生産者生産一個元素該值+1,消費者添加元素該值-1

5.buffer設計:

       采用數組方式:基于下标插入和擷取元素

       putIndex=0;插入元素的下标,生産者一直往後put,到最大時,歸0

       takeIndex=0;擷取元素的下标,消費者一直往後take,到最大時,歸0

代碼如下:

producer(){

       fullSemp-->P();//生産者可生産個數-1,如果減之後<0,阻塞

       mutex-->P();//buffer數組,屬于共享資料,添加資料是線程不安全操作,需要互斥

       insertToBuffer(new e);

       mutex-->V();

       emptySemp-->V();//消費者可消費個數+1,如果目前<=0,說明有消費者在阻塞,會自動喚醒一個消費者

}

consumer(){

       emptySemp-->P();消費者可消費個數-1,如果減之後<0,阻塞

       mutex-->P();//buffer數組,屬于共享資料,擷取資料是線程不安全操作,需要互斥

       takeFrombuffer();

       mutex-->V();

       fullSemp-->V();//生産者可生産個數+1,如果目前<=0,說明有生産者在阻塞,會自動喚醒一個生産者

}

//為什麼說buffer的插入讀取操作是線程不安全的呢?因為涉及到共享變量takeIndex和putIndex的非原子性通路

void insertToBuffer(e){

       buffer[putIndex++]=e;

       if(putIndex==n){

              putIndex=0;

       }

}

e takeFrombuffer(){

       e=buffer[takeIndex++];

       buffer[takeIndex]==null;

       if(takeIndex==n){

              takeIndex=0;

       }

       return e;

}

考慮一個問題,producer裡的兩個P操作能調換順序麼?答案是不能

試想一下,如果調換順序,假如現在緩沖區已經滿了,再調用producer,就會先拿到mutex信号量,然後阻塞在fullSemp信号量上等待消費者,此時再調用consumer,就會阻塞在mutex信号量上等待生産者,産生了死鎖

基于管程解決這個模型,設計思路如下:

1.buffer緩沖區,大小n,依然用數組實作

2.int count=0;//表示緩沖區中現存元素數,初始為0

3.管程的互斥鎖Lock lock;

4.管程的條件變量,Condition notFull,表示緩沖區未滿的條件,當緩沖區滿時,條件轉為不滿足,生産者阻塞

5.管程的條件變量,Condition notEmpty,表示緩沖區未空的條件,當緩沖區空時,條件轉為不滿足,消費者阻塞

代碼如下: 

producer(){

      lock-->Acquire();//管程的生産消費邏輯是進入就加鎖,這和上面信号量的方式有所差別

      while(count==bufferSize){

            notFull.wait(&lock);//緩沖區滿,notFull條件轉為不滿足,執行等待,釋放互斥鎖

      }

      buffer[count++]=new e;

      notEmpty.signal();//如果有消費者在等待,會喚醒其中一個

      lock-->Release();

}

consumer(){

      lock-->Acquire();//管程的生産消費邏輯是進入就加鎖,這和上面信号量的方式有所差別

      while(count==0){

            notEmpty.wait(&lock);//緩沖區空,notEmpty條件轉為不滿足,執行等待,釋放互斥鎖

      }

      e=buffer[count--];

      buffer[count]=null;

      notFull.signal();//如果有生産者在等待,會喚醒其中一個

      lock-->Release();

}

還記得我在上一篇文章信号量與管程原理中留的一個問題嗎?我把它貼過來

Condition::Wait(lock){

              numWaiting++;//等待的線程數增1

              add this thread t to waitQ;//線程加到等待隊列中

              release(lock);//wait操作一定是獲得鎖的情況下執行的,這裡要釋放掉已經拿到的鎖,讓其他線程可以執行

              schedule();//這裡可以了解為留一定的時間讓其他線程去執行

              acquire(lock);//為啥這裡還要再擷取鎖呢?

}

為啥釋放鎖後,後面又跟個擷取鎖?單看wait方法确實難以了解,我們把wait的内容帶入producer方法中

producer(){

      lock-->Acquire();//管程的生産消費邏輯是進入就加鎖,這和上面信号量的方式有所差別

      while(count==bufferSize){

              //notFull.wait(&lock);//緩沖區滿,notFull條件轉為不滿足,執行等待,釋放互斥鎖

              numWaiting++;//等待的線程數增1

              add this thread t to waitQ;//線程加到等待隊列中

              release(lock);//wait操作一定是獲得鎖的情況下執行的,這裡要釋放掉已經拿到的鎖,讓其他線程可以執行

              schedule();//這裡可以了解為留一定的時間讓其他線程去執行

              acquire(lock);//試想下,沒有這句會怎樣,目前線程已經釋放掉鎖了,然後留了一定的時間給其他線程執行,然後再執行while判斷,此時不論while是否滿足,後續操作都需要釋放鎖,鎖已經釋放掉了,哪來的鎖再去釋放?是以必須再視圖擷取下鎖,成功後才能繼續往下進行

             再換個角度想一想,這裡難了解的原因是,釋放鎖操作在加鎖的前面,這和我們平時使用鎖操作的步驟是相反的,但無論怎樣,大家一定知道加鎖與釋放鎖一定是成對出現的,不管誰先誰後

      }

      buffer[count++]=new e;

      notEmpty.signal();//如果有消費者在等待,會喚醒其中一個

      lock-->Release();

}

PS:如果有不足之處,歡迎指出;如果解決了你的疑惑,就點個贊吧o(* ̄︶ ̄*)o

繼續閱讀