天天看點

[Java并發程式設計實戰] 阻塞隊列 BlockingQueue(含代碼,生産者-消費者模型)

見賢思齊焉,見不賢而内自省也。—《論語》

Java5.0 增加了兩種新的容器類型,它們是指:Queue 和 BlockingQueue。Queue 用來臨時儲存一組等待處理的元素。BlockingQueue 擴張了 Queue 接口,增加了可阻塞的插入和擷取等操作。

BlockingQueue 通常運用于一個線程生産對象放入隊列,另一個線程從隊列擷取對象并消費,這是典型的生産者消費者模型。

[Java并發程式設計實戰] 阻塞隊列 BlockingQueue(含代碼,生産者-消費者模型)

生産者線程持續生産新對象并插入隊列,如果隊列已滿,那麼插入對象的操作會一直阻塞,直到隊列中出現可用的空間。

消費者線程持續從隊列擷取對象,如果隊列為空,那麼擷取操作會一直阻塞,直到隊列中出現可用的新對象。BlockingQueue 簡化了生産者-消費者設計的實作過程,它支援任意數量的生産者和消費者。

BlockingQueue 的核心方法:

[Java并發程式設計實戰] 阻塞隊列 BlockingQueue(含代碼,生産者-消費者模型)

offer(E): 向隊列插入元素,并傳回插入成功與否。本方法不阻塞目前執行線程。

put(E) : 向隊列插入元素,如果隊列已滿,則會阻塞目前線程直至元素加入隊列。

take() : 擷取隊列的首位元素,如果隊列為空,則阻塞目前線程直至隊列有元素并取走。

poll():擷取隊列首個元素,指定時間内一旦資料可取,則立即傳回;否則傳回失敗。

remove(E):删除隊列中的元素,傳回成功與否。

BlockingQueue 的實作

BlockingQueue是一個接口,是以你必須使用它的實作來使用它。它的實作包括以下幾個:

  • ArrayBlockingQueue:基于數組實作的有界隊列(FIFO),使用一把全局鎖并行對 queue 讀寫操作,同時使用兩個 Condition 阻塞隊列為空時的取操作和隊列為滿時的寫操作。
  • LinkedBlockingQueue:基于已連結節點的,範圍上限為 Integer.MAX_VALUE 的 blocking queue(FIFO)。主要操作 put 和 take 都是阻塞的。
  • DelayQueue:當指定的延遲時間到了,才能夠從隊列中擷取元素。它沒有大小限制,是以插入元素時不會阻塞,而隻有擷取元素時才會被阻塞。它的用法可以參考下面兩篇部落格:

    http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html,

    http://www.cnblogs.com/sunzhenchao/p/3515085.html

  • PriorityBlockingQueue: 基于優先級的阻塞隊列,但需要注意的是PriorityBlockingQueue并不會阻塞資料生産者,而隻會在沒有可消費的資料時,阻塞資料的消費者。
  • SynchronoutQueue:它的内部同時隻能夠容納單個元素。如果該隊列已有一個元素的話,試圖向隊列插入一個新元素線程會阻塞,知道另一個線程将該元素從隊列中拿走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程将會阻塞,知道另一個線程向隊列中插入一個新的元素。SynchronousQueue适合一對一的比對場景,沒有容量,無法緩存。它的用法強烈推薦部落格:

    http://www.cnblogs.com/leesf456/p/5560362.html

BlockingQueue的使用

這是一個使用 BlockingQueue 的例子,本例使用 ArrayBlockingQueue 實作。首先,BlockingQueueTest 建立一個生産者線程 Procucer, 把字元存放進共享隊列。然後建立三個消費者線程 Consumer,把字元串從隊列中取出。Consumer 取到最後一個字元串時,中斷所有消費者線程,結束程式。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
    //隊列容量
    private static final int SIZE = ;
    private static final int CONSUMER_SIZE = ;
    //消費者線程退出标志
    private static String endString = "num:" + (SIZE*-);
    //存放消費者線程
    private static List list = new ArrayList<Thread>();

    public static void main(String[] args) throws Exception{
        //建立固定長度的阻塞隊列
        BlockingQueue q = new ArrayBlockingQueue<String>(SIZE);

        //建立生産者
        Producer producer = new Producer(q);
        //啟動生産者線程,生産對象
        producer.start();

        //啟動消費者線程,擷取隊列對象
        for(int i = ; i < CONSUMER_SIZE; i++) {
            list.add(new Consumer(q));
            ((Thread) list.get(i)).start();
        }
    }

    //中斷線程
    public static void shutDownThread() {
        for(int i = ; i < CONSUMER_SIZE; i++) {
            ((Thread) list.get(i)).interrupt();
        }
    }

    static class Producer extends Thread{

        private BlockingQueue queue = null;

        public Producer(BlockingQueue q) {
            this.queue = q;
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                //生産10個對象,放進隊列
                for(int i = ; i < SIZE*; i++) {
                    String str = "num:" + i;
                    System.out.println(Thread.currentThread().getName() +":"+"IN: " + str);
                    queue.put(str);
                    Thread.sleep();
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    //消費者線程
    static class Consumer extends Thread{

        private BlockingQueue queue = null;

        public Consumer(BlockingQueue q) {
            this.queue = q;
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                //擷取隊列的元素,隊列為空時會阻塞
                while(true) {
                    String str = (String) queue.take();
                    System.out.println(Thread.currentThread().getName() + ":" + "OUT " + str);
                    //已經取出最後一個元素,消費者線程應該退出,否則程式一直在運作
                    if(str.equals(endString)) {
                        shutDownThread();//中斷線程
                    }
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

           

執行結果:

[Java并發程式設計實戰] 阻塞隊列 BlockingQueue(含代碼,生産者-消費者模型)

總結

java.util.concurrent 中實作的阻塞隊列包含了足夠的同步機制,進而能夠安全的将對象從生産者線程釋出到消費者線程。對于可變對象,生産者-消費者模型,把對象的所有權安全的從生産者傳遞給消費者。轉移後,消費者線程獲得這個對象的所有權(獨占通路權,可以任意修改它),并且生産者線程不會再通路它。同時,阻塞隊列負責所有的控制流,使得消費者生産者的代碼更加簡單和清晰。

本文完畢,如對你有幫助,請關注我,謝謝~

參考

  1. https://blog.csdn.net/defonds/article/details/44021605/
  2. http://www.cnblogs.com/leesf456/p/5428630.html
  3. 《并發程式設計實戰》
    本文原創首發于微信公衆号 [ 林裡少年 ],歡迎關注第一時間擷取更新。
    [Java并發程式設計實戰] 阻塞隊列 BlockingQueue(含代碼,生産者-消費者模型)

繼續閱讀