1. 算法思想
因為有多個緩沖區,是以生産者線程沒有必要在生成新的資料之前等待最後一個資料被消費者線程處理完畢。同樣,消費者線程并不一定每次隻能處理一個資料。在多緩沖區機制下,線程之間不必互相等待形成死鎖,因而提高了效率。
多個緩沖區就好像使用一條傳送帶替代托架,傳送帶上一次可以放多個産品。生産者在緩沖區尾 加入資料,而消費者則在緩沖區頭讀取資料。當緩沖區滿的時候, 緩沖區就上鎖并等待消費者線程讀取資料;每一個生産或消費動作使得傳送帶向前移動一個機關,因而,消費者線程讀取資料的順序和資料産生順序是相同的。
可 以引入一個count計數器來表示已經被使用的緩沖區數量。用hNotEmptyEvent 和hNotFullEvent信号量來同步生産者和消費者線程。每當生産者線程發現緩沖區滿( count=BufferSize ),它就等待hNotEmptyEvent 事件。同樣,當消費者線程發現緩沖區空,它就開始等待hNotEmptyEvent。生産者線程寫入一個新的資料之後,就立刻發出 hNotEmptyEvent 來喚醒正在等待的消費者線程;消費者線程在讀取一個資料之後,就發出hNotFullEvent 來喚醒正在等待的生産者線程。
程式的設計思想大緻為:設定一while循環,pi生産者通路臨界區,得到權限通路緩沖區,如果緩沖區滿 的,則等待,直到緩沖區非滿;通路互斥鎖,當得到互斥鎖 且 緩沖區非滿時,跳出while循環,開始産生新資料,并把資料存放于Buffer緩沖區中,當數 據存放結束則結束臨界區;接着喚醒消費者線程;ci消費者通路臨界區,得到權限通路緩沖區,如果緩沖區為空,沒有可以處理的資料,則釋放互斥鎖且等待,直 到緩沖區非空;當等到緩沖區非空時,跳出while循環;消費者獲得資料,并根據所獲得的資料按類别消費(當消費者獲得的資料為大寫字母時,則把大寫字母 轉換成小寫字母,并顯示;當消費者獲得的資料為小寫字母時,則把小寫字母轉換成大寫字母,并顯示;當消費者獲得的資料為字元0、1、2、……8、9時,把 這些字元直接顯示到螢幕;當消費者獲得的資料為符号(+、-、*、\……)時,把這些符号列印成7行7列的菱形);處理完資料後,結束臨界區;接着喚醒生 産者線程。
另外,由于有界緩沖區是一個臨界資源,必須互斥使用,是以還需要再設定一個互斥信号量g_hMutex,起初值為1。
2 算法實作
生 産者-消費者問題是一個經典的程序同步問題,該問題最早由Dijkstra提出,用以示範他提出的信号量機制。在同一個程序位址空間内執行的兩個線程。生 産者線程生産物品,然後将物品放置在一個空緩沖區中供消費者線程消費。消費者線程從緩沖區中獲得物品,然後釋放緩沖區。當生産者線程生産物品時,如果沒有 空緩沖區可用,那麼生産者線程必須等待消費者線程釋放出一個空緩沖區。當消費者線程消費物品時,如果沒有滿的緩沖區,那麼消費者線程将被阻塞,直到新的物 品被生産出來。
#include <windows.h>
#include <iostream>
const unsigned short SIZE_OF_BUFFER = 10; //緩沖區長度
unsigned short ProductID = 0; //産品号
unsigned short ConsumeID = 0; //将被消耗的産品号
unsigned short in = 0; //産品進緩沖區時的緩沖區下标
unsigned short out = 0; //産品出緩沖區時的緩沖區下标
<b>int g_buffer[SIZE_OF_BUFFER]; //</b><b>緩沖區是個循環隊列</b>
bool g_continue = true; //控制程式結束
<b>HANDLE g_hMutex; //</b><b>用于線程間的互斥</b><b>保護緩沖區臨界資源</b><b></b>
HANDLE g_hFullSemaphore; //<b>當緩沖區滿時迫使生産者等待</b><b> </b><b></b>
HANDLE g_hEmptySemaphore; //<b>當緩沖區空時迫使消費者等待</b>
DWORD WINAPI Producer(LPVOID); //生産者線程
DWORD WINAPI Consumer(LPVOID); //消費者線程
int main()
{
// Create mutex and semophores
g_hMutex = CreateMutex(NULL,FALSE,NULL);
g_hFullSemaphore = CreateSemaphore(NULL,<b>SIZE_OF_BUFFER-1</b>,SIZE_OF_BUFFER-1,NULL);
g_hEmptySemaphore = CreateSemaphore(NULL,<b>0</b>,SIZE_OF_BUFFER-1,NULL);
// 調整下面的數值,可以發現,當生産者個數多于消費者個數時,
// 生産速度快,生産者經常等待消費者;反之,消費者經常等待
const unsigned short PRODUCERS_COUNT = 3; //生産者的個數
const unsigned short CONSUMERS_COUNT = 1; //消費者的個數
// 總的線程數
const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT;
HANDLE hThreads[PRODUCERS_COUNT]; //各線程的handle
DWORD producerID[CONSUMERS_COUNT]; //生産者線程的辨別符
DWORD consumerID[THREADS_COUNT]; //消費者線程的辨別符
// 建立生産者線程
for (int i=0;i <PRODUCERS_COUNT;++i){
hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]);
if (hThreads[i]==NULL) return -1;
}
//建立消費者線程
for (int i=0;i <CONSUMERS_COUNT;++i){
hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
while(g_continue){
if(getchar()){ //按回車後終止程式運作
g_continue = false;
}
return 0;
}
/**********
*生産者
*/
DWORD WINAPI Producer (LPVOID lpPara)
while (g_continue){
<b> WaitForSingleObject(g_hFullSemaphore,INFINITE);</b> // if g_hFullSemaphore= 0, means the buffer is full, then we have to wait
<b>WaitForSingleObject(g_hMutex,INFINITE);</b> // protect the cirtical section of buffer.
Produce();
Append();
Sleep(1500);
<b>ReleaseMutex(g_hMutex);</b>
<b>ReleaseSemaphore(g_hEmptySemaphore,1,NULL);</b> // tell consumer that buffer is no empty, since g_hEmptySemaphore=1
*消費者
DWORD WINAPI Consumer (LPVOID lpPara)
// if g_hEmptySemaphore=0, means there is no resource in buffer, we have to wait
<b>WaitForSingleObject(g_hEmptySemaphore,INFINITE);</b>
<b>WaitForSingleObject(g_hMutex,INFINITE);</b>
Take();
Consume();
<b> ReleaseSemaphore(g_hFullSemaphore,1,NULL); // tell producer that buffer is not full, he can continue to produce.</b>
/*******
*生産一個産品。簡單模拟了一下,僅輸出新産品的ID号
void Produce()
std::cerr < < "Producing " < < ++ProductID < < " ... ";
std::cerr < < "Succeed " < < std::endl;
/******
*把新生産的産品放入緩沖區
void Append()
std::cerr < < "Appending a product ... ";
g_buffer[in] = ProductID;
in = (in+1)%SIZE_OF_BUFFER;
//輸出緩沖區目前的狀态
for (int i=0;i <SIZE_OF_BUFFER;++i){
std::cout < < i < < ": " < < g_buffer[i];
if (i==in) std::cout < < " <-- 生産 ";
if (i==out) std::cout < < " <-- 消費 ";
std::cout < < std::endl;
/****************
*從緩沖區中取出一個産品
void Take()
std::cerr < < "Taking a product ... ";
ConsumeID = g_buffer[out];
out = (out+1)%SIZE_OF_BUFFER;
/***********
*消耗一個産品
void Consume()
std::cerr < < "Consuming " < < ConsumeID < < " ... ";