在JAVA的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”資料的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高品質的多線程程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
一、認識BlockingQueue
阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在資料結構中所起的作用大緻如下圖所示:

從上圖我們可以很清楚看到,通過一個共享的隊列,可以使得資料由隊列的一端輸入,從另外一端輸出;
常用的隊列主要有以下兩種:(當然通過不同的實作方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)
先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似于排隊的功能。從某種程度上來說這種隊列也展現了一種公平性。
後進先出(LIFO):後插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。
多線程環境中,通過隊列可以很容易實作資料共享,比如經典的“生産者”和“消費者”模型中,通過隊列可以很便利地實作兩者之間的資料共享。假設我們有若幹生産者線程,另外又有若幹個消費者線程。如果生産者線程需要把準備好的資料共享給消費者線程,利用隊列的方式來傳遞資料,就可以很友善地解決他們之間的資料共享問題。但如果生産者和消費者在某個時間段内,萬一發生資料處理速度不比對的情況呢?理想情況下,如果生産者産出資料的速度大于消費者消費的速度,并且當生産出來的資料累積到一定程度的時候,那麼生産者必須暫停等待一下(阻塞生産者線程),以便等待消費者線程把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多線程環境下,我們每個程式員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會挂起線程(即阻塞),一旦條件滿足,被挂起的線程又會自動被喚醒)下面兩幅圖示範了BlockingQueue的兩個常見阻塞場景:
如上圖所示:當隊列中填滿資料的情況下,生産者端的所有線程都會被自動阻塞(挂起),直到隊列中有空的位置,線程被自動喚醒。
這也是我們在多線程環境下,為什麼需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什麼時候需要阻塞線程,什麼時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
二、BlockingQueue定義的常用方法
抛出異常 | 特殊值 | 阻塞 | 逾時 | |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek()/isEmpty() | 無 |
其中: null被用作訓示poll操作失敗的警戒值。
1) 插入對象:
BlockingQueue不接受null對象。試圖add、put或offer一個null對象時,抛出NullPointerException。
add(object):把object加到BlockingQueue裡,如果BlockingQueue可以容納,則傳回true,否則抛出異常。
offer(object):把object加到BlockingQueue裡,如果BlockingQueue可以容納,則傳回true,否則傳回false,本方法不阻塞目前執行方法的線程,offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間内,不能加入BlockingQueue,則傳回false。 put(object):把object加到BlockingQueue裡,如果BlockQueue沒有空間,則調用此方法的線程被阻塞,直到BlockingQueue裡面有空間再繼續加到BlockingQueue。
2) 擷取對象:
poll(time):取走BlockingQueue裡排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時傳回null;poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間内,隊列一旦有資料可取,則立即傳回隊列中的資料。否則超過時間還沒有資料可取,傳回null。
take():取走BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻塞去資料線程進入等待狀态直到BlockingQueue有新加入的對象被取走;
drainTo():一次性從BlockingQueue擷取所有可用的資料對象,也可以一次性從BlockingQueue擷取指定個數的資料對象),通過該方法,可以提升擷取資料效率;不需要多次分批加鎖或釋放鎖。
3) 檢查對象
element() 擷取BlockingQueue裡排在首位的對象并不從隊列中移除。如果隊列沒有對象,抛出異常。
peek()擷取BlockingQueue裡排在首位的對象并不從隊列中移除。如果隊列沒有對象,傳回null對象。
isEmpty() 判斷BlockingQueue裡是否有對象,如果為空,傳回true,如果不為空,傳回false。
三、常見BlockingQueue
在了解了BlockingQueue的基本功能後,讓我們來看看BlockingQueue家庭大緻有哪些成員?
1. ArrayBlockingQueue
基于數組的阻塞隊列實作,在ArrayBlockingQueue内部,維護了一個定長數組,以便緩存隊列中的資料對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue内部還儲存着兩個整形變量,分别辨別着隊列的頭部和尾部在數組中的位置。ArrayBlockingQueue在生産者放入資料和消費者擷取資料,都是共用同一個鎖對象,由此也意味着兩者無法真正并行運作,這點尤其不同于LinkedBlockingQueue;按照實作原理來分析,ArrayBlockingQueue完全可以采用分離鎖,進而實作生産者和消費者操作的完全并行運作。Doug Lea之是以沒這樣去做,也許是因為ArrayBlockingQueue的資料寫入和擷取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或删除元素時不會産生或銷毀任何額外的對象執行個體,而後者則會生成一個額外的Node對象。這在長時間内需要高效并發地處理大批量資料的系統中,其對于GC的影響還是存在一定的差別。而在建立ArrayBlockingQueue時,我們還可以控制對象的内部鎖是否采用公平鎖,預設采用非公平鎖。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 聲明一個容量為10的緩存隊列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
queue.add("sss");
//測試添加null對象
try{
queue.add(null);
}
catch(Exception ee){
ee.printStackTrace();
}
try{
queue.offer(null);
}
catch(Exception ee){
ee.printStackTrace();
}
try{
queue.put(null);
}
catch(Exception ee){
ee.printStackTrace();
}
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 啟動線程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 執行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 消費者線程
*/
public class Consumer implements Runnable {
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("啟動消費者線程!");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正從隊列擷取資料...");
String data = queue.poll(2, TimeUnit.SECONDS);
if (null != data) {
System.out.println("拿到資料:" + data);
System.out.println("正在消費資料:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超過2s還沒資料,認為所有生産線程都已經退出,自動退出消費線程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消費者線程!");
}
}
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生産者線程
*
* @author jackyuj
*/
public class Producer implements Runnable {
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("啟動生産者線程!");
try {
while (isRunning) {
System.out.println("正在生産資料...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("将資料:" + data + "放入隊列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入資料失敗:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生産者線程!");
}
}
public void stop() {
isRunning = false;
}
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
2. LinkedBlockingQueue
基于連結清單的阻塞隊列,同ArrayListBlockingQueue類似,其内部也維持着一個資料緩沖隊列(該隊列由一個連結清單構成),當生産者往隊列中放入一個資料時,隊列會從生産者手中擷取資料,并緩存在隊列内部,而生産者立即傳回;隻有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生産者隊列,直到消費者從隊列中消費掉一份資料,生産者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之是以能夠高效的處理并發資料,還因為其對于生産者端和消費者端分别采用了獨立的鎖來控制資料同步,這也意味着在高并發的情況下生産者和消費者可以并行地操作隊列中的資料,以此來提高整個隊列的并發性能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生産者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞産生,系統記憶體就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生産者消費者問題,使用這兩個類足以。
3. DelayQueue
DelayQueue中的元素隻有當其指定的延遲時間到了,才能夠從隊列中擷取到該元素。DelayQueue是一個沒有大小限制的隊列,是以往隊列中插入資料的操作(生産者)永遠不會被阻塞,而隻有擷取資料的操作(消費者)才會被阻塞。
使用場景:DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個逾時未響應的連接配接隊列。
4. PriorityBlockingQueue
基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue并不會阻塞資料生産者,而隻會在沒有可消費的資料時,阻塞資料的消費者。是以使用的時候要特别注意,生産者生産資料的速度絕對不能快于消費者消費資料的速度,否則時間一長,會最終耗盡所有的可用堆記憶體空間。在實作PriorityBlockingQueue時,内部控制線程同步的鎖采用的是公平鎖。
PriorityBlockingQueue裡面存儲的對象必須是實作Comparable接口。隊列通過這個接口的compare方法确定對象的priority。
5. SynchronousQueue
一種無緩沖的等待隊列,類似于無中介的直接交易,有點像原始社會中的生産者和消費者,生産者拿着産品去集市銷售給産品的最終消費者,而消費者必須親自去集市找到所要商品的直接生産者,如果一方沒有找到合适的目标,那麼對不起,大家都在集市等待。相對于有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生産者直接把産品批發給經銷商,而無需在意經銷商最終會将這些産品賣給那些消費者,由于經銷商可以庫存一部分商品,是以相對于直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得産品從生産者到消費者中間增加了額外的交易環節,單個産品的及時響應性能可能會降低。
聲明一個SynchronousQueue有兩種不同的方式,它們之間有着不太一樣的行為。公平模式和非公平模式的差別:
如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多餘的生産者和消費者,進而體系整體的公平政策;
但如果是非公平模式(SynchronousQueue預設):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多餘的生産者和消費者,而後一種模式,如果生産者和消費者的處理速度有差距,則很容易出現饑渴的情況,即可能有某些生産者或者是消費者的資料永遠都得不到處理。
6.小結
BlockingQueue不光實作了一個完整隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待于喚醒功能,進而使得程式員可以忽略這些細節,關注更進階的功能。
ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的;
LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序;
PriorityBlockingQueue:類似于LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序;
SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的;
其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導緻LinkedBlockingQueue的資料吞吐量要大于ArrayBlockingQueue,
但線上程數量很大時其性能的可預見性低于ArrayBlockingQueue。
四、BlockingQueue的幾個注意點
1.BlockingQueue可以是限定容量的。
它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素。沒有任何内部容量限制的BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量。
2.BlockingQueue實作主要用于生産者-使用者隊列,還支援Collection接口。
舉例來說,使用remove(x)從隊列中移除任意一個元素是有可能的。然而,這種操作通常不會有效執行,隻能有計劃地偶爾使用,比如在取消排隊資訊時。
3.BlockingQueue實作是線程安全的。
所有排隊方法都可以使用内部鎖或其他形式的并發控制來自動達到它們的目的。然而,大量的Collection 操作(addAll、containsAll、retainAll 和removeAll)沒有必要自動執行,除非在實作中特别說明。是以,舉例來說,在隻添加了c中的一些元素後,addAll(c)有可能失敗(抛出一個異常)。
4.BlockingQueue實質上不支援使用任何一種“close”或“shutdown”操作來訓示不再添加任何項。
這種功能的需求和使用有依賴于實作的傾向。例如,一種常用的政策是:對于生産者,插入特殊的end-of-stream 或poison 對象,并根據使用者擷取這些對象的時間來對它們進行解釋。
參考文章:http://wsmajunfeng.iteye.com/blog/1629354
http://zzhonghe.iteye.com/blog/826757
http://blog.csdn.net/xin_jmail/article/details/26157971