天天看点

生产者消费者模型(condition_variable)

作者:悠然醉清风
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <WinSock2.h>
#include <iostream>
#include <process.h>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
// TCP port number used by the server (0x8888 hex == 34952 decimal).
enum
{
    PORT = 0x8888
};
/**
 * Per-connection worker: prints everything a client sends until the
 * connection ends, then releases the connection's resources.
 *
 * @param p Pointer to a heap-allocated SOCKET. This function takes
 *          ownership: it closes the socket and deletes the pointer
 *          before returning.
 */
void recvProc(void* p)
{
    SOCKET* pSock = static_cast<SOCKET*>(p);
    char s[256];
    int n = 0;
    // Ask for at most sizeof(s) - 1 bytes so there is always room for the
    // terminating NUL. Previously a read that filled the whole buffer was
    // silently discarded.
    while ((n = recv(*pSock, s, static_cast<int>(sizeof(s)) - 1, 0)) > 0)
    {
        s[n] = 0;
        std::cout << "socka=" << *pSock << ":" << s << std::endl;
    }
    // recv() returned 0 (orderly shutdown) or a negative value (error).
    std::cout << "socka=" << *pSock << " 的客户关闭了连接" << std::endl;
    // Release the socket handle; without this every connection leaks one.
    closesocket(*pSock);
    delete pSock;
}
/**
 * TCP server entry point: listens on PORT and starts one recvProc thread
 * per accepted connection.
 *
 * @return -1 on any fatal setup or accept failure. The accept loop does not
 *         terminate normally.
 */
int main()
{
    WSADATA wd;
    if (WSAStartup(0x0202, &wd))  // request Winsock version 2.2
        return -1;
    SOCKET sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
    if (sock == INVALID_SOCKET)
    {
        std::cout << "socket() error" << WSAGetLastError() << std::endl;
        WSACleanup();
        return -1;
    }
    std::cout << "侦听sock=" << sock << std::endl;
    // Remaining fields are zero-initialised, i.e. the address is INADDR_ANY.
    SOCKADDR_IN sa = { AF_INET, htons(PORT) };
    // Qualify with :: so that `using namespace std;` can never make this
    // resolve to std::bind.
    int n = ::bind(sock, reinterpret_cast<sockaddr*>(&sa), sizeof(sa));
    if (n)
    {
        std::cout << "bind error" << WSAGetLastError() << std::endl;
        closesocket(sock);
        WSACleanup();
        return -1;
    }
    n = listen(sock, 5);
    if (n)
    {
        std::cout << "listen error" << WSAGetLastError() << std::endl;
        closesocket(sock);
        WSACleanup();
        return -1;
    }
    while (true)
    {
        SOCKET socka = accept(sock, nullptr, nullptr);
        if (socka == INVALID_SOCKET)
        {
            // Read the error code before any other socket call can change it.
            const int err = WSAGetLastError();
            std::cout << "accept error" << err << std::endl;
            if (err == WSAECONNRESET)
                continue;  // the peer gave up before we accepted; keep serving
            break;         // anything else is treated as fatal
        }
        std::cout << "socka=" << socka << "为您服务" << std::endl;
        // The worker thread takes ownership of this allocation and deletes it.
        SOCKET* pSocka = new SOCKET(socka);
        if (_beginthread(recvProc, 0, pSocka) == static_cast<uintptr_t>(-1))
        {
            // No thread was started, so nobody else will release these.
            std::cout << "_beginthread error" << std::endl;
            closesocket(socka);
            delete pSocka;
        }
    }
    closesocket(sock);
    WSACleanup();
    return -1;
}
           

第二版本:

#include <iostream>
#include <mutex>
#include <thread>
#include <functional>
#include <list>
#include <condition_variable>
using namespace std;
/**
 * Bounded FIFO queue of ints shared between producer and consumer threads.
 *
 * put() blocks while the queue holds maxSize elements; take() blocks while
 * it is empty. A std::lock_guard holds the mutex for the whole call, and
 * std::condition_variable_any waits directly on the mutex itself.
 */
class SyncQueue
{
public:
    SyncQueue(int maxSize) : m_maxSize(maxSize) {}

    /// Appends x to the back of the queue, waiting until there is room.
    void put(const int& x)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        // wait() unlocks m_mutex while blocked and re-locks it before
        // returning, so the guard still owns the mutex afterwards.
        while (m_queue.size() == m_maxSize)
        {
            m_notFull.wait(m_mutex);
        }
        m_queue.push_back(x);
        std::cout << x << "被生产了" << std::endl;
        m_notEmpty.notify_one();
    }

    /// Removes and returns the front element, waiting until one exists.
    int take()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        while (m_queue.empty())
        {
            m_notEmpty.wait(m_mutex);
        }
        const int value = m_queue.front();
        m_queue.pop_front();
        std::cout << value << "被消费了" << std::endl;
        m_notFull.notify_all();
        return value;
    }

private:
    std::list<int> m_queue;                   // pending items, oldest first
    int m_maxSize;                            // capacity limit checked by put()
    std::condition_variable_any m_notEmpty;   // signalled after each put()
    std::condition_variable_any m_notFull;    // signalled after each take()
    std::mutex m_mutex;                       // guards m_queue
};
int main()
{
SyncQueue task(50);
auto produce = bind(&SyncQueue::put, &task, placeholders::_1);
auto consume = bind(&SyncQueue::take, &task);
thread t1[3];
thread t2[3];
for (int i = 0; i < 3; i++)
{
t1[i] = thread(produce, i + 50);
t2[i] = thread(consume);
}
for (int i = 0; i < 3; i++)
{
t1[i].join();
t2[i].join();
}
return 0;
}           

总结:condition_variable 配合 unique_lock 使用更灵活一些,可以在任何时候自由地释放互斥锁;而 condition_variable_any 如果和 lock_guard 一起使用,则必须等到 lock_guard 对象的生命周期结束才能将互斥锁释放。但是,condition_variable_any 可以和多种互斥锁配合使用,应用场景也更广,而 condition_variable 只能和独占的非递归互斥锁(mutex)配合使用,有一定的局限性。

继续阅读