生産者消費者模型:是一種典型的設計模式。 他的應用場景是針對大量的資料産生與處理的場景
組成
一個場所:線程安全的資料隊列 兩種角色:生産者與消費者 三種關系
生産者與生産者關系:互斥
消費者與消費者關系:互斥
生産者與消費者關系:同步+互斥
為什麼要使用生産者消費者模型
生産者消費者模式就是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。這個阻塞隊列就是用來給生産者和消費者解耦的
生産者消費者模型優點
解耦合、支援忙閑不均,支援并發
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxSP9EkTx0EVOBnVtplb1cVWvB3MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLxgjM2AjNwkTM2ETNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
基于阻塞隊列的生産者消費者模型
#define CAPCITY 5
class BlockQueue
{
public:
BlockQueue(int cap = CAPCITY) : _capcity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond_cus, nullptr);
pthread_cond_init(&_cond_pro, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_cus);
}
void push(const int &val)
{
pthread_mutex_lock(&_mutex);
//隊列滿了 等待 消費者喚醒
while (_queue.size() == CAPCITY)
pthread_cond_wait(&_cond_pro, &_mutex);
_queue.push(val);
//喚醒消費者進行消費
pthread_cond_signal(&_cond_cus);
pthread_mutex_unlock(&_mutex);
}
void pop(int *data)
{
pthread_mutex_lock(&_mutex);
while (_queue.empty())
pthread_cond_wait(&_cond_cus, &_mutex);
*data = _queue.front();
_queue.pop();
pthread_cond_signal(&_cond_pro);
pthread_mutex_unlock(&_mutex);
}
private:
queue<int> _queue; //阻塞隊列
int _capcity; //隊列容量
pthread_mutex_t _mutex;
pthread_cond_t _cond_pro; //生産者條件變量
pthread_cond_t _cond_cus; //消費者條件變量
};
void *customer(void *arg)
{
BlockQueue *q = (BlockQueue *)arg;
int data;
while (1)
{
q->pop(&data);
printf("消費者:%p消費資料%d\n", pthread_self(), data);
//usleep(1);
}
return NULL;
}
void *product(void *arg)
{
BlockQueue *q = (BlockQueue *)arg;
int data = 0;
while (1)
{
q->push(data);
printf("生産者:%p生産資料%d\n", pthread_self(), data++);
//usleep(1);
}
return NULL;
}
int main()
{
BlockQueue q;
int ret;
pthread_t cid[4], pid[4];
for (int i = 0; i < 4; ++i)
{
ret = pthread_create(&cid[i], nullptr, customer, &q);
if (ret != 0)
return -1;
ret = pthread_create(&pid[i], nullptr, product, &q);
if (ret != 0)
return -1;
}
for (int i = 0; i < 4; ++i)
{
pthread_join(cid[i], nullptr);
pthread_join(pid[i], nullptr);
}
}
基于環形隊列的生産消費模型
#define MAX_QUEUE 5
class RingQueue
{
public:
RingQueue(int cap = MAX_QUEUE)
: _capacity(cap), _pwrite(0), _pread(0), _array(cap)
{
sem_init(&_sem_data, 0, 0);
sem_init(&_sem_idle, 0, cap);
sem_init(&_sem_lock, 0, 1);
}
~RingQueue()
{
sem_destroy(&_sem_data);
sem_destroy(&_sem_idle);
sem_destroy(&_sem_lock);
}
void push(const int &val)
{
sem_wait(&_sem_idle);
//加鎖
sem_wait(&_sem_lock);
_array[_pwrite] = val;
_pwrite = (_pwrite + 1) % _capacity;
//解鎖
sem_post(&_sem_lock);
sem_post(&_sem_data);
}
void pop(int *data)
{
sem_wait(&_sem_data);
sem_wait(&_sem_lock);
*data = _array[_pread];
_pread = (_pread + 1) % _capacity;
sem_post(&_sem_lock);
sem_post(&_sem_idle);
}
private:
int _capacity; //循環隊列的容量
int _pwrite; //讀指針
int _pread; //寫指針
std::vector<int> _array;
sem_t _sem_data; //資料節點計數
sem_t _sem_idle; //空閑節點計數
sem_t _sem_lock; //用于實作互斥
};
void *product(void *arg)
{
RingQueue *q = (RingQueue *)arg;
int val = 1;
while (1)
{
q->push(val);
printf("push data: %d\n", val++);
}
return nullptr;
}
void *customer(void *arg)
{
RingQueue *q = (RingQueue *)arg;
int val;
while (1)
{
q->pop(&val);
printf("pop data: %d\n", val);
}
return nullptr;
}
int main()
{
RingQueue q;
pthread_t pid[4], cid[4];
for (int i = 0; i < 4; ++i)
{
pthread_create(&pid[i], nullptr, product, &q);
pthread_create(&cid[i], nullptr, customer, &q);
}
for (int i = 0; i < 4; ++i)
{
pthread_join(pid[0], nullptr);
pthread_join(cid[0], nullptr);
}
return 0;
}