生産者消費者模式是實際應用中常見的場景,一個或多個生産者線程往緩沖區添加資料,一個或多個消費者線程從緩沖區取資料,生産者消費者模型有這樣幾個正确性要求:
1.緩沖區是共享變量,一般由數組或者連結清單實作,是線程不安全的,同一時刻,隻能有一個線程操作緩沖區(要求互斥);
2.當發現緩沖區滿後,生産者必須等待消費者(要求同步);
3.緩沖區空後,消費者必須等待生産者(要求同步)
基于信号量解決這個模型,設計思路如下:
1.buffer緩沖區,大小n
2.buffer是線程不安全的共享變量,需要在臨界區進行,mutex=new Semaphore(1)
3.控制buffer滿了生産者等待消費者的計數信号量,初始值表明目前生産者可以往buffer中添加n個資料
fullSemp=new Semaphore(n),生産者生産一個元素該值-1,消費者添加元素該值+1
4.控制buffer空了消費者等待生産者的計數信号量,初始值表明目前消費者可以從buffer中取出0個資料
emptySemp=new Semaphore(0),生産者生産一個元素該值+1,消費者添加元素該值-1
5.buffer設計:
采用數組方式:基于下标插入和擷取元素
putIndex=0;插入元素的下标,生産者一直往後put,到最大時,歸0
takeIndex=0;擷取元素的下标,消費者一直往後take,到最大時,歸0
代碼如下:
producer(){
fullSemp-->P();//生産者可生産個數-1,如果減之後<0,阻塞
mutex-->P();//buffer數組,屬于共享資料,添加資料是線程不安全操作,需要互斥
insertToBuffer(new e);
mutex-->V();
emptySemp-->V();//消費者可消費個數+1,如果目前<=0,說明有消費者在阻塞,會自動喚醒一個消費者
}
consumer(){
emptySemp-->P();消費者可消費個數-1,如果減之後<0,阻塞
mutex-->P();//buffer數組,屬于共享資料,擷取資料是線程不安全操作,需要互斥
takeFrombuffer();
mutex-->V();
fullSemp-->V();//生産者可生産個數+1,如果目前<=0,說明有生産者在阻塞,會自動喚醒一個生産者
}
//為什麼說buffer的插入讀取操作是線程不安全的呢?因為涉及到共享變量takeIndex和putIndex的非原子性通路
void insertToBuffer(e){
buffer[putIndex++]=e;
if(putIndex==n){
putIndex=0;
}
}
e takeFrombuffer(){
e=buffer[takeIndex++];
buffer[takeIndex]==null;
if(takeIndex==n){
takeIndex=0;
}
return e;
}
考慮一個問題,producer裡的兩個P操作能調換順序麼?答案是不能
試想一下,如果調換順序,假如現在緩沖區已經滿了,再調用producer,就會先拿到mutex信号量,然後阻塞在fullSemp信号量上等待消費者,此時再調用consumer,就會阻塞在mutex信号量上等待生産者,産生了死鎖
基于管程解決這個模型,設計思路如下:
1.buffer緩沖區,大小n,依然用數組實作
2.int count=0;//表示緩沖區中現存元素數,初始為0
3.管程的互斥鎖Lock lock;
4.管程的條件變量,Condition notFull,表示緩沖區未滿的條件,當緩沖區滿時,條件轉為不滿足,生産者阻塞
5.管程的條件變量,Condition notEmpty,表示緩沖區未空的條件,當緩沖區空時,條件轉為不滿足,消費者阻塞
代碼如下:
producer(){
lock-->Acquire();//管程的生産消費邏輯是進入就加鎖,這和上面信号量的方式有所差別
while(count==bufferSize){
notFull.wait(&lock);//緩沖區滿,notFull條件轉為不滿足,執行等待,釋放互斥鎖
}
buffer[count++]=new e;
notEmpty.signal();//如果有消費者在等待,會喚醒其中一個
lock-->Release();
}
consumer(){
lock-->Acquire();//管程的生産消費邏輯是進入就加鎖,這和上面信号量的方式有所差別
while(count==0){
notEmpty.wait(&lock);//緩沖區空,notEmpty條件轉為不滿足,執行等待,釋放互斥鎖
}
e=buffer[count--];
buffer[count]=null;
notFull.signal();//如果有生産者在等待,會喚醒其中一個
lock-->Release();
}
還記得我在上一篇文章信号量與管程原理中留的一個問題嗎?我把它貼過來
Condition::Wait(lock){
numWaiting++;//等待的線程數增1
add this thread t to waitQ;//線程加到等待隊列中
release(lock);//wait操作一定是獲得鎖的情況下執行的,這裡要釋放掉已經拿到的鎖,讓其他線程可以執行
schedule();//這裡可以了解為留一定的時間讓其他線程去執行
acquire(lock);//為啥這裡還要再擷取鎖呢?
}
為啥釋放鎖後,後面又跟個擷取鎖?單看wait方法确實難以了解,我們把wait的内容帶入producer方法中
producer(){
lock-->Acquire();//管程的生産消費邏輯是進入就加鎖,這和上面信号量的方式有所差別
while(count==bufferSize){
//notFull.wait(&lock);//緩沖區滿,notFull條件轉為不滿足,執行等待,釋放互斥鎖
numWaiting++;//等待的線程數增1
add this thread t to waitQ;//線程加到等待隊列中
release(lock);//wait操作一定是獲得鎖的情況下執行的,這裡要釋放掉已經拿到的鎖,讓其他線程可以執行
schedule();//這裡可以了解為留一定的時間讓其他線程去執行
acquire(lock);//試想下,沒有這句會怎樣,目前線程已經釋放掉鎖了,然後留了一定的時間給其他線程執行,然後再執行while判斷,此時不論while是否滿足,後續操作都需要釋放鎖,鎖已經釋放掉了,哪來的鎖再去釋放?是以必須再視圖擷取下鎖,成功後才能繼續往下進行
再換個角度想一想,這裡難了解的原因是,釋放鎖操作在加鎖的前面,這和我們平時使用鎖操作的步驟是相反的,但無論怎樣,大家一定知道加鎖與釋放鎖一定是成對出現的,不管誰先誰後
}
buffer[count++]=new e;
notEmpty.signal();//如果有消費者在等待,會喚醒其中一個
lock-->Release();
}
PS:如果有不足之處,歡迎指出;如果解決了你的疑惑,就點個贊吧o(* ̄︶ ̄*)o