JAVA并發之阻塞隊列淺析
背景
因為在工作中經常會用到阻塞隊列,有的時候還要根據業務場景擷取重寫阻塞隊列中的方法,是以學習一下阻塞隊列的實作原理還是很有必要的。(PS:不深入了解的話,很容易使用出錯,造成沒有技術深度的樣子)
阻塞隊列是什麼?
要想了解阻塞隊列,先了解一下隊列是啥,簡單的說隊列就是一種先進先出的資料結構。(具體的内容去資料結構裡學習一下)是以阻塞隊列就是一種可阻塞的隊列。和普通的隊列的不同就展現在 ”阻塞“兩個字上。阻塞是啥意思?
百度看一下
在軟體工程裡阻塞一般指的是阻塞調用,即調用結果傳回之前,目前線程會被挂起。函數隻有在得到結果之後才會傳回。
阻塞隊列其實就是普通的隊列根據需要将某些方法改為阻塞調用。是以阻塞隊裡和普通隊裡的不同主要展現在兩個方面
當隊列是空的時,從隊列中擷取元素的操作将會被阻塞 。直到其他的線程往空的隊列插入新的元素
當隊列是滿時,往隊列裡添加元素的操作會被阻塞,直到其他的線程使隊列重新變得空閑起來,如從隊列中移除一個或者多個元素,或者完全清空隊列
為什麼要使用阻塞隊列?
那麼為什麼要使用阻塞隊列?阻塞隊列又能完成什麼特殊的任務嗎?
阻塞隊列的經典使用 場景就是“生産者”和“消費者”模型,生産者生産資料,放入隊列,然後消費從隊列中擷取資料,這個在一般情況下自然沒有問題,但如果生産者和消費者在某個時間段内,萬一發生資料處理速度不比對的情況呢?
在出現消費者速度遠大于生産者速度,消費者在資料消費至一定程度的情況下,暫停等待一下(阻塞消費者)來等待生産者,以保證生産者能夠生産出新的資料;反之亦然。
阻塞隊列在java中的一種典型使用場景是線程池,線上程池中,當送出的任務不能被立即得到執行的時候,線程池就會将送出的任務放到一個阻塞的任務隊列中來(線程池的具體使用參見之前寫的一篇文章《java并發之線程池的淺析》)
然而,在阻塞隊列釋出以前,在多線程環境下,我們每個程式員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程式帶來不小的複雜度。在這裡要感謝一下concurrent包,減輕了我們很多工作
阻塞隊列的成員有哪些
下面分别簡單介紹一下:
ArrayBlockingQueue:是一個用數組實作的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。構造時必須傳入的參數是數組大小此外還可以指定是否公平性。【注:每一個線程在擷取鎖的時候可能都會排隊等待,如果在等待時間上,先擷取鎖的線程的請求一定先被滿足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的擷取鎖,也就是目前等待時間最長的線程先擷取鎖】;在插入或删除元素時不會産生或銷毀任何額外的對象執行個體
LinkedBlockingQueue:一個由連結清單結構組成的有界隊列,照先進先出的順序進行排序 ,未指定長度的話,預設 此隊列的長度為Integer.MAX_VALUE。。【PS:如果生産者的速度遠遠大于消費者的速度,也許還沒有等到隊列滿阻塞産生,系統記憶體就有可能已經被消耗殆盡了。】PriorityBlockingQueue: 一個支援線程優先級排序的無界隊列,預設自然序進行排序,也可以自定義實作compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
LinkedBlockingQueue之是以能夠高效的處理并發資料,是因為take()方法和put(E param)方法使用了不同的可重入鎖,分别為private final ReentrantLock putLock和private final ReentrantLock takeLock,這也意味着在高并發的情況下生産者和消費者可以并行地操作隊列中的資料,以此來提高整個隊列的并發性能
LinkedBlockingQueue在插入元素是會建立一個額外的Node對象,是以它這在長時間内需要高效并發地處理大批量資料的系統中,對于GC的還是存在一定的影響。
DelayQueue: 一個實作PriorityBlockingQueue實作延遲擷取的無界隊列,在建立元素時,可以指定多久才能從隊列中擷取目前元素。隻有延時期滿後才能從隊列中擷取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了。2.定時任務排程。使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從DelayQueue中擷取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實作的。)
SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支援公平鎖和非公平鎖。SynchronousQueue的一個使用場景是線上程池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)建立新的線程,如果有空閑線程則會重複使用,線程空閑了60秒後會被回收。
LinkedTransferQueue: 一個由連結清單結構組成的無界阻塞隊列,相當于其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
LinkedBlockingDeque: 一個由連結清單結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程并發時,可以将鎖的競争最多降到一半。
阻塞隊列的核心方法
阻塞對隊列的核心方法主要是插入操作操作和取出操作,如下
Throws Exception 類型的插入和取出在不能立即被執行的時候就會抛出異常。
Special Value 類型的插入和取出在不能被立即執行的情況下會傳回一個特殊的值(true 或者 false 或者null)
Blocked 類型的插入和取出操作在不能被立即執行的時候會阻塞線程直到可以操作的時候會被其他線程喚醒
Timed out 類型的插入和取出操作在不能立即執行的時候會被阻塞一定的時候,如果在指定的時間内沒有被執行,那麼會傳回一個特殊值
插入操作
boolean offer(E e):将指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時傳回 true,如果目前沒有可用的空間,則傳回 false。(本方法不阻塞目前執行方法的線程)。
boolean offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在設定的指定的時間内,還不能往隊列中加入BlockingQueue,則傳回false。
void put(E paramE) throws InterruptedException:将指定元素插入到此隊列中裡,如果隊列沒有空間,則調用此方法的線程被阻斷直到隊列裡裡面有空間再繼續執行插入操作。
public boolean add(E e): 将指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時傳回 true,如果目前沒有可用的空間,則抛出 IllegalStateException(其實就是調用了offer方法)。
複制代碼
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
擷取操作
poll():取走BlockingQueue裡排在首位的對象,,取不到時傳回null;
poll(long timeout, TimeUnit unit):在指定時間内從BlockingQueue取出一個隊首的對象,隊列一旦有資料可取,則立即傳回隊列中的資料。否則直到時間逾時還沒有資料可取,傳回null。
take():取走BlockingQueue裡排在首位的對象,若BlockingQueue為空,阻斷進入等待狀态直到BlockingQueue有新的資料被加入;
drainTo(Collection<? super E> c, int maxElements):一次性從BlockingQueue擷取所有可用的資料對象,将資料對象加入傳遞的集合中(還可以通過maxElements指定擷取資料的個數),通過該方法,可以提升擷取資料效率;不需要多次分批加鎖或釋放鎖
阻塞隊列的實作原理
前面介紹了非阻塞隊列和阻塞隊列中常用的方法,下面來探讨阻塞隊列的實作原理,本文以比較常用的ArrayBlockingQueue為例,其他阻塞隊列實作原理根據特性會和ArrayBlockingQueue有一些差别,但是大體思路應該類似,有興趣的朋友可自行檢視其他阻塞隊列的實作源碼。
首先看一下ArrayBlockingQueue的幾個關鍵成員變量
public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
從上邊可以明顯的看出ArrayBlockingQueue用一個數組來存儲資料,takeIndex和putIndex分别表示隊首元素和隊尾元素的下标,count表示隊列中元素的個數。 lock是一個可重入鎖,notEmpty和notFull是等待條件。
然後看它的一個關鍵方法的實作:put()
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
首選檢查元素是否為空,為空則抛出異常
接着執行個體化可重入鎖
然後localReentrantLock.lockInterruptibly();這裡特别強調一下 (lockInterruptibly()允許在等待時由其他線程的Thread.interrupt()方法來中斷等待線程而直接傳回,這時是不用擷取鎖的,而會抛出一個InterruptException。 而ReentrantLock.lock()方法則不允許Thread.interrupt()中斷,即使檢測到了Thread.interruptted一樣會繼續嘗試擷取鎖,失敗則繼續休眠。隻是在最後擷取鎖成功之後在把目前線程置為中斷狀态)
判斷目前元素個數是否等于數組的長度,如果相等,則調用notFull.await()進行等待,即當隊列滿的時候,将會等待
将元素插入到隊列中
解鎖(這裡一定要在finally中解鎖啊!!!)
enqueue(E x)将元素插入到數組啊item中
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
該方法内部通過putIndex索引直接将元素添加到數組items中
這裡思考一個問題 為什麼當putIndex索引大小等于數組長度時,需要将putIndex重新設定為0?
這是因為當隊列是先進先出的 是以擷取元素總是從隊列頭部擷取,而添加元素從中從隊列尾部擷取。是以當隊列索引(從0開始)與數組長度相等時,是以下次我們就需要從數組頭部開始添加了;
最後當插入成功後,通過notEmpty喚醒正在等待取元素的線程
阻塞隊列中和put對應的就是take了
下邊是take方法的實作
public E take() throws InterruptedException {
try {
while (count == 0)
notEmpty.await();
return dequeue();
finally {
lock.unlock();
}
take方法其實很簡單,隊列中有資料就删除沒有就阻塞,注意這個阻塞是可以中斷的,如果隊列沒有資料那麼就加入notEmpty條件隊列等待(有資料就直接取走,方法結束),如果有新的put線程添加了資料,那麼put操作将會喚醒take線程;
可以看到take的實作跟put方法實作很類似,隻不過put方法等待的是notFull信号,而take方法等待的是notEmpty信号。(等的就是上文的put中的信号)當數組的數量為空時,也就是無任何資料可以被取出來的時候,notEmpty這個Condition就會進行阻塞,直到被notEmpty喚醒
dequeue的實作如下
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
take方法主要是從隊列頭部取元素,可以看到takeIndex是取元素的時候的偏移值,而put中是putIndex控制添加元素的偏移量,由此可見,put和take操作的偏移量分别是由putIndex和takeIndex控制的。其實仔細觀察put和take的實作思路是有很多相似之處。
offer(E o, long timeout, TimeUnit unit)的實作方式其實和put的思想是差不多的差別是 offer在阻塞的時候調用的不是await()方法而是awaitNanos(long nanosTimeout) 帶逾時響應的等待(PS:具體差別可以參考我之前寫的關于鎖的部落格《JAVA并發之鎖的使用淺析》)
poll(long timeout, TimeUnit unit)的實作也是這樣在take的基礎上加了逾時響應。感興趣的朋友可以自行去看一下
案例分析
模拟食堂的經曆,食堂視窗端出一道菜放在台面,然後等待顧客消費。寫到代碼裡就是食堂視窗就是一個生産者線程,顧客就是消費者線程,台面就是阻塞隊列。
-
View Code
結果部分如下
可以看到當生産者産生的資料達到阻塞隊列的容量時,生成者線程會阻塞,等待消費者線程進行消費,上述案例中最大容量為8個盤子,是以當食堂做好了8個菜後了8會等待顧客進行消費,消費後繼續生産。上述案例使用阻塞隊列,看起來代碼要簡單得多,不需要再單獨考慮同步和線程間通信的問題。
在并發程式設計中,一般推薦使用阻塞隊列,這樣實作可以盡量地避免程式出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket用戶端資料的讀取和解析,讀取資料的線程不斷将資料放入隊列,然後解析線程不斷從隊列取資料解析。還有其他類似的場景,如線程池中就使用了阻塞隊列,其實隻要符合生産者-消費者模型的都可以使用阻塞隊列。
參考資料:
《Java程式設計思想》
https://www.cnblogs.com/dolphin0520/p/3932906.html https://www.cnblogs.com/superfj/p/7757876.html原文位址
https://www.cnblogs.com/NathanYang/p/11276428.html