天天看点

生产者和消费者多线程同步,C++实现

首先来简化问题,先假设生产者和消费者都只有一个,且缓冲区也只有一个。

  第一.从缓冲区取出产品和向缓冲区投放产品必须是互斥进行的。可以用关键段和互斥量来完成。

    第二.生产者要等待缓冲区为空,这样才可以投放产品,消费者要等待缓冲区不为空,这样才可以取出产品进行消费。并且由于有二个等待过程,所以要用二个事件或信号量来控制。

#pragma once
#include<Windows.h>
#include<process.h>
#include<deque>
#include<iostream>
using namespace std;


int total = 10;
int t = 0;
int cons = 0;
deque<int> dq;
HANDLE g_EventProd;//生产者事件
HANDLE g_EventCons;//消费者事件
CRITICAL_SECTION g_cs;
const int num = 2;
HANDLE th[num];

unsigned int __stdcall ProdFun(LPVOID pm)
{
	while(t < total)
	{
		WaitForSingleObject(g_EventProd,INFINITE);
		Sleep(50);

		EnterCriticalSection(&g_cs);		
		cout<<"编号为"<<GetCurrentThreadId()<<"的生产者,入队数为"<<t<<endl;
		dq.push_back(t++);
		LeaveCriticalSection(&g_cs);

		SetEvent(g_EventCons);
		Sleep(50);
	}
	cout<<"编号为"<<GetCurrentThreadId()<<"的生产者结束"<<endl;
	return 0;
}

unsigned int __stdcall ConsFun(LPVOID pm)
{
	bool flag = false;
	while(!flag)
	{
		WaitForSingleObject(g_EventCons,INFINITE);
		Sleep(50);

		EnterCriticalSection(&g_cs);
		cout<<"编号为"<<GetCurrentThreadId()<<"的消费者,出队数为"<<dq.front()<<endl;
		dq.pop_front();
		cons++;
		if(cons == total)
			flag = true;
		LeaveCriticalSection(&g_cs);

		SetEvent(g_EventProd);
		Sleep(50);
	}
	cout<<"编号为"<<GetCurrentThreadId()<<"的消费者结束"<<endl;
	Sleep(10);
	return 0;
}

void PACone()
{
	g_EventProd = CreateEvent(NULL,FALSE,TRUE,NULL);//自动置位,生产者事件设为有初试状态,直接调度 
	g_EventCons = CreateEvent(NULL,FALSE,FALSE,NULL);
	InitializeCriticalSection(&g_cs);

	th[0] = (HANDLE)_beginthreadex(NULL,0,&ProdFun,NULL,0,NULL);
	th[1] = (HANDLE)_beginthreadex(NULL,0,&ConsFun,NULL,0,NULL);

	WaitForMultipleObjects(num,th,TRUE,INFINITE);
	CloseHandle(g_EventProd);
	CloseHandle(g_EventCons);
	DeleteCriticalSection(&g_cs);
}
           

结果如下:

生产者和消费者多线程同步,C++实现

现在考虑有N个生产者线程和N个消费者线程同时执行。显然线程内需要用临界区CS来保证互斥,而线程之间的同步则只能用信号量来保证,首先让生成者线程有一定数目的信号量,这样生产者线程waitforsingleobject则可以直接减1后继续运行,产生一个数后,将消费者的信号量加1;同理,每次消费者输出数后要将生成者的信号量加1,另外考虑最后一个数输出时,此时生产者线程已结束,而可能还有更多的消费者线程在等待信号量大于0,因而在最后一个数输出后需要对消费者线程的信号量加1,这样就能保证所有消费者线程的顺利结束。

代码如下:

#pragma once
#include<Windows.h>
#include<assert.h>
#include<process.h>
#include<deque>
#include<iostream>
using namespace std;


int total = 25;//产品总数
int p = 0;//生产者已生产的个数
int c = 0;//消费者已消费的个数
deque<int> dq;//产品队列
HANDLE g_SemaProd;//生产者信号量
HANDLE g_SemaCons;//消费者信号量
CRITICAL_SECTION g_cs;//临界区变量
const int tnum = 19;//线程总数
const int pnum = 7;//生产者线程总数
HANDLE th[tnum];

//设置控制台输出颜色  
BOOL SetConsoleColor(WORD wAttributes)  
{  
    HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);  
    if (hConsole == INVALID_HANDLE_VALUE)  
        return FALSE;     
    return SetConsoleTextAttribute(hConsole, wAttributes);  
}

unsigned int __stdcall ProdFun(LPVOID pm)
{
	while(p < total)
	{
		WaitForSingleObject(g_SemaProd,INFINITE);//检查生产者信号量是否大于0,否则一直等待
		Sleep(50);

		EnterCriticalSection(&g_cs);
		if(p < total)
		{
			dq.push_back(p);
			cout<<"编号为"<<GetCurrentThreadId()<<"的生产者,入队数为"<<p++<<endl;
		}
		else
		{
			ReleaseSemaphore(g_SemaProd,1,NULL);//在有多个生产者线程时,也要考虑最后一个数有多个线程已经在while循环里面等待,
			//因而也要使生产者信号量加1,让生产者线程结束
		}
		LeaveCriticalSection(&g_cs);

		ReleaseSemaphore(g_SemaCons,1,NULL);//消费者信号量加1
		Sleep(10);
	}
	cout<<"编号为"<<GetCurrentThreadId()<<"的生产者结束"<<endl;
	return 0;
}

unsigned int __stdcall ConsFun(LPVOID pm)
{
	while(c < total)
	{
		WaitForSingleObject(g_SemaCons,INFINITE);//检查消费者信号是否大于0,否则一直等待
		Sleep(50);

		EnterCriticalSection(&g_cs);
		if(!dq.empty())//在最后一个输出时,可能多个消费者线程都在等待,因而要判断是否产品队列为空
		{
			cout<<"编号为"<<GetCurrentThreadId()<<"的消费者,出队数为"<<dq.front()<<endl;
			dq.pop_front();
			c++;
		}
		if(c == total)//结束标志,已消费完,通知其他消费者线程,消费者信号量加1
		{
			ReleaseSemaphore(g_SemaCons,1,NULL);//不写此处则只有一个消费者线程顺利结束,其余消费者线程陷入无限等待
			LeaveCriticalSection(&g_cs);
			break;
		}
		LeaveCriticalSection(&g_cs);
		ReleaseSemaphore(g_SemaProd,1,NULL);//生成者信号量加1
		Sleep(10);
	}
	
	cout<<"编号为"<<GetCurrentThreadId()<<"的消费者结束"<<endl;
	return 0;
}

void PACN()
{
	assert(pnum < tnum);
	cout<<"-----------生产者和消费者进程同步---------"<<endl;
	cout<<"--------------作者:don_lvsml--------------"<<endl;
	cout<<"生产者线程数:"<<pnum<<";消费者线程数为:"<<(tnum - pnum)<<endl;
	g_SemaProd = CreateSemaphore(NULL,2,2,NULL);//生产者信号量,初始信号量为4,调用WaitForSingleObject不会等待,直接减1后运行
	g_SemaCons = CreateSemaphore(NULL,0,2,NULL);//消费者信号量,
	InitializeCriticalSection(&g_cs);

	for(int i = 0;i < pnum;i++)
		th[i] = (HANDLE)_beginthreadex(NULL,0,ProdFun,NULL,0,NULL);

	for(int i = pnum;i<tnum;i++)
		th[i] = (HANDLE)_beginthreadex(NULL,0,ConsFun,NULL,0,NULL);
	WaitForMultipleObjects(tnum,th,TRUE,INFINITE);

	CloseHandle(g_SemaProd);
	CloseHandle(g_SemaCons);
	DeleteCriticalSection(&g_cs);
}
           

结果为:

生产者和消费者多线程同步,C++实现
生产者和消费者多线程同步,C++实现
生产者和消费者多线程同步,C++实现
生产者和消费者多线程同步,C++实现

继续阅读