天天看點

Java實作生産者消費者的兩種方式(r12筆記第66天)

   我在8年前去面試程式員的時候,一個不大的公司,裡面的開發主管接待了我們,給我的題目就是寫一段程式模拟生産者消費者問題,當時可把我難壞了,一下子感覺自己的知識儲備竟然如此的匮乏。

   而在我從事DBA工作之後,經常會有大批量并發的環境,有的需要排障,有的需要優化,在很多并發的場景中,發現生産者消費者問題可以模拟出很多實際中的問題,是以生産者消費者問題非常重要,也是我想不斷改進和探索的一類問題。

引入倉庫的必要性

要想使用程式來模拟,其實也不用花太多的時間,我們簡單說說需要考慮的地方。首先生産者,消費者是兩個實體對象,生産者生産物品,消費者消費物品,如果在生産者中定義生産的流程,在消費者中定義消費的流程,兩個對象就需要彼此引用,依賴性太高,而且實際上性能也好不到哪裡去,是以就需要一個緩沖器,一個中間對象,我們就叫做倉庫吧,生産的物品推入倉庫,消費的物品從倉庫中取出,這樣生産者和消費者就能夠取消之間的引用,直接通過倉庫引用來同步狀态,降低耦合。

    是以我們的一個初步設想就是生産者-->倉庫<--消費者 這樣的模式。

生産者消費者的幾種類型和實作方式

    當然生産者消費者問題有兩種類型,一種就是使用某種機制來保護生産者和消費者之間的同步,另外一種和Linux中的管道思路相似。相對來說第一種類型的處理方式更為通用,大體分為三類具體的實作方式:

經典的wait(),notify()方法

await(),signal()方法

使用阻塞隊列(BlockingQueue),比如LinkedBlockingQueue

通用的對象類

為了更快出成果,盡可能快速了解,我也參考了一些資料,下午下班前寫了下面的程式。我就簡單說明第一種和第二種吧。

  因為實體類對象是通用的,我就不再重複列出了,有生産者Producer和消費者Consumer兩個類。

生産者類

import com.jeanron.test1.Storage;

public class Producer extends Thread {

    // 每次生産的産品數量

    private int num;

    // 所在放置的倉庫

    private Storage storage;

    // 構造函數,設定倉庫

    public Producer(Storage storage) {

        this.storage = storage;

    }

    public Producer(Storage storage, int num) {

        this.num = num;

    // 線程run函數

    public void run() {

        produce(num);

    // 調用倉庫Storage的生産函數

    public void produce(int num) {

        storage.produce(num);

    // get/set方法

    public int getNum() {

        return num;

    public void setNum(int num) {

    public Storage getStorage() {

        return storage;

    public void setStorage(Storage storage) {

}

消費者類

public class Consumer extends Thread  

{  

    // 每次消費的産品數量  

    private int num;  

    // 所在放置的倉庫  

    private Storage storage;  

    // 構造函數,設定倉庫  

    public Consumer(Storage storage)  

    {  

        this.storage = storage;  

    }  

    public Consumer(Storage storage,int num)  

        this.num = num;

    // 線程run函數  

    public void run()  

        consume(num);  

    // 調用倉庫Storage的生産函數  

    public void consume(int num)  

        storage.consume(num);  

    // get/set方法  

    public int getNum()  

        return num;  

    public void setNum(int num)  

        this.num = num;  

    public Storage getStorage()  

        return storage;  

    public void setStorage(Storage storage)  

}  滴喲中

第一種實作方式

import java.util.LinkedList;

public class Storage  

    // 倉庫最大存儲量  

    private final int MAX_SIZE = 200;  

    // 倉庫存儲的載體  

    private LinkedList<Object> list = new LinkedList<Object>();  

    private static Storage storage=null;  

    public static Storage getInstance() {  

        if (storage == null) {    

            synchronized (Storage.class) {    

               if (storage == null) {    

                   storage = new Storage();   

               }    

            }    

        }    

        return storage;   

    }   

    // 生産num個産品  

    public void produce(int num)  

        // 同步代碼段  

        synchronized (list)  

        {  

            // 如果倉庫剩餘容量不足  

            while (list.size() + num > MAX_SIZE)  

            {  

                System.out.println(Thread.currentThread().getName()+"【生産者:要生産的産品數量】:" + num + "\t【庫存量】:"  

                        + list.size() + "\t暫時不能執行生産任務!");  

                try  

                {  

                    // 由于條件不滿足,生産阻塞  

                    list.wait();  

                }  

                catch (InterruptedException e)  

                    e.printStackTrace();  

            }  

            // 生産條件滿足情況下,生産num個産品  

            for (int i = 1; i <= num; ++i)  

                list.add(new Object());  

            System.out.println(Thread.currentThread().getName()+"【生産者:已經生産産品數】:" + num + "\t【現倉儲量為】:" + list.size());  

            list.notifyAll();  

        }  

    // 消費num個産品  

            // 如果倉庫存儲量不足  

            while (list.size() < num)  

                System.out.println(Thread.currentThread().getName()+"【消費者:要消費的産品數量】:" + num + "\t【庫存量】:"  

                        + list.size() + "\t暫時不能執行消費任務!");  

                    // 由于條件不滿足,消費阻塞  

            // 消費條件滿足情況下,消費num個産品  

                list.remove();  

            System.out.println(Thread.currentThread().getName()+"【消費者:已經消費産品數】:" + num + "\t【現倉儲量為】:" + list.size());  

    public LinkedList<Object> getList()  

        return list;  

    public void setList(LinkedList<Object> list)  

        this.list = list;  

    public int getMAX_SIZE()  

        return MAX_SIZE;  

}  

第二種實作方式

第二種使用await和signal的方式實作,我們使用Lock來産生兩個Condition對象來管理任務間的通信,一個重要的參考點就是倉庫滿,倉庫空,這類方式最後必須有try-finally的子句,保證能夠釋放鎖。

package com.jeanron.test2;

import java.util.LinkedList;  

import java.util.concurrent.locks.Condition;  

import java.util.concurrent.locks.Lock;  

import java.util.concurrent.locks.ReentrantLock;  

import com.jeanron.test2.Storage;

    // 鎖  

    private final Lock lock = new ReentrantLock();  

    // 倉庫滿的條件變量  

    private final Condition full = lock.newCondition();  

    // 倉庫空的條件變量  

    private final Condition empty = lock.newCondition();  

        // 獲得鎖  

        lock.lock();  

        // 如果倉庫剩餘容量不足  

        while (list.size() + num > MAX_SIZE)  

            System.out.println(Thread.currentThread().getName()+"【要生産的産品數量】:" + num + "\t【庫存量】:" + list.size()  

                    + "\t暫時不能執行生産任務!");  

            try  

                // 由于條件不滿足,生産阻塞  

                full.await();  

            catch (InterruptedException e)  

                e.printStackTrace();  

        // 生産條件滿足情況下,生産num個産品  

        for (int i = 1; i <= num; ++i)  

            list.add(new Object());  

        System.out.println(Thread.currentThread().getName()+"【已經生産産品數】:" + num + "\t【現倉儲量為】:" + list.size());  

        // 喚醒其他所有線程  

        full.signalAll();  

        empty.signalAll();  

        // 釋放鎖  

        lock.unlock();  

        // 如果倉庫存儲量不足  

        while (list.size() < num)  

            System.out.println(Thread.currentThread().getName()+"【要消費的産品數量】:" + num + "\t【庫存量】:" + list.size()  

                // 由于條件不滿足,消費阻塞  

                empty.await();  

        // 消費條件滿足情況下,消費num個産品  

            list.remove();  

        System.out.println(Thread.currentThread().getName()+"【已經消費産品數】:" + num + "\t【現倉儲量為】:" + list.size());  

    // set/get方法  

測試類

測試類算是用用,我們直接使用多線程排程器Executor來做。對于生産者消費者使用随機數的方式來初始化物品數,倉庫使用單例模式。

package com.jeanron.main;

import java.util.Random;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import com.jeanron.entity1.Consumer;

import com.jeanron.entity1.Producer;

public class Test1  

    public static void main(String[] args)  

        ExecutorService exec = Executors.newCachedThreadPool();

        for(int i=0;i<5;i++){

            exec.execute(new Producer(Storage.getInstance(),new Random().nextInt(10)*10));            

        }

        for(int i=0;i<10;i++){

            exec.execute(new Consumer(Storage.getInstance(),new Random().nextInt(10)*10));            

測試結果

生産者是5個線程,消費者是10個線程,我們來看看調用之後的資訊,這個可以簡單分析下日志即可,不具有典型性和代表性。

第一類實作方式的輸出如下:

pool-1-thread-1【生産者:已經生産産品數】:10    【現倉儲量為】:10

pool-1-thread-3【生産者:已經生産産品數】:50    【現倉儲量為】:60

pool-1-thread-2【生産者:已經生産産品數】:50    【現倉儲量為】:110

pool-1-thread-5【生産者:已經生産産品數】:0    【現倉儲量為】:110

pool-1-thread-4【生産者:已經生産産品數】:80    【現倉儲量為】:190

pool-1-thread-4【消費者:已經消費産品數】:40    【現倉儲量為】:150

pool-1-thread-1【消費者:已經消費産品數】:10    【現倉儲量為】:140

pool-1-thread-3【消費者:已經消費産品數】:20    【現倉儲量為】:120

pool-1-thread-2【消費者:已經消費産品數】:10    【現倉儲量為】:110

pool-1-thread-5【消費者:已經消費産品數】:40    【現倉儲量為】:70

pool-1-thread-6【消費者:已經消費産品數】:30    【現倉儲量為】:40

pool-1-thread-4【消費者:要消費的産品數量】:90    【庫存量】:40    暫時不能執行消費任務!

pool-1-thread-1【消費者:要消費的産品數量】:50    【庫存量】:40    暫時不能執行消費任務!

pool-1-thread-3【消費者:要消費的産品數量】:90    【庫存量】:40    暫時不能執行消費任務!

pool-1-thread-7【消費者:已經消費産品數】:20    【現倉儲量為】:20

pool-1-thread-3【消費者:要消費的産品數量】:90    【庫存量】:20    暫時不能執行消費任務!

pool-1-thread-1【消費者:要消費的産品數量】:50    【庫存量】:20    暫時不能執行消費任務!

pool-1-thread-4【消費者:要消費的産品數量】:90    【庫存量】:20    暫時不能執行消費任務!

第二類實作方式的輸出如下:

pool-1-thread-1【已經生産産品數】:16    【現倉儲量為】:16

pool-1-thread-3【已經生産産品數】:60    【現倉儲量為】:76

pool-1-thread-2【已經生産産品數】:67    【現倉儲量為】:143

pool-1-thread-5【要生産的産品數量】:90    【庫存量】:143    暫時不能執行生産任務!

pool-1-thread-4【已經生産産品數】:27    【現倉儲量為】:170

pool-1-thread-1【已經消費産品數】:10    【現倉儲量為】:160

pool-1-thread-2【已經消費産品數】:23    【現倉儲量為】:137

pool-1-thread-6【已經消費産品數】:18    【現倉儲量為】:119

pool-1-thread-5【要生産的産品數量】:90    【庫存量】:119    暫時不能執行生産任務!

pool-1-thread-3【已經消費産品數】:97    【現倉儲量為】:22

pool-1-thread-1【要消費的産品數量】:57    【庫存量】:22    暫時不能執行生産任務!

pool-1-thread-6【要消費的産品數量】:62    【庫存量】:22    暫時不能執行生産任務!

pool-1-thread-3【要消費的産品數量】:35    【庫存量】:22    暫時不能執行生産任務!

pool-1-thread-4【已經消費産品數】:17    【現倉儲量為】:5

pool-1-thread-2【已經消費産品數】:1    【現倉儲量為】:4

pool-1-thread-7【要消費的産品數量】:93    【庫存量】:4    暫時不能執行生産任務!

pool-1-thread-2【已經消費産品數】:4    【現倉儲量為】:0

pool-1-thread-5【已經生産産品數】:90    【現倉儲量為】:90

pool-1-thread-1【已經消費産品數】:57    【現倉儲量為】:33

pool-1-thread-6【要消費的産品數量】:62    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-3【要消費的産品數量】:35    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-4【要消費的産品數量】:45    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-7【要消費的産品數量】:93    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-10【要消費的産品數量】:81    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-2【要消費的産品數量】:65    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-5【要消費的産品數量】:38    【庫存量】:33    暫時不能執行生産任務!

pool-1-thread-8【已經消費産品數】:33    【現倉儲量為】:0

pool-1-thread-12【要消費的産品數量】:21    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-9【要消費的産品數量】:24    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-11【要消費的産品數量】:54    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-13【要消費的産品數量】:58    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-6【要消費的産品數量】:62    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-3【要消費的産品數量】:35    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-4【要消費的産品數量】:45    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-7【要消費的産品數量】:93    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-10【要消費的産品數量】:81    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-2【要消費的産品數量】:65    【庫存量】:0    暫時不能執行生産任務!

pool-1-thread-5【要消費的産品數量】:38    【庫存量】:0    暫時不能執行生産任務!

Java實作生産者消費者的兩種方式(r12筆記第66天)