天天看点

设计模式:生产者-消费者模式

  • 引:生产者-消费者模式是一个十分经典的多线程并发协作的模式,严格地讲并不属于设计模式的范畴,但该程序设计思想同样为程序设计提供了很好的思路,所以就加在设计模式版块了。弄懂生产者-消费者问题能够让我们对并发编程的理解加深。所谓生产者-消费者问题,实际上主要是包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
    • 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
    • 如果共享数据区为空的话,阻塞消费者继续消费数据;
  • 概要:
    • 该模式主要是是多线程并发中条件变量(cv)相关的内容,相关知识可见之前的总结。
    • 代码中涉及到了c++11的一些新特性:右值引用、转发型引用、std::move和std::forward以及typename关键字的另一种用法,相关内容较多,还需慢慢消化。
  • 下面通过实例,说明一个简易的生产者-消费者模式
//sync_queue.h

#include<list>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<system_error>
#include<iostream>

template<typename T>
class SyncQueue
{
public:
	SyncQueue();
	~SyncQueue();

	void put(const T& val);
	void put(T&& val);
	void get(T& val);
private:
	std::condition_variable m_cond;
	std::mutex m_mutex;
	std::list<T> m_q;

};

template<typename T>
SyncQueue<T>::SyncQueue() {}

template<typename T>
SyncQueue<T>::~SyncQueue() {}

template<typename T>
void SyncQueue<T>::get(T& val)
{
	std::unique_lock<std::mutex> lck(m_mutex);
	
	//体会下:下面使用while不使用if是防止虚假唤醒:
	//假设是在等待消费队列,一个线程A被nodify,
	//但是还没有获得锁时,另一个线程B获得了锁,并消费掉了队列中的数据。
	//B退出或wait后,A获得了锁,而这时条件已不满足。
	//对于下面情况,虚假唤醒后在A线程再次通过while内参数判断,如果为空则继续等待。如果用if则A线程会向下执行,输出超时(实际不是超时)
	while (m_q.empty())
	{
		if (m_cond.wait_for(lck, std::chrono::milliseconds{ 2000 }) == std::cv_status::timeout)
		{
			break;
		}
	}
	if (m_q.empty())
	{
		throw "超时";
	}
	else
	{
		val = m_q.front();
		m_q.pop_front();
		std::cout << "取出一个元素" << std::endl;
	}
}

template<typename T>
void SyncQueue<T>:: put(const T& val)
{
	{
		std::lock_guard<std::mutex> lck(m_mutex);
		m_q.push_back(val);
	}
	m_cond.notify_one();
}

template<typename T>
void SyncQueue<T>::put(T&& val)
{
	std::lock_guard<std::mutex> lck(m_mutex);
	m_q.push_back(std::forward<T>(val));
	m_cond.notify_one();
}
           
//main.cpp

#include "sync_queue.h"


SyncQueue<int> g_queue;

void produce()
{
	static int i = 0;
	while (true)
	{
		g_queue.put(++i);
		std::this_thread::sleep_for(std::chrono::milliseconds(1000));

	}
}

void consumer()
{
	while (true)
	{
		try
		{
			int val;
			g_queue.get(val);
			std::cout << "获取" << val << std::endl;
		}
		catch (const char* e)
		{
			std::cout << e << std::endl;
		}
	}
}

int main()
{
	std::thread t_c{consumer};
	std::thread t_p{ produce };

	t_c.join();
	t_p.join();
	return 0;
}

// 在多个消费者的情况 put 函数还可以进行一些改进 

// if(m_q.size()>n)
// {m_cond.notify_all();}
// else
// {m_cond.notify_one();} 
           

继续阅读