我在8年前去面試程式員的時候,一個不大的公司,裡面的開發主管接待了我們,給我的題目就是寫一段程式模拟生産者消費者問題,當時可把我難壞了,一下子感覺自己的知識儲備竟然如此的匮乏。
而在我從事DBA工作之後,經常會有大批量并發的環境,有的需要排障,有的需要優化,在很多并發的場景中,發現生産者消費者問題可以模拟出很多實際中的問題,是以生産者消費者問題非常重要,也是我想不斷改進和探索的一類問題。
引入倉庫的必要性
要想使用程式來模拟,其實也不用花太多的時間,我們簡單說說需要考慮的地方。首先生産者,消費者是兩個實體對象,生産者生産物品,消費者消費物品,如果在生産者中定義生産的流程,在消費者中定義消費的流程,兩個對象就需要彼此引用,依賴性太高,而且實際上性能也好不到哪裡去,是以就需要一個緩沖器,一個中間對象,我們就叫做倉庫吧,生産的物品推入倉庫,消費的物品從倉庫中取出,這樣生産者和消費者就能夠取消之間的引用,直接通過倉庫引用來同步狀态,降低耦合。
是以我們的一個初步設想就是生産者-->倉庫<--消費者 這樣的模式。
生産者消費者的幾種類型和實作方式
當然生産者消費者問題有兩種類型,一種就是使用某種機制來保護生産者和消費者之間的同步,另外一種和Linux中的管道思路相似。相對來說第一種類型的處理方式更為通用,大體分為三類具體的實作方式:
經典的wait(),notify()方法
await(),signal()方法
使用阻塞隊列(BlockingQueue),比如LinkedBlockingQueue
通用的對象類
為了更快出成果,盡可能快速了解,我也參考了一些資料,下午下班前寫了下面的程式。我就簡單說明第一種和第二種吧。
因為實體類對象是通用的,我就不再重複列出了,有生産者Producer和消費者Consumer兩個類。
生産者類
import com.jeanron.test1.Storage;
public class Producer extends Thread {
// 每次生産的産品數量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構造函數,設定倉庫
public Producer(Storage storage) {
this.storage = storage;
}
public Producer(Storage storage, int num) {
this.num = num;
// 線程run函數
public void run() {
produce(num);
// 調用倉庫Storage的生産函數
public void produce(int num) {
storage.produce(num);
// get/set方法
public int getNum() {
return num;
public void setNum(int num) {
public Storage getStorage() {
return storage;
public void setStorage(Storage storage) {
}
消費者類
public class Consumer extends Thread
{
// 每次消費的産品數量
private int num;
// 所在放置的倉庫
private Storage storage;
// 構造函數,設定倉庫
public Consumer(Storage storage)
{
this.storage = storage;
}
public Consumer(Storage storage,int num)
this.num = num;
// 線程run函數
public void run()
consume(num);
// 調用倉庫Storage的生産函數
public void consume(int num)
storage.consume(num);
// get/set方法
public int getNum()
return num;
public void setNum(int num)
this.num = num;
public Storage getStorage()
return storage;
public void setStorage(Storage storage)
} 滴喲中
第一種實作方式
import java.util.LinkedList;
public class Storage
// 倉庫最大存儲量
private final int MAX_SIZE = 200;
// 倉庫存儲的載體
private LinkedList<Object> list = new LinkedList<Object>();
private static Storage storage=null;
public static Storage getInstance() {
if (storage == null) {
synchronized (Storage.class) {
if (storage == null) {
storage = new Storage();
}
}
}
return storage;
}
// 生産num個産品
public void produce(int num)
// 同步代碼段
synchronized (list)
{
// 如果倉庫剩餘容量不足
while (list.size() + num > MAX_SIZE)
{
System.out.println(Thread.currentThread().getName()+"【生産者:要生産的産品數量】:" + num + "\t【庫存量】:"
+ list.size() + "\t暫時不能執行生産任務!");
try
{
// 由于條件不滿足,生産阻塞
list.wait();
}
catch (InterruptedException e)
e.printStackTrace();
}
// 生産條件滿足情況下,生産num個産品
for (int i = 1; i <= num; ++i)
list.add(new Object());
System.out.println(Thread.currentThread().getName()+"【生産者:已經生産産品數】:" + num + "\t【現倉儲量為】:" + list.size());
list.notifyAll();
}
// 消費num個産品
// 如果倉庫存儲量不足
while (list.size() < num)
System.out.println(Thread.currentThread().getName()+"【消費者:要消費的産品數量】:" + num + "\t【庫存量】:"
+ list.size() + "\t暫時不能執行消費任務!");
// 由于條件不滿足,消費阻塞
// 消費條件滿足情況下,消費num個産品
list.remove();
System.out.println(Thread.currentThread().getName()+"【消費者:已經消費産品數】:" + num + "\t【現倉儲量為】:" + list.size());
public LinkedList<Object> getList()
return list;
public void setList(LinkedList<Object> list)
this.list = list;
public int getMAX_SIZE()
return MAX_SIZE;
}
第二種實作方式
第二種使用await和signal的方式實作,我們使用Lock來産生兩個Condition對象來管理任務間的通信,一個重要的參考點就是倉庫滿,倉庫空,這類方式最後必須有try-finally的子句,保證能夠釋放鎖。
package com.jeanron.test2;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.jeanron.test2.Storage;
// 鎖
private final Lock lock = new ReentrantLock();
// 倉庫滿的條件變量
private final Condition full = lock.newCondition();
// 倉庫空的條件變量
private final Condition empty = lock.newCondition();
// 獲得鎖
lock.lock();
// 如果倉庫剩餘容量不足
while (list.size() + num > MAX_SIZE)
System.out.println(Thread.currentThread().getName()+"【要生産的産品數量】:" + num + "\t【庫存量】:" + list.size()
+ "\t暫時不能執行生産任務!");
try
// 由于條件不滿足,生産阻塞
full.await();
catch (InterruptedException e)
e.printStackTrace();
// 生産條件滿足情況下,生産num個産品
for (int i = 1; i <= num; ++i)
list.add(new Object());
System.out.println(Thread.currentThread().getName()+"【已經生産産品數】:" + num + "\t【現倉儲量為】:" + list.size());
// 喚醒其他所有線程
full.signalAll();
empty.signalAll();
// 釋放鎖
lock.unlock();
// 如果倉庫存儲量不足
while (list.size() < num)
System.out.println(Thread.currentThread().getName()+"【要消費的産品數量】:" + num + "\t【庫存量】:" + list.size()
// 由于條件不滿足,消費阻塞
empty.await();
// 消費條件滿足情況下,消費num個産品
list.remove();
System.out.println(Thread.currentThread().getName()+"【已經消費産品數】:" + num + "\t【現倉儲量為】:" + list.size());
// set/get方法
測試類
測試類算是用用,我們直接使用多線程排程器Executor來做。對于生産者消費者使用随機數的方式來初始化物品數,倉庫使用單例模式。
package com.jeanron.main;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.jeanron.entity1.Consumer;
import com.jeanron.entity1.Producer;
public class Test1
public static void main(String[] args)
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++){
exec.execute(new Producer(Storage.getInstance(),new Random().nextInt(10)*10));
}
for(int i=0;i<10;i++){
exec.execute(new Consumer(Storage.getInstance(),new Random().nextInt(10)*10));
}
測試結果
生産者是5個線程,消費者是10個線程,我們來看看調用之後的資訊,這個可以簡單分析下日志即可,不具有典型性和代表性。
第一類實作方式的輸出如下:
pool-1-thread-1【生産者:已經生産産品數】:10 【現倉儲量為】:10
pool-1-thread-3【生産者:已經生産産品數】:50 【現倉儲量為】:60
pool-1-thread-2【生産者:已經生産産品數】:50 【現倉儲量為】:110
pool-1-thread-5【生産者:已經生産産品數】:0 【現倉儲量為】:110
pool-1-thread-4【生産者:已經生産産品數】:80 【現倉儲量為】:190
pool-1-thread-4【消費者:已經消費産品數】:40 【現倉儲量為】:150
pool-1-thread-1【消費者:已經消費産品數】:10 【現倉儲量為】:140
pool-1-thread-3【消費者:已經消費産品數】:20 【現倉儲量為】:120
pool-1-thread-2【消費者:已經消費産品數】:10 【現倉儲量為】:110
pool-1-thread-5【消費者:已經消費産品數】:40 【現倉儲量為】:70
pool-1-thread-6【消費者:已經消費産品數】:30 【現倉儲量為】:40
pool-1-thread-4【消費者:要消費的産品數量】:90 【庫存量】:40 暫時不能執行消費任務!
pool-1-thread-1【消費者:要消費的産品數量】:50 【庫存量】:40 暫時不能執行消費任務!
pool-1-thread-3【消費者:要消費的産品數量】:90 【庫存量】:40 暫時不能執行消費任務!
pool-1-thread-7【消費者:已經消費産品數】:20 【現倉儲量為】:20
pool-1-thread-3【消費者:要消費的産品數量】:90 【庫存量】:20 暫時不能執行消費任務!
pool-1-thread-1【消費者:要消費的産品數量】:50 【庫存量】:20 暫時不能執行消費任務!
pool-1-thread-4【消費者:要消費的産品數量】:90 【庫存量】:20 暫時不能執行消費任務!
第二類實作方式的輸出如下:
pool-1-thread-1【已經生産産品數】:16 【現倉儲量為】:16
pool-1-thread-3【已經生産産品數】:60 【現倉儲量為】:76
pool-1-thread-2【已經生産産品數】:67 【現倉儲量為】:143
pool-1-thread-5【要生産的産品數量】:90 【庫存量】:143 暫時不能執行生産任務!
pool-1-thread-4【已經生産産品數】:27 【現倉儲量為】:170
pool-1-thread-1【已經消費産品數】:10 【現倉儲量為】:160
pool-1-thread-2【已經消費産品數】:23 【現倉儲量為】:137
pool-1-thread-6【已經消費産品數】:18 【現倉儲量為】:119
pool-1-thread-5【要生産的産品數量】:90 【庫存量】:119 暫時不能執行生産任務!
pool-1-thread-3【已經消費産品數】:97 【現倉儲量為】:22
pool-1-thread-1【要消費的産品數量】:57 【庫存量】:22 暫時不能執行生産任務!
pool-1-thread-6【要消費的産品數量】:62 【庫存量】:22 暫時不能執行生産任務!
pool-1-thread-3【要消費的産品數量】:35 【庫存量】:22 暫時不能執行生産任務!
pool-1-thread-4【已經消費産品數】:17 【現倉儲量為】:5
pool-1-thread-2【已經消費産品數】:1 【現倉儲量為】:4
pool-1-thread-7【要消費的産品數量】:93 【庫存量】:4 暫時不能執行生産任務!
pool-1-thread-2【已經消費産品數】:4 【現倉儲量為】:0
pool-1-thread-5【已經生産産品數】:90 【現倉儲量為】:90
pool-1-thread-1【已經消費産品數】:57 【現倉儲量為】:33
pool-1-thread-6【要消費的産品數量】:62 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-3【要消費的産品數量】:35 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-4【要消費的産品數量】:45 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-7【要消費的産品數量】:93 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-10【要消費的産品數量】:81 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-2【要消費的産品數量】:65 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-5【要消費的産品數量】:38 【庫存量】:33 暫時不能執行生産任務!
pool-1-thread-8【已經消費産品數】:33 【現倉儲量為】:0
pool-1-thread-12【要消費的産品數量】:21 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-9【要消費的産品數量】:24 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-11【要消費的産品數量】:54 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-13【要消費的産品數量】:58 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-6【要消費的産品數量】:62 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-3【要消費的産品數量】:35 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-4【要消費的産品數量】:45 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-7【要消費的産品數量】:93 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-10【要消費的産品數量】:81 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-2【要消費的産品數量】:65 【庫存量】:0 暫時不能執行生産任務!
pool-1-thread-5【要消費的産品數量】:38 【庫存量】:0 暫時不能執行生産任務!
