天天看點

利用多線程和信号量,互斥量實作的經典的生産者與消費者模型

多線程并發應用程式有一個經典的模型,即生産者/消費者模型。系統中,産生消息的是生産者,處理消息的是消費者,消費者和生産者通過一個緩沖區進行消息傳遞。生産者産生消息後送出到緩沖區,然後通知消費者可以從中取出消息進行處理。消費者處理完資訊後,通知生産者可以繼續提供消息。
           
要實作這個模型,關鍵在于消費者和生産者這兩個線程進行同步。也就是說:隻有緩沖區中有消息時,消費者才能夠提取消息;隻有消息已被處理,生産者才能産生消息送出到緩沖區。
           
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <string.h>

// 信号量和緩沖區
struct data
{
	sem_t empty;  // 用來控制生産者,隻有緩沖區為空,生産者才可以生産消息
	sem_t full;   // 用來控制消費者,隻有緩沖區有資料,才可以消費
	char buf[32]; // 消息緩沖區
};

struct data msg;

// 生産者線程工作函數
void *Produce(void *v)
{
	char *buf[] = {"蘋果", "梨", "香蕉", "榴蓮", "橙子", "西瓜", "芒果", "火龍果"};
	
	while (1)
	{
		// 隻有當緩沖區空才能進,生産消息
		sem_wait(&msg.empty);
		
		strcpy(msg.buf, buf[rand()%8]);
		printf ("放了一個水果: %s\n", msg.buf);

		int time = rand() % 100 + 1;
		usleep(time*10000);		
		
		// 生産完了,通知消費者進行消費
		sem_post(&msg.full);
	}
}

// 消費者線程工作函數
void *Consum(void *v)
{
	char  buf[32];
	while (1)
	{
		// 隻有當緩沖區不為空才能進,消費消息
		sem_wait(&msg.full);
		
		strcpy(buf, msg.buf);
		printf ("吃了一個  %s\n", buf);
		
		int time = rand() % 100 + 1;
		usleep(time*10000);
		
		// 消費完了,通知生産則會進行生産
		sem_post(&msg.empty);
	}
}

int main()
{
	srand ((unsigned int)time(NULL));
	
	// 初始化信号量
	sem_init(&msg.empty, 0, 1);  // 生産者,一開始要生産消息
	sem_init(&msg.full, 0, 0);   // 消費者,一開始要不能消費消息

	pthread_t produceId;
	pthread_t consumId;
	// 建立生産者線程
	pthread_create(&produceId, NULL, Produce, NULL);
	
	// 建立消費者線程
	pthread_create(&consumId, NULL, Consum, NULL);
	
	// 等待線程結束
	pthread_join(produceId, NULL);
	pthread_join(consumId, NULL);
	
	// 銷毀信号量
	sem_destroy(&msg.empty);
	sem_destroy(&msg.full);
	return 0;
}
           
上述消費者/生産者模型比較簡單,緩沖區中隻能容納一條消息。生産者每送出一條消息到緩沖區中,就會通知消費者,等消費者取走消息之後才能送出下一條消息。同樣,消費者也必須等待生産者送出一條消息後才能進行處理。這種設計的效率是比較低下的。
           
如果将緩沖區設計為一個先進先出的隊列,可以同時容納多條消息,那麼隻要緩沖區不滿,生産者就可以送出消息;同時,隻要緩沖區不空,消費者就可以取出消息進行處理。這将大大提高整個程式的效率。
           
實作時,可以利用信号量計數的特性,用信号量的值表示緩沖區中消息的個數及空閑空間的個數。但這時由于生産者和消費者可能同時通路緩沖區,故需要再用一個互斥量來進行保護。
           
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <string.h>
#include "SqQueue.h"

// 信号量和緩沖區
struct data
{
	sem_t empty;  // 用來控制生産者,隻有緩沖區為空,生産者才可以生産消息
	sem_t full;   // 用來控制消費者,隻有緩沖區有資料,才可以消費
	Queue q;      // 緩沖區隊列
};

struct data msg;
// 互斥鎖
pthread_mutex_t mutex;

int num = 0;

// 生産者線程工作函數
void *Produce(void *v)
{	
	while (1)
	{
		int time = rand() % 100 + 1;
		usleep(time*10000);

		// 隻要隊列不滿 就能生産消息, empty代表目前隊列剩餘的空間
		sem_wait(&msg.empty);
		pthread_mutex_lock(&mutex);    // 搶鎖
		
		num++;   // 生産一個消息
  		
		// 将消息放入到隊列裡面
		EnQueue (&(msg.q), num);
		printf ("生産一條消息\n");
		
		pthread_mutex_unlock(&mutex);  // 解鎖
		// 生産完了,通知消費者進行消費
		sem_post(&msg.full);
	}
}

// 消費者線程工作函數
void *Consum(void *v)
{
	char  buf[32];
	while (1)
	{
		int time = rand() % 100 + 1;
		usleep(time*10000);
		
		// 隻有緩沖區有資料,就能消費消息, full目前隊列消息的個數
		sem_wait(&msg.full);
		pthread_mutex_lock(&mutex);    // 搶鎖
		
		int num;
		DeQueue(&(msg.q), &num);   // 去隊列裡取出一條消息
		printf("消費了一條消息: %d\n", num);
		
		pthread_mutex_unlock(&mutex);  // 解鎖
		// 消費完了,通知生産則會進行生産
		sem_post(&msg.empty);
	}
}

int main()
{
	srand ((unsigned int)time(NULL));
	
	// 初始化信号量
	sem_init(&msg.empty, 0, 10); // 生産者,一開始要生産 10 條消息
	sem_init(&msg.full, 0, 0);   // 消費者,一開始要不能消費消息
	
	// 初始化互斥鎖
	pthread_mutex_init(&mutex, NULL);
	
	// 初始化隊列
	InitQueue(&(msg.q));

	pthread_t produceId;
	pthread_t consumId;
	
	int i = 0;
	for (i = 0; i < 5; i++)
	{
		// 建立生産者線程
		pthread_create(&produceId, NULL, Produce, NULL);
		pthread_detach(produceId);
	}
	
	// 建立消費者線程
	pthread_create(&consumId, NULL, Consum, NULL);
	
	// 等待線程結束
	pthread_join(consumId, NULL);
	
	// 銷毀信号量
	sem_destroy(&msg.empty);
	sem_destroy(&msg.full);
	
	// 銷毀互斥鎖
	pthread_mutex_destroy(&mutex);
	return 0;
}
           


繼續閱讀