什麼是生産者消費者模型?
生産者消費者模型就是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡去,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者之間的處理能力。這個阻塞隊列就是用來給生産者和消費者解耦的。
在多線程程式設計中阻塞隊列(Blocking Queue)是一種常用于實作生産者和消費者模型的資料結構。它與普通隊列的差別在于,當隊列為空時,從隊列取元素的操作将會被阻塞,直到隊列中被放入了元素;當隊列滿時,往隊列中放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上操作都是基于不同的線程來說的,線程在對阻塞隊列程序操作時會被阻塞)。
生産者消費者模型優點:
- 解耦合。
- 支援忙閑不均。
- 支援并發。
簡記:
一個場所:阻塞隊列。
兩種角色:生産者和消費者。
三種關系:生産者與生産者之間應具有互斥關系、消費者與消費者之間應具有互斥關系、生産者與消費者之間應具有同步與互斥關系。
生産者與消費者模型的模拟實作
使用互斥量和條件變量模拟實作:
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using std::cout;
using std::endl;
// 線程數量
const int n = 4;
// 共享變量
int resources = 1;
// 阻塞隊列
class BlockQueue{
public:
// 構造函數
BlockQueue(int cap = 10)
: _capacity(cap)
{
// 初始化互斥量
pthread_mutex_init(&_mutex, NULL);
// 初始化條件變量
pthread_cond_init(&_cond_pro, NULL);
pthread_cond_init(&_cond_con, NULL);
}
// 析構函數
~BlockQueue(){
// 銷毀互斥量
pthread_mutex_destroy(&_mutex);
// 銷毀條件變量
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_con);
}
public:
// 入隊
bool BlockQueuePush(int data){
// 上鎖
BlockQueueLock();
// 隊列滿
while(BlockQueueFull()){
// 等待
ProductorWait();
}
// 隊列不為滿,入隊
_queue.push(data);
// 喚醒消費者
ConsumerWakeUp();
// 解鎖
BlockQueueUnlock();
return true;
}
// 出隊
bool BlockQueuePop(int* data){
// 上鎖
BlockQueueLock();
// 隊列為空
while(BlockQueueEmpty()){
// 消費者等待
ConsumerWait();
}
// 隊列不為空,拿取隊中資料
*data = _queue.front();
_queue.pop();
// 喚醒生産者
ProductorWakeUp();
// 解鎖
BlockQueueUnlock();
return true;
}
private:
// 阻塞隊列上鎖
void BlockQueueLock(){
pthread_mutex_lock(&_mutex);
}
// 阻塞隊列解鎖
void BlockQueueUnlock(){
pthread_mutex_unlock(&_mutex);
}
// 阻塞隊列是否為空
bool BlockQueueEmpty(){
return _queue.empty();
}
// 阻塞隊列是否滿
bool BlockQueueFull(){
return _queue.size() == _capacity;
}
// 生産者等待
void ProductorWait(){
pthread_cond_wait(&_cond_pro, &_mutex);
}
// 消費者等待
void ConsumerWait(){
pthread_cond_wait(&_cond_con, &_mutex);
}
// 喚醒生産者
void ProductorWakeUp(){
pthread_cond_signal(&_cond_pro);
}
// 喚醒消費者
void ConsumerWakeUp(){
pthread_cond_signal(&_cond_con);
}
private:
// 隊列
std::queue<int> _queue;
// 容量
size_t _capacity;
// 互斥鎖
pthread_mutex_t _mutex;
// 條件變量:生産者
pthread_cond_t _cond_pro;
// 條件變量:消費者
pthread_cond_t _cond_con;
};
// 生産者
void* thr_productor(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
cout << "productor put resources: " << resources << endl;
// 生産者生産資料放入阻塞隊列
bq->BlockQueuePush(resources++);
sleep(1);
}
pthread_exit(0);
}
void* thr_consumer(void* arg){
BlockQueue* bq = (BlockQueue*)arg;
while(1){
int data;
// 消費者消費阻塞隊列中的資料
bq->BlockQueuePop(&data);
cout << "consumer get data: " << data << endl;
sleep(1);
}
}
int main(){
pthread_t ptid[n], ctid[n];
BlockQueue bq;
int i, ret;
// 線程建立
for(i = 0; i < n; ++i){
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&bq);
if(ret != 0){
// 線程建立失敗
cout << "productor thread " << i << " create failed!\n";
}
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&bq);
if(ret != 0){
// 線程建立失敗
cout << "consumer thread " << i << " create failed!\n";
}
}
// 線程等待
for(i = 0; i < n; ++i){
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
pthread_exit(0);
}
編譯運作程式,結果如下:
使用POSIX信号量模拟實作:
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,達到無沖突的通路共享資源目的。但POSIX信号量可以用于線程間同步。
簡單了解一下什麼是信号量:
- 信号量是一個非負整數,所有通過它的線程/程序都會将該整數減一(通過它當然是為了使用資源),當該整數值為零時,所有試圖通過它的線程都将處于等待狀态。
- 在信号量上我們定義兩種操作: Wait(等待) 和 Release(釋放)。
- 當一個線程調用Wait操作時,它要麼得到資源然後将信号量減一,要麼一直等下去(指放入阻塞隊列),直到信号量大于等于一時。
-
Release(釋放)實際上是在信号量上執行加操作,該操作之是以叫做“釋放”是因為釋放了由信号量守護的資源。
接口介紹:
定義信号變量。
頭檔案:semaphore.h
sem_t sem;
頭檔案:semaphore.h
功能:初始化信号量。
int sem_init(sem_t *sem, int pshared, unsigned int value);
參數:
sem:信号變量。
pshared:選項标志,決定信号量用于程序間同步互斥還是線程間的同步互斥。
0:線程間。
!0:程序間。
value:信号量初始計數。
傳回值:成功傳回0,失敗傳回-1,errno被設定。
頭檔案:semaphore.h
功能:計數-1并等待,阻塞操作;計數<=0,阻塞。
int sem_wait(sem_t *sem);
參數:
sem:信号變量。
傳回值:成功傳回0,失敗傳回-1,errno被設定。
頭檔案:semaphore.h
功能:計數-1并等待,非阻塞操作;計數<=0,報錯傳回。
int sem_trywait(sem_t *sem);
參數:
sem:信号變量。
傳回值:成功傳回0,失敗傳回-1,errno被設定。
頭檔案:semaphore.h
功能:計數-1并等待,限時等待,計數<=0,計時報錯傳回。
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
參數:
sem:信号變量。
abs_timeout:等待時間。
傳回值:成功傳回0,失敗傳回-1,errno被設定。
頭檔案:semaphore.h
功能:計數+1并喚醒等待。
int sem_post(sem_t *sem);
參數:
sem:信号變量。
傳回值:成功傳回0,失敗傳回-1,errno被設定。
頭檔案:semaphore.h
功能:銷毀信号量。
int sem_destroy(sem_t *sem);
參數:
sem:信号變量。
傳回值:成功傳回0,失敗傳回-1,errno被設定。
代碼示範:
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <unistd.h>
using std::cout;
using std::endl;
// 線程數量
const int n = 4;
// 資源
int resources = 1;
class CircleBlockQueue{
public:
// 構造函數
CircleBlockQueue(int cap = 10)
: _circle_queue(cap)
, _capacity(cap)
, _pro_idx(0)
, _con_idx(0)
{
// 信号量初始化
sem_init(&_sem_data, 0, 0);
sem_init(&_sem_idle, 0, cap);
sem_init(&_sem_lock, 0, 1);
}
// 析構函數
~CircleBlockQueue(){
// 銷毀信号量
sem_destroy(&_sem_data);
sem_destroy(&_sem_idle);
sem_destroy(&_sem_lock);
}
// 入隊
bool CircleBlockQueuePush(int data){
// 生産者等待
ProductorWait();
// 上鎖
CircleBlockQueueLock();
// 資料入隊
_circle_queue[_pro_idx] = data;
_pro_idx = (_pro_idx + 1) % _capacity;
// 解鎖
CircleBlockQueueUnlock();
// 消費者喚醒
ConsumerWakeUp();
return true;
}
// 出隊
bool CircleBlockQueuePop(int* data){
// 消費者等待
ConsumerWait();
// 上鎖
CircleBlockQueueLock();
*data = _circle_queue[_con_idx];
_con_idx = (_con_idx + 1) % _capacity;
CircleBlockQueueUnlock();
ProductorWakeUp();
return true;
}
private:
// 上鎖
void CircleBlockQueueLock(){
// 等待信号量
sem_wait(&_sem_lock);
}
// 解鎖
void CircleBlockQueueUnlock(){
// 釋出信号量
sem_post(&_sem_lock);
}
// 生産者等待
void ProductorWait(){
// 等待信号量
sem_wait(&_sem_idle);
}
// 生産者喚醒
void ProductorWakeUp(){
// 釋出信号量
sem_post(&_sem_idle);
}
// 消費者等待
void ConsumerWait(){
// 等待信号量
sem_wait(&_sem_data);
}
// 消費者喚醒
void ConsumerWakeUp(){
// 釋出信号量
sem_post(&_sem_data);
}
private:
// 隊列
std::vector<int> _circle_queue;
// 最大容量
int _capacity;
// 生産者下标
int _pro_idx;
// 消費者下标
int _con_idx;
// 目前數量
sem_t _sem_data;
// 閑置數量
sem_t _sem_idle;
// 鎖
sem_t _sem_lock;
};
// 生産者
void* thr_productor(void* arg){
CircleBlockQueue* cbq = (CircleBlockQueue*)arg;
while(1){
// 生産資料放入循環阻塞隊列
cbq->CircleBlockQueuePush(resources);
cout << "productor put data: " << resources << endl;
++resources;
sleep(1);
}
pthread_exit(0);
}
// 消費者
void* thr_consumer(void* arg){
CircleBlockQueue* cbq = (CircleBlockQueue*)arg;
while(1){
int data;
// 消費者消費循環阻塞隊列中的資料
cbq->CircleBlockQueuePop(&data);
cout << "consumer get data: " << data << endl;
sleep(1);
}
}
int main(){
pthread_t ptid[n], ctid[n];
CircleBlockQueue cbq;
int i, ret;
// 線程建立
for(i = 0; i < n; ++i){
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&cbq);
if(ret != 0){
cout << "productor thread " << i << " create failed!\n";
return -1;
}
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&cbq);
if(ret != 0){
cout << "consumer thread " << i << " create failed!\n";
return -1;
}
}
// 線程等待
for(i = 0; i < n; ++i){
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
pthread_exit(0);
}
編譯運作,結果如下: