多線程并發應用程式有一個經典的模型,即生産者/消費者模型。系統中,産生消息的是生産者,處理消息的是消費者,消費者和生産者通過一個緩沖區進行消息傳遞。生産者産生消息後送出到緩沖區,然後通知消費者可以從中取出消息進行處理。消費者處理完資訊後,通知生産者可以繼續提供消息。
要實作這個模型,關鍵在于消費者和生産者這兩個線程進行同步。也就是說:隻有緩沖區中有消息時,消費者才能夠提取消息;隻有消息已被處理,生産者才能産生消息送出到緩沖區。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <string.h>
// 信号量和緩沖區
struct data
{
sem_t empty; // 用來控制生産者,隻有緩沖區為空,生産者才可以生産消息
sem_t full; // 用來控制消費者,隻有緩沖區有資料,才可以消費
char buf[32]; // 消息緩沖區
};
struct data msg;
// 生産者線程工作函數
void *Produce(void *v)
{
char *buf[] = {"蘋果", "梨", "香蕉", "榴蓮", "橙子", "西瓜", "芒果", "火龍果"};
while (1)
{
// 隻有當緩沖區空才能進,生産消息
sem_wait(&msg.empty);
strcpy(msg.buf, buf[rand()%8]);
printf ("放了一個水果: %s\n", msg.buf);
int time = rand() % 100 + 1;
usleep(time*10000);
// 生産完了,通知消費者進行消費
sem_post(&msg.full);
}
}
// 消費者線程工作函數
void *Consum(void *v)
{
char buf[32];
while (1)
{
// 隻有當緩沖區不為空才能進,消費消息
sem_wait(&msg.full);
strcpy(buf, msg.buf);
printf ("吃了一個 %s\n", buf);
int time = rand() % 100 + 1;
usleep(time*10000);
// 消費完了,通知生産則會進行生産
sem_post(&msg.empty);
}
}
int main()
{
srand ((unsigned int)time(NULL));
// 初始化信号量
sem_init(&msg.empty, 0, 1); // 生産者,一開始要生産消息
sem_init(&msg.full, 0, 0); // 消費者,一開始要不能消費消息
pthread_t produceId;
pthread_t consumId;
// 建立生産者線程
pthread_create(&produceId, NULL, Produce, NULL);
// 建立消費者線程
pthread_create(&consumId, NULL, Consum, NULL);
// 等待線程結束
pthread_join(produceId, NULL);
pthread_join(consumId, NULL);
// 銷毀信号量
sem_destroy(&msg.empty);
sem_destroy(&msg.full);
return 0;
}
上述消費者/生産者模型比較簡單,緩沖區中隻能容納一條消息。生産者每送出一條消息到緩沖區中,就會通知消費者,等消費者取走消息之後才能送出下一條消息。同樣,消費者也必須等待生産者送出一條消息後才能進行處理。這種設計的效率是比較低下的。
如果将緩沖區設計為一個先進先出的隊列,可以同時容納多條消息,那麼隻要緩沖區不滿,生産者就可以送出消息;同時,隻要緩沖區不空,消費者就可以取出消息進行處理。這将大大提高整個程式的效率。
實作時,可以利用信号量計數的特性,用信号量的值表示緩沖區中消息的個數及空閑空間的個數。但這時由于生産者和消費者可能同時通路緩沖區,故需要再用一個互斥量來進行保護。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <string.h>
#include "SqQueue.h"
// 信号量和緩沖區
struct data
{
sem_t empty; // 用來控制生産者,隻有緩沖區為空,生産者才可以生産消息
sem_t full; // 用來控制消費者,隻有緩沖區有資料,才可以消費
Queue q; // 緩沖區隊列
};
struct data msg;
// 互斥鎖
pthread_mutex_t mutex;
int num = 0;
// 生産者線程工作函數
void *Produce(void *v)
{
while (1)
{
int time = rand() % 100 + 1;
usleep(time*10000);
// 隻要隊列不滿 就能生産消息, empty代表目前隊列剩餘的空間
sem_wait(&msg.empty);
pthread_mutex_lock(&mutex); // 搶鎖
num++; // 生産一個消息
// 将消息放入到隊列裡面
EnQueue (&(msg.q), num);
printf ("生産一條消息\n");
pthread_mutex_unlock(&mutex); // 解鎖
// 生産完了,通知消費者進行消費
sem_post(&msg.full);
}
}
// 消費者線程工作函數
void *Consum(void *v)
{
char buf[32];
while (1)
{
int time = rand() % 100 + 1;
usleep(time*10000);
// 隻有緩沖區有資料,就能消費消息, full目前隊列消息的個數
sem_wait(&msg.full);
pthread_mutex_lock(&mutex); // 搶鎖
int num;
DeQueue(&(msg.q), &num); // 去隊列裡取出一條消息
printf("消費了一條消息: %d\n", num);
pthread_mutex_unlock(&mutex); // 解鎖
// 消費完了,通知生産則會進行生産
sem_post(&msg.empty);
}
}
int main()
{
srand ((unsigned int)time(NULL));
// 初始化信号量
sem_init(&msg.empty, 0, 10); // 生産者,一開始要生産 10 條消息
sem_init(&msg.full, 0, 0); // 消費者,一開始要不能消費消息
// 初始化互斥鎖
pthread_mutex_init(&mutex, NULL);
// 初始化隊列
InitQueue(&(msg.q));
pthread_t produceId;
pthread_t consumId;
int i = 0;
for (i = 0; i < 5; i++)
{
// 建立生産者線程
pthread_create(&produceId, NULL, Produce, NULL);
pthread_detach(produceId);
}
// 建立消費者線程
pthread_create(&consumId, NULL, Consum, NULL);
// 等待線程結束
pthread_join(consumId, NULL);
// 銷毀信号量
sem_destroy(&msg.empty);
sem_destroy(&msg.full);
// 銷毀互斥鎖
pthread_mutex_destroy(&mutex);
return 0;
}