天天看點

Linux多線程——生産者消費者模型一.生産者消費者模型

目錄

一.生産者消費者模型

        1.1 什麼是生成者消費者模型

        1.2 生産者消費者模型的優點

        1.3 基于阻塞隊列實作生産者消費者模型

         1.4 POSIX信号量

        1.4.1 信号量概念

        1.4.2 P操作和V操作

        1.4.3 了解信号量

        1.4.4 信号量的函數

         1.4.5 基于環形隊列實作生産者消費者模型

一.生産者消費者模型

        1.1 什麼是生成者消費者模型

        一個程序中的線程有兩種角色,一種是生産者,一種是消費者。生産者為消費者提供任務,消費者拿到任務,解決任務。在生成者和消費者之間還有一個"交易場所",是一個記憶體塊。生成者線程将任務放到記憶體塊中,消費者線程在記憶體塊中拿任務。當記憶體塊資料達到一高水位線時,生産者會進行等待,喚醒消費者拿任務,當記憶體塊資料達到一低水位線時,消費者會等待,并且喚醒生産者生産任務。

        生成者,消費者存在着3種關系。生産者和生産者之間是互斥的關系,消費者和消費者之間是互斥的關系,生産者和消費者之間是互斥和同步的關系。

        1.2 生産者消費者模型的優點

        例如一個正常的函數,不使用生産者消費者模型:

Linux多線程——生産者消費者模型一.生産者消費者模型

       上面是單線程的情況,即使是多線程,不使用生産者消費者模型,生産者直接給消費者送資料,整個程序的效率會是最慢的線程的效率。并且隻能生産一個資料,消費一個資料。兩者還是串行的,耦合度高。

        使用生産者消費者模型:

Linux多線程——生産者消費者模型一.生産者消費者模型

生産者和消費者模型的優點:

  • 實作線程的解耦
  • 支援線程之間并行運作
  • 效率高
  • 友善代碼維護

        1.3 基于阻塞隊列實作生産者消費者模型

        在多線程程式設計中,阻塞隊列是一種常用于實作生産者和消費者模型的資料結構。其普通隊列差別在于,當隊列為空時,從隊列擷取元素的操作将會被阻塞,直到隊列中被放入元素,當隊列滿的時候,往隊列中存放元素的操作也會被阻塞,直到有元素從隊列中取出。

實作的是多生産者,多消費者的模型。

#pragma once 

#include<iostream>
#include<queue>
#include<pthread.h>
//隊列元素個刷
#define NUM 5
//任務
struct Task{
  Task(){};
  Task(int x,int y)
    :_x(x)
    ,_y(y)
  {}
  int _x;
  int _y;
  int Add(){
    return _x + _y;
  }
};
//提供兩個接口,放任務,和拿任務
class blockqueue{
private:
  //加鎖
  void MakeLock(){
    pthread_mutex_lock(&_lock);
  }
  //取消鎖
  void CancelLock(){
    pthread_mutex_unlock(&_lock);
  }
  //喚醒消費者
  void WakeUpConsumer(){
    std::cout<<"Consumer wake up"<<std::endl;
    pthread_cond_signal(&_empty);

  }
  //喚醒生産者
  void WakeUpProductor(){
    std::cout<<"Productor wake up"<<std::endl;
    pthread_cond_signal(&_full);

  }
  //生産者等待
  void SleepProductor(){
    std::cout<<"Productor sleep"<<std::endl;
    pthread_cond_wait(&_full, &_lock);

  }
  //消費者等待
  void SleepConsumer(){
    std::cout<<"Consumer sleep"<<std::endl;
    pthread_cond_wait(&_empty, &_lock);

  }

public:
  blockqueue(size_t cap = NUM)
    :_cap(cap)
  {
    pthread_mutex_init(&_lock, nullptr);
    pthread_cond_init(&_full, nullptr);
    pthread_cond_init(&_empty, nullptr);
  }
  ~blockqueue(){
    pthread_cond_destroy(&_empty);
    pthread_cond_destroy(&_full);
    pthread_mutex_destroy(&_lock);

  }
  //放資料
  void Put(Task in){
    //q隊列是臨界資源,需要加鎖
    MakeLock();
    //需要使用循環
    while(_q.size()>=_cap){
      WakeUpConsumer();
      SleepProductor();
    }
    _q.push(in);
    CancelLock();

  }
  //擷取資料
  void Get(Task& out){
    MakeLock();
    while(_q.empty()){
      WakeUpProductor();
      SleepConsumer();
    }
    out=_q.front();
    _q.pop();
    CancelLock();

  }

private:
  std::queue<Task> _q;
  size_t _cap;
  pthread_mutex_t _lock;
  pthread_cond_t _empty;//消費者在此等待
  pthread_cond_t _full;//生産者在此等待

};


#include"BlockQueue.hpp"
#include<unistd.h>

#define PRO 2
#define CON 2
using namespace std;
//定義兩個互斥量,生産者和消費者之間要互相競争鎖
//決定哪個線程進來
pthread_mutex_t mutex1;
pthread_mutex_t mutex2;

void *Productor(void *arg){
  sleep(1);
  blockqueue *q = (blockqueue *)arg;
  while(true){
    sleep(1);
    int x=rand()%9+1;
    int y=rand()%20+1;
    Task t(x,y);
	//阻塞隊列是共享資源,需要上鎖
    pthread_mutex_lock(&mutex2);
    q->Put(t);
    cout<<pthread_self()<<":"<<x<<"+"<<y<<"="<<"?"<<endl;
    pthread_mutex_unlock(&mutex2);
  }
}
void *Consumer(void *arg){
  blockqueue *q = (blockqueue *)arg;
  while(true){
    sleep(1);
    Task t;
	//阻塞隊列是共享資源,需要上鎖
    pthread_mutex_lock(&mutex1);
    q->Get(t);
    cout<<pthread_self()<<":"<<t._x<<"+"<<t._y<<"="<<t.Add()<<endl;
    pthread_mutex_unlock(&mutex1);
  }

}

int main(){
  pthread_mutex_init(&mutex2,nullptr);
  pthread_mutex_init(&mutex1,nullptr);
  blockqueue *bq = new blockqueue();
  //生産者線程
  pthread_t td1[PRO];
  int i=0;
  for(; i<PRO; i++){
    pthread_create(td1+i, nullptr, Productor, (void *)bq);
  }
  //消費者線程
  pthread_t td2[CON];
  for(i=0; i<CON; i++){
    pthread_create(td2+i, nullptr, Consumer, (void *)bq);
  } 
  
  for(i=0; i<PRO; i++){
    pthread_join(td1[i], nullptr);
  }
  for(i=0; i<CON; i++){
    pthread_join(td2[i], nullptr);
  }
  pthread_mutex_destroy(&mutex2);
  pthread_mutex_destroy(&mutex1);
  delete bq;
  return 0;
}

           

 示範:

Linux多線程——生産者消費者模型一.生産者消費者模型

         1.4 POSIX信号量

        1.4.1 信号量概念

        有一種情況,我們可以将臨界資源分成若幹份,一個線程隻會使用臨界資源中的一份。

        這個時候就有了信号量,信号量本質是一個計數器,描述的是臨界資源的有效個數。

        1.4.2 P操作和V操作

        假如:臨界資源可以分成5個部分,記為count=5。count就被稱作信号量。

        count--,一個執行流占有一個部分的操作叫做P操作。

        count++,一個執行流結束使用臨界資源的一部風叫做V操作。

        當信号量count==0時,如果進行P操作,沒有信号量可以配置設定了,此時會阻塞等待。

        由于信号量每一個線程看到的是同一份資源,信号量也是臨界資源,要保證P,V操作是原子的。

        二進制信号量相當于互斥鎖: 二進制信号量隻有1個信号量,隻要一個線程占有,信号量的值就等于0,其它線程就需要等待。

        1.4.3 了解信号量

        OS中會有很多的信号量,OS系統需要對它們進行管理,管理需要進行描述:

信号量可以描述為:

struct sem{

......

int count;//臨界資源有效個數

mutex lock;//隻允許一個線程對臨界資源進行操作,需要上鎖

wait_queue *head;//等待隊列

......

}

        1.4.4 信号量的函數

  • 初始化
#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
作用:初始化信号量
參數:
    sem,要初始化的信号量
    pshared:0表示線程間共享,非0表示程序間共享
    value:信号量初始值,信号量個數
           
  • 銷毀信号量
#include <semaphore.h>

int sem_destroy(sem_t *sem);
作用:銷毀定義的信号量
參數:
    sem:要銷毀的信号量
           
  • 等待信号量,P操作
#include <semaphore.h>

int sem_wait(sem_t *sem);
作用:等待信号量,将信号量的值減1,如果信号量為0,阻塞等待
參數:
    sem:要等待的信号量
           
Linux多線程——生産者消費者模型一.生産者消費者模型
  • 釋出信号,V操作
#include <semaphore.h>
int sem_post(sem_t *sem);

作用:表示資源使用完畢,将信号量做加1操作
參數:
    sem:要釋出的信号量
           

         1.4.5 基于環形隊列實作生産者消費者模型

Linux多線程——生産者消費者模型一.生産者消費者模型
  • 環形隊列采用數組模拟,用模運算來模拟環形特征
  • 當隊列滿了或者隊列為空時,都是消費者的下标和生産者的下标相同。不好判斷為空和滿的情況。
    • 有兩種方法:
    • 1.少用一個元素空間,這個時候為空時,下标相等,為滿時,生産者下标加1在取模等于消費者下标。
    • 2.增加一個計數器,來記錄元素個數。
  • 我們這裡正好有信号量這個計數器,隊列裡的每一個位置代表一個信号量。正好信号量就是這個計數器。

定義兩個信号量,一個信号量表示空格字space_sem,一個信号量表示資料_data_sem。

生産者:放元素,關注的說空格子這個信号量。

僞代碼:

        P(space_sem)

        生産資料

        V(data_sem)

消費者:拿元素,關注的是資料這個信号量。 

僞代碼:

        P(data_sem)

        生産資料

        V(space_sem)

        執行到同一位置時,為空或者滿,此時要不就是space_sem為臨界資源總有效個數,data_sem為0,要不就是data_sem為臨界資源總有效個數,space_sem為0。這個時候,放資料和拿資料總會有一個在等待(P操作)。

        當生産者快,消費者慢時,一開始生産者将資料放滿,在消費者消費一個,在生産者生産一個。隊列經常是滿的。

        當生産者,消費者快時,一開始沒資料,需要生産者生産,在消費一個,現象時生産一個消費一個,隊列經常是空的。

多消費者多生産者:

#pragma once 
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
#define NUM 5

class RingQueue{
  private:
    void P(sem_t& s){
      //信号量減減操作,如果為0等待
      sem_wait(&s);
    }
    void V(sem_t& s){
      //信号量加加操作
      sem_post(&s);
    }
  public:
    RingQueue(size_t cap = NUM)
      :_v(cap)
      ,_cap(cap)
      ,_cindex(0)
      ,_pindex(0)
      {
        sem_init(&_space_sem, 0, cap);
        sem_init(&_data_sem, 0, 0);
      }

    void Put(const int& in){
      //生産者關注格子數
      P(_space_sem);
      _v[_pindex]=in;
      _pindex++;
      _pindex %= _cap;
      V(_data_sem);

    }
    void Get(int& out){
      //消費者關注資料
      P(_data_sem);
      out = _v[_cindex];
      _cindex++;
      _cindex %= _cap;
      V(_space_sem);

    }

    ~RingQueue(){
      sem_destroy(&_space_sem);
      sem_destroy(&_data_sem);
      _cindex = 0;
      _pindex = 0;
    }

  private:
    std::vector<int> _v;//隊列
    size_t _cap;//隊列容量
    sem_t _space_sem;//格子信号量
    sem_t _data_sem;//資料信号量
    
    int _cindex;//消費者位置
    int _pindex;//生産者位置

};
#include"RingQueue.hpp"
#include<unistd.h>

using namespace std;

#define CON 4
#define PRO 4

pthread_mutex_t mutex1;
pthread_mutex_t mutex2;
void *consumer(void *arg){
  RingQueue *rq=(RingQueue *)arg;
  while(1){
    sleep(1);
    int x=0;
    pthread_mutex_lock(&mutex1);
    rq->Get(x);
    pthread_mutex_unlock(&mutex1);
    cout<<pthread_self()<<":"<<"consumer get a data :"<<x<<endl;

  }
}

void *productor(void *arg){
  RingQueue *rq=(RingQueue *)arg;
  while(1){
    //sleep(1);
    int x=rand()%10+1;
    pthread_mutex_lock(&mutex2);
    rq->Put(x);
    pthread_mutex_unlock(&mutex2);
    cout<<pthread_self()<<":"<<"productor put a data :"<<x<<endl;

  }
}
int main(){
  RingQueue *rq = new RingQueue();
  pthread_t td1[CON];
  pthread_t td2[PRO];
  int i=0;
  for(; i<CON; i++){
    pthread_create(td1+i, nullptr, consumer, (void *)rq);
  }
  for(i=0; i<PRO; i++){
    pthread_create(td2+i, nullptr, productor, (void *)rq);
  }
  

  for(i=0; i<CON; i++){
    pthread_join(td1[i], nullptr);
  }
  for(i=0; i<PRO; i++){
    pthread_join(td2[i], nullptr);
  }
  delete rq;

  return 0;
}

           

繼續閱讀