天天看點

c++ 11 多線線程系列-----------生産者消費者

        生産者消費者問題是多線程并發中一個非常經典的問題,相信學過作業系統課程的同學都清楚這個問題的根源。本文将就四種情況分析并介紹生産者和消費者問題,它們分别是:單生産者-單消費者模型,單生産者-多消費者模型,多生産者-單消費者模型,多生産者-多消費者模型,我會給出四種情況下的 C++11 并發解決方案。

一、單生産者-單消費者模型

         顧名思義,單生産者-單消費者模型中隻有一個生産者和一個消費者,生産者不停地往産品庫中放入産品,消費者則從産品庫中取走産品,産品庫容積有限制,隻能容納一定數目的産品,如果生産者生産産品的速度過快,則需要等待消費者取走産品之後,産品庫不為空才能繼續往産品庫中放置新的産品,相反,如果消費者取走産品的速度過快,則可能面臨産品庫中沒有産品可使用的情況,此時需要等待生産者放入一個産品後,消費者才能繼續工作。C++11實作單生産者單消費者模型的代碼如下:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目

std::mutex mtx;//互斥量,保護産品緩沖區

std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品

int item_buffer[repository_size];

static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置

std::chrono::seconds t(1);//a  new feature of c++ 11 standard

void produce_item(int i)
{
  std::unique_lock<std::mutex> lck(mtx);
  while (((write_position + 1) % repository_size) == read_position)
  {
    std::cout << "Producer is waiting for an empty slot..." << std::endl;
    repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
  }                            //當緩沖區滿了之後我們就不能添加産品了

  item_buffer[write_position] = i;//寫入産品
  write_position++;

  if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
  {
    write_position = 0;
  }

  repo_not_empty.notify_all();//通知消費者産品庫不為空

  //lck.unlock();//解鎖
}

int consume_item()
{
  int data;
  std::unique_lock<std::mutex> lck(mtx);
  while (write_position == read_position)
  {
    std::cout << "Consumer is waiting for items..." << std::endl;
    repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
  }                            

  data = item_buffer[read_position];//讀取産品
  read_position++;

  if (read_position >= repository_size)
  {
    read_position = 0;
  }
    
  repo_not_full.notify_all();//通知産品庫不滿
  //lck.unlock();

  return data;
}

void Producer_thread()
{
  for (int i = 1; i <= item_total; ++i)
  {
    //std::this_thread::sleep_for(t);
    std::cout << "生産者生産第" << i  << "個産品" << std::endl;
    produce_item(i);
  }
}

void Consumer_thread()
{
  static int cnt = 0;
  while (1)
  {
    //std::this_thread::sleep_for(t);
    int item = consume_item();
    std::cout << "消費者消費第" << item << "個産品" << std::endl;

    if (++cnt == item_total)
      break;
  }
}

int main()
{
  std::thread producer(Producer_thread); // 建立生産者線程.
  std::thread consumer(Consumer_thread); // 建立消費之線程.
  producer.join();
  consumer.join();
}      

二、單生産者-多消費者模型

           與單生産者和單消費者模型不同的是,單生産者-多消費者模型中可以允許多個消費者同時從産品庫中取走産品。是以除了保護産品庫在多個讀寫線程下互斥之外,還需要維護消費者取走産品的計數器,代碼如下:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目

std::mutex mtx;//互斥量,保護産品緩沖區
std::mutex mtx_counter;//互斥量,保護産品計數器

std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品

int item_buffer[repository_size];//産品緩沖區,這裡使用了一個循環隊列

static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置

static std::size_t item_counter = 0;//消費者消費産品計數器

std::chrono::seconds t(1);//a  new feature of c++ 11 standard

void produce_item(int i)
{
  std::unique_lock<std::mutex> lck(mtx);
  //item buffer is full, just wait here.
  while (((write_position + 1) % repository_size) == read_position)
  {
    std::cout << "Producer is waiting for an empty slot..." << std::endl;
    repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
  }                            //當緩沖區滿了之後我們就不能添加産品了

  item_buffer[write_position] = i;//寫入産品
  write_position++;

  if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
  {
    write_position = 0;
  }

  repo_not_empty.notify_all();//通知消費者産品庫不為空

  lck.unlock();//解鎖
}

int consume_item()
{
  int data;
  std::unique_lock<std::mutex> lck(mtx);
  // item buffer is empty, just wait here.
  while (write_position == read_position)
  {
    std::cout << "Consumer is waiting for items..." << std::endl;
    repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
  }

  data = item_buffer[read_position];//讀取産品
  read_position++;

  if (read_position >= repository_size)
  {
    read_position = 0;
  }

  repo_not_full.notify_all();//通知産品庫不滿
  lck.unlock();

  return data;
}

void Producer_thread()
{
  for (int i = 1; i <= item_total; ++i)
  {
    //std::this_thread::sleep_for(t);
    std::cout << "生産者生産第" << i << "個産品" << std::endl;
    produce_item(i);
  }
}

void Consumer_thread()
{
  bool read_to_exit = false;
  while (1)
  {
    std::this_thread::sleep_for(t);
    std::unique_lock<std::mutex> lck(mtx_counter);
    if (item_counter < item_total)
    {
      int item = consume_item();
      ++item_counter;
      std::cout << "消費者線程" << std::this_thread::get_id()
        << "消費第" << item << "個産品" << std::endl;
    }
    else
    {
      read_to_exit = true;
    }
    if (read_to_exit == true)
      break;
  }

  std::cout << "Consumer thread " << std::this_thread::get_id()
    << " is exiting..." << std::endl;

}

int main()
{
  std::thread producer(Producer_thread); // 建立生産者線程.
  std::vector<std::thread> thread_vector;
  for (int i = 0; i != 5; ++i)
  {
    thread_vector.push_back(std::thread(Consumer_thread));// 建立消費者線程.
  }
  
  producer.join();
  for (auto &thr : thread_vector)
  {
    thr.join();
  }

}      

三、多生産者-單消費者模型

          與單生産者和單消費者模型不同的是,多生産者-單消費者模型中可以允許多個生産者同時向産品庫中放入産品。是以除了保護産品庫在多個讀寫線程下互斥之外,還需要維護生産者放入産品的計數器,代碼如下:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目

std::mutex mtx;//互斥量,保護産品緩沖區
std::mutex mtx_counter;

std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品

int item_buffer[repository_size];//産品緩沖區,這裡使用了一個循環隊列

static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置

static std::size_t item_counter = 0;//計數器

std::chrono::seconds t(1);//a  new feature of c++ 11 standard

void produce_item(int i)
{
  std::unique_lock<std::mutex> lck(mtx);
  // item buffer is full, just wait here.
  while (((write_position + 1) % repository_size) == read_position)
  {
    std::cout << "Producer is waiting for an empty slot..." << std::endl;
    repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
  }                            //當緩沖區滿了之後我們就不能添加産品了

  item_buffer[write_position] = i;//寫入産品
  write_position++;

  if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
  {
    write_position = 0;
  }

  repo_not_empty.notify_all();//通知消費者産品庫不為空

  lck.unlock();//解鎖
}

int consume_item()
{
  int data;
  std::unique_lock<std::mutex> lck(mtx);
  // item buffer is empty, just wait here.
  while (write_position == read_position)
  {
    std::cout << "Consumer is waiting for items..." << std::endl;
    repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
  }

  data = item_buffer[read_position];//讀取産品
  read_position++;

  if (read_position >= repository_size)
  {
    read_position = 0;
  }

  repo_not_full.notify_all();//通知産品庫不滿
  lck.unlock();

  return data;
}

void Producer_thread()
{
  bool read_to_exit = false;
  while (1)
  {
    std::unique_lock<std::mutex> lck(mtx_counter);
    if (item_counter < item_total)
    {
      ++item_counter;
      produce_item(item_counter);
      std::cout << "生産者線程 " << std::this_thread::get_id()
        << "生産第  " << item_counter << "個産品" << std::endl;
    }
    else
    {
      read_to_exit = true;
    }

    if (read_to_exit == true)
      break;
  }

  std::cout << "Producer thread " << std::this_thread::get_id()
    << " is exiting..." << std::endl;

}

void Consumer_thread()
{
  static int cnt = 0;
  while (1)
  {
    std::this_thread::sleep_for(t);
    int item = consume_item();
    std::cout << "消費者消費第" << item << "個産品" << std::endl;

    if (++cnt == item_total)
      break;
  }
}

int main()
{
  std::vector<std::thread> thread_vector;
  for (int i = 0; i != 5; ++i)
  {
    thread_vector.push_back(std::thread(Producer_thread));// 建立消費者線程.
  }

  std::thread consumer(Consumer_thread); // 建立消費之線程.

  for (auto &thr : thread_vector)
  {
    thr.join();
  }

  consumer.join();
}      

三、多生産者-多消費者模型

          該模型可以說是前面兩種模型的綜合,程式需要維護兩個計數器,分别是生産者已生産産品的數目和消費者已取走産品的數目。另外也需要保護産品庫在多個生産者和多個消費者互斥地通路。

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static const int repository_size = 10;//循環隊列的大小
static const int item_total = 20;//要生産的産品數目

std::mutex mtx;//互斥量,保護産品緩沖區
std::mutex producer_count_mtx;
std::mutex consumer_count_mtx;

std::condition_variable repo_not_full;//條件變量訓示産品緩沖區不滿
std::condition_variable repo_not_empty;//條件變量訓示産品緩沖區不為空,就是緩沖區有産品

int item_buffer[repository_size];//産品緩沖區,這裡使用了一個循環隊列

static std::size_t read_position = 0;//消費者讀取産品的位置
static std::size_t write_position = 0;//生産者寫入産品的位置

static size_t produced_item_counter = 0;
static size_t consumed_item_counter = 0;

std::chrono::seconds t(1);//a  new feature of c++ 11 standard
std::chrono::microseconds t1(1000);

void produce_item(int i)
{
  std::unique_lock<std::mutex> lck(mtx);
  // item buffer is full, just wait here.
  while (((write_position + 1) % repository_size) == read_position)
  {
    std::cout << "Producer is waiting for an empty slot..." << std::endl;
    repo_not_full.wait(lck);// 生産者等待"産品庫緩沖區不為滿"這一條件發生.
  }                            //當緩沖區滿了之後我們就不能添加産品了

  item_buffer[write_position] = i;//寫入産品
  write_position++;

  if (write_position == repository_size)//寫入的位置如果在隊列最後則重新設定
  {
    write_position = 0;
  }

  repo_not_empty.notify_all();//通知消費者産品庫不為空

  lck.unlock();//解鎖
}

int consume_item()
{
  int data;
  std::unique_lock<std::mutex> lck(mtx);
  // item buffer is empty, just wait here.
  while (write_position == read_position)
  {
    std::cout << "Consumer is waiting for items..." << std::endl;
    repo_not_empty.wait(lck);// 消費者等待"産品庫緩沖區不為空"這一條件發生.
  }

  data = item_buffer[read_position];//讀取産品
  read_position++;

  if (read_position >= repository_size)
  {
    read_position = 0;
  }

  repo_not_full.notify_all();//通知産品庫不滿
  lck.unlock();

  return data;
}

void Producer_thread()
{
  bool ready_to_exit = false;
  while (1)
  {
    //std::this_thread::sleep_for(t);
    std::unique_lock<std::mutex> lock(producer_count_mtx);
    if (produced_item_counter < item_total)
    {
      ++produced_item_counter;
      produce_item(produced_item_counter);
      std::cout << "生産者線程 " << std::this_thread::get_id()
        << "生産第  " << produced_item_counter << "個産品" << std::endl;
    }
    else
    {
      ready_to_exit = true;
    }

    lock.unlock();

    if (ready_to_exit == true)
    {
      break;
    }
  }

  std::cout << "Producer thread " << std::this_thread::get_id()
    << " is exiting..." << std::endl;
}

void Consumer_thread()
{
  bool read_to_exit = false;
  while (1)
  {
    std::this_thread::sleep_for(t1);
    std::unique_lock<std::mutex> lck(consumer_count_mtx);
    if (consumed_item_counter < item_total)
    {
      int item = consume_item();
      ++consumed_item_counter;
      std::cout << "消費者線程" << std::this_thread::get_id()
        << "消費第" << item << "個産品" << std::endl;
    }
    else
    {
      read_to_exit = true;
    }

    if (read_to_exit == true)
    {
      break;
    }
  }

  std::cout << "Consumer thread " << std::this_thread::get_id()
    << " is exiting..." << std::endl;
}

int main()
{
  std::vector<std::thread> thread_vector1;
  std::vector<std::thread> thread_vector2;
  for (int i = 0; i != 5; ++i)
  {
    thread_vector1.push_back(std::thread(Producer_thread));// 建立生産者線程.
    thread_vector2.push_back(std::thread(Consumer_thread));// 建立消費者線程.

  }

  for (auto &thr1 : thread_vector1)
  {
    thr1.join();
  }

  for (auto &thr2 : thread_vector2)
  {
    thr2.join();
  }
}      

繼續閱讀