天天看點

生産者消費者模型(condition_variable)

作者:悠然醉清風
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <WinSock2.h>
#include <iostream>
#include <process.h>
using namespace std;
#pragma comment(lib,"ws2_32.lib")
enum {PORT=0x8888};
void recvProc(void* p)
{
SOCKET* pSock = (SOCKET*)p;
char s[256];
int n = 0;
while ((n = recv(*pSock, s, sizeof(s), 0)) > 0)
{
if (n < sizeof(s))
{
s[n] = 0;
cout << "socka=" << *pSock << ":" <<s<< endl;
}
}
cout << "socka=" << *pSock << " 的客戶關閉了連接配接" << endl;
delete pSock;
}
int main()
{
WSADATA wd;
if (WSAStartup(0x0202, &wd))
return -1;
SOCKET sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (sock == INVALID_SOCKET)
{
cout << "socket() error" << WSAGetLastError() << endl;
return -1;
}
cout << "偵聽sock=" << sock << endl;
SOCKADDR_IN sa = { AF_INET,htons(PORT) };
int n = bind(sock, (sockaddr*)&sa, sizeof(sa));
if (n)
{
cout << "bind error" << WSAGetLastError() << endl;
return -1;
}
n = listen(sock, 5);
while (true)
{
SOCKET socka = accept(sock, nullptr,nullptr);
cout << "socka=" << socka << "為您服務" << endl;
SOCKET* pSocka = new SOCKET;
*pSocka = socka;
_beginthread(recvProc, 0, pSocka);
}
return 0;
}
           

第二版本:

#include <iostream>
#include <mutex>
#include <thread>
#include <functional>
#include <list>
#include <condition_variable>
using namespace std;
class SyncQueue
{
public:
SyncQueue(int maxSize) :m_maxSize(maxSize) {}
void put(const int& x)
{
lock_guard<mutex> locker(m_mutex);
m_notFull.wait(m_mutex, [this]() {
return m_queue.size() != m_maxSize;
});
m_queue.push_back(x);
cout << x << "被生産了" << endl;
m_notEmpty.notify_one();
}
int take()
{
lock_guard<mutex> locker(m_mutex);
m_notEmpty.wait(m_mutex, [this]() {
return !m_queue.empty();
});
int x = m_queue.front();
m_queue.pop_front();
cout << x << "被消費了" << endl;
m_notFull.notify_all();
return x;
}
private:
list<int> m_queue;
int m_maxSize;
condition_variable_any m_notEmpty;
condition_variable_any m_notFull;
mutex m_mutex;
};
int main()
{
SyncQueue task(50);
auto produce = bind(&SyncQueue::put, &task, placeholders::_1);
auto consume = bind(&SyncQueue::take, &task);
thread t1[3];
thread t2[3];
for (int i = 0; i < 3; i++)
{
t1[i] = thread(produce, i + 50);
t2[i] = thread(consume);
}
for (int i = 0; i < 3; i++)
{
t1[i].join();
t2[i].join();
}
return 0;
}           

總結condition_variable 配合 unique_lock 使用更靈活一些,可以在任何時候自由地釋放互斥鎖,而 condition_variable_any 如果和 lock_guard 一起使用必須要等到其生命周期結束才能将互斥鎖釋放。但是,condition_variable_any 可以和多種互斥鎖配合使用,應用場景也更廣,而 condition_variable 隻能和獨占的非遞歸互斥鎖(mutex)配合使用,有一定的局限性。

繼續閱讀