生産者消費者問題是多線程并發中一個非常經典的問題,相信學過作業系統課程的同學都清楚這個問題的根源。本文将就四種情況分析并介紹生産者和消費者問題,它們分别是:單生産者-單消費者模型,單生産者-多消費者模型,多生産者-單消費者模型,多生産者-多消費者模型,我會給出四種情況下的 C++11 并發解決方案。
一、單生産者-單消費者模型
顧名思義,單生産者-單消費者模型中隻有一個生産者和一個消費者,生産者不停地往産品庫中放入産品,消費者則從産品庫中取走産品,産品庫容積有限制,隻能容納一定數目的産品,如果生産者生産産品的速度過快,則需要等待消費者取走産品之後,産品庫不為空才能繼續往産品庫中放置新的産品,相反,如果消費者取走産品的速度過快,則可能面臨産品庫中沒有産品可使用的情況,此時需要等待生産者放入一個産品後,消費者才能繼續工作。C++11實作單生産者單消費者模型的代碼如下:
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目
std::mutex mtx;//互斥量,保護産品緩沖區
std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品
int item_buffer[repository_size];
static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置
std::chrono::seconds t(1);//a new feature of c++ 11 standard
void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
} //當緩沖區滿了之後我們就不能添加産品了
item_buffer[write_position] = i;//寫入産品
write_position++;
if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
{
write_position = 0;
}
repo_not_empty.notify_all();//通知消費者産品庫不為空
//lck.unlock();//解鎖
}
int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
}
data = item_buffer[read_position];//讀取産品
read_position++;
if (read_position >= repository_size)
{
read_position = 0;
}
repo_not_full.notify_all();//通知産品庫不滿
//lck.unlock();
return data;
}
void Producer_thread()
{
for (int i = 1; i <= item_total; ++i)
{
//std::this_thread::sleep_for(t);
std::cout << "生産者生産第" << i << "個産品" << std::endl;
produce_item(i);
}
}
void Consumer_thread()
{
static int cnt = 0;
while (1)
{
//std::this_thread::sleep_for(t);
int item = consume_item();
std::cout << "消費者消費第" << item << "個産品" << std::endl;
if (++cnt == item_total)
break;
}
}
int main()
{
std::thread producer(Producer_thread); // 建立生産者線程.
std::thread consumer(Consumer_thread); // 建立消費之線程.
producer.join();
consumer.join();
}
二、單生産者-多消費者模型
與單生産者和單消費者模型不同的是,單生産者-多消費者模型中可以允許多個消費者同時從産品庫中取走産品。是以除了保護産品庫在多個讀寫線程下互斥之外,還需要維護消費者取走産品的計數器,代碼如下:
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目
std::mutex mtx;//互斥量,保護産品緩沖區
std::mutex mtx_counter;//互斥量,保護産品計數器
std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品
int item_buffer[repository_size];//産品緩沖區,這裡使用了一個循環隊列
static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置
static std::size_t item_counter = 0;//消費者消費産品計數器
std::chrono::seconds t(1);//a new feature of c++ 11 standard
void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
//item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
} //當緩沖區滿了之後我們就不能添加産品了
item_buffer[write_position] = i;//寫入産品
write_position++;
if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
{
write_position = 0;
}
repo_not_empty.notify_all();//通知消費者産品庫不為空
lck.unlock();//解鎖
}
int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
}
data = item_buffer[read_position];//讀取産品
read_position++;
if (read_position >= repository_size)
{
read_position = 0;
}
repo_not_full.notify_all();//通知産品庫不滿
lck.unlock();
return data;
}
void Producer_thread()
{
for (int i = 1; i <= item_total; ++i)
{
//std::this_thread::sleep_for(t);
std::cout << "生産者生産第" << i << "個産品" << std::endl;
produce_item(i);
}
}
void Consumer_thread()
{
bool read_to_exit = false;
while (1)
{
std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lck(mtx_counter);
if (item_counter < item_total)
{
int item = consume_item();
++item_counter;
std::cout << "消費者線程" << std::this_thread::get_id()
<< "消費第" << item << "個産品" << std::endl;
}
else
{
read_to_exit = true;
}
if (read_to_exit == true)
break;
}
std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}
int main()
{
std::thread producer(Producer_thread); // 建立生産者線程.
std::vector<std::thread> thread_vector;
for (int i = 0; i != 5; ++i)
{
thread_vector.push_back(std::thread(Consumer_thread));// 建立消費者線程.
}
producer.join();
for (auto &thr : thread_vector)
{
thr.join();
}
}
三、多生産者-單消費者模型
與單生産者和單消費者模型不同的是,多生産者-單消費者模型中可以允許多個生産者同時向産品庫中放入産品。是以除了保護産品庫在多個讀寫線程下互斥之外,還需要維護生産者放入産品的計數器,代碼如下:
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目
std::mutex mtx;//互斥量,保護産品緩沖區
std::mutex mtx_counter;
std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品
int item_buffer[repository_size];//産品緩沖區,這裡使用了一個循環隊列
static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置
static std::size_t item_counter = 0;//計數器
std::chrono::seconds t(1);//a new feature of c++ 11 standard
void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
// item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
} //當緩沖區滿了之後我們就不能添加産品了
item_buffer[write_position] = i;//寫入産品
write_position++;
if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
{
write_position = 0;
}
repo_not_empty.notify_all();//通知消費者産品庫不為空
lck.unlock();//解鎖
}
int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
}
data = item_buffer[read_position];//讀取産品
read_position++;
if (read_position >= repository_size)
{
read_position = 0;
}
repo_not_full.notify_all();//通知産品庫不滿
lck.unlock();
return data;
}
void Producer_thread()
{
bool read_to_exit = false;
while (1)
{
std::unique_lock<std::mutex> lck(mtx_counter);
if (item_counter < item_total)
{
++item_counter;
produce_item(item_counter);
std::cout << "生産者線程 " << std::this_thread::get_id()
<< "生産第 " << item_counter << "個産品" << std::endl;
}
else
{
read_to_exit = true;
}
if (read_to_exit == true)
break;
}
std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}
void Consumer_thread()
{
static int cnt = 0;
while (1)
{
std::this_thread::sleep_for(t);
int item = consume_item();
std::cout << "消費者消費第" << item << "個産品" << std::endl;
if (++cnt == item_total)
break;
}
}
int main()
{
std::vector<std::thread> thread_vector;
for (int i = 0; i != 5; ++i)
{
thread_vector.push_back(std::thread(Producer_thread));// 建立消費者線程.
}
std::thread consumer(Consumer_thread); // 建立消費之線程.
for (auto &thr : thread_vector)
{
thr.join();
}
consumer.join();
}
三、多生産者-多消費者模型
該模型可以說是前面兩種模型的綜合,程式需要維護兩個計數器,分别是生産者已生産産品的數目和消費者已取走産品的數目。另外也需要保護産品庫在多個生産者和多個消費者互斥地通路。
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目
std::mutex mtx;//互斥量,保護産品緩沖區
std::mutex producer_count_mtx;
std::mutex consumer_count_mtx;
std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品
int item_buffer[repository_size];//産品緩沖區,這裡使用了一個循環隊列
static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置
static size_t produced_item_counter = 0;
static size_t consumed_item_counter = 0;
std::chrono::seconds t(1);//a new feature of c++ 11 standard
std::chrono::microseconds t1(1000);
void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
// item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
} //當緩沖區滿了之後我們就不能添加産品了
item_buffer[write_position] = i;//寫入産品
write_position++;
if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
{
write_position = 0;
}
repo_not_empty.notify_all();//通知消費者産品庫不為空
lck.unlock();//解鎖
}
int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
}
data = item_buffer[read_position];//讀取産品
read_position++;
if (read_position >= repository_size)
{
read_position = 0;
}
repo_not_full.notify_all();//通知産品庫不滿
lck.unlock();
return data;
}
void Producer_thread()
{
bool ready_to_exit = false;
while (1)
{
//std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lock(producer_count_mtx);
if (produced_item_counter < item_total)
{
++produced_item_counter;
produce_item(produced_item_counter);
std::cout << "生産者線程 " << std::this_thread::get_id()
<< "生産第 " << produced_item_counter << "個産品" << std::endl;
}
else
{
ready_to_exit = true;
}
lock.unlock();
if (ready_to_exit == true)
{
break;
}
}
std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}
void Consumer_thread()
{
bool read_to_exit = false;
while (1)
{
std::this_thread::sleep_for(t1);
std::unique_lock<std::mutex> lck(consumer_count_mtx);
if (consumed_item_counter < item_total)
{
int item = consume_item();
++consumed_item_counter;
std::cout << "消費者線程" << std::this_thread::get_id()
<< "消費第" << item << "個産品" << std::endl;
}
else
{
read_to_exit = true;
}
if (read_to_exit == true)
{
break;
}
}
std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}
int main()
{
std::vector<std::thread> thread_vector1;
std::vector<std::thread> thread_vector2;
for (int i = 0; i != 5; ++i)
{
thread_vector1.push_back(std::thread(Producer_thread));// 建立生産者線程.
thread_vector2.push_back(std::thread(Consumer_thread));// 建立消費者線程.
}
for (auto &thr1 : thread_vector1)
{
thr1.join();
}
for (auto &thr2 : thread_vector2)
{
thr2.join();
}
}