1.何為阻塞隊列
阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。
- 支援阻塞的插入方法:意思是當隊列滿(無界隊列除外)時,隊列會阻塞插入元素的線程,直到隊列不滿。
- 支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空。
阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者是從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素、消費者用來擷取元素的容器。阻塞隊列對插入和移除兩個附加操作提供了4種處理方式。

- 抛出異常:當隊列滿時,如果再往隊列裡插入元素,會抛出IllegalStateException("Queuefull")異常。當隊列空時,從隊列裡擷取元素會抛出NoSuchElementException異常。
- ·傳回特殊值:當往隊列插入元素時,會傳回元素是否插入成功,成功傳回true。如果是移除方法,則是從隊列裡取出一個元素,如果沒有則傳回null。
- ·一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列裡take元素,隊列會阻塞住消費者線程,直到隊列不為空。
- ·逾時退出:當阻塞隊列滿時,如果生産者線程往隊列裡插入元素,隊列會阻塞生産者線程一段時間,如果超過了指定的時間,生産者線程就會退出。
注意:如果是無界阻塞隊列,隊列不可能會出現滿的情況,是以使用put或offer方法永 遠不會被阻塞,而且使用offer方法時,該方法永遠傳回true。
2.阻塞隊列
2.1 分類
- ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由連結清單結構組成的有界阻塞隊列。
- PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實作的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列。
2.2 七種阻塞隊列使用詳解
- ArrayBlockingQueue:是一個用數組實作的有界阻塞隊列。此隊列按照先進先出的原則對元素進行排序。預設情況下不保證線程公平的通路隊列,所謂公平通路隊列是指阻塞的線程,可以按照阻塞的先後順序通路隊列,即先阻塞線程先通路隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程都可以争奪通路隊列的資格,有可能先阻塞的線程最後才通路隊列。為了保證公平性,通常會降低吞吐量。通路者的公平性是使用可重入鎖實作的。
public class TestBlockingQueue { public static void main(String[] args) { int capacity=1000;//有界隊列元素容量,必須>=1 ArrayBlockingQueue<String> arr1=new ArrayBlockingQueue<String>(capacity); ArrayBlockingQueue<String> arr2=new ArrayBlockingQueue<String>(capacity,true);//boolean值表示是否采用公平性原則 } }
- LinkedBlockingQueue:LinkedBlockingQueue是一個用連結清單實作的有界阻塞隊列。此隊列的預設和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
- PriorityBlockingQueue:是一個支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。也可以自定義類實作compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要注意的是不能保證 同優先級元素的順序。
- DelayQueue:DelayQueue是一個支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊 列中的元素必須實作Delayed接口(可以參考ScheduledThreadPoolExecutor 裡ScheduledFutureTask類的實作),在建立元素時可以指定多久才能從隊列中擷取目前元素。 隻有在延遲期滿時才能從隊列中提取元素。該阻塞隊列非常有用,比如可以設計緩存系統,使用一個線程循環查詢 DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了;還可以做定時任務排程,使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從 DelayQueue中擷取到任務就開始執行,比如TimerQueue就是使用DelayQueue實作的。
- SynchronousQueue:SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續添加元素。它支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。使用以下構造方法可以建立公平性通路的SynchronousQueue,如果設定為true,則等待的線程會采用先進先出的順序通路隊列。可以将該隊列了解為一個傳球手,負責把生産者線程處理的資料直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常适合傳遞性場景。
- LinkedTransferQueue:LinkedTransferQueue是一個由連結清單結構組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
- 使用transfer方法:如果目前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法 時),transfer方法可以把生産者傳入的元素立transfer(傳輸)給消費者。如果沒有消費者在等 待接收元素,transfer方法會将元素存放在隊列的tail節點,并等到該元素被消費者消費了才返 回。
- tryTransfer方法:tryTransfer方法是用來試探生産者傳入的元素是否能直接傳給消費者。如果沒有消費者等 待接收元素,則傳回false。和transfer方法的差別tryTransfer方法無論消費者是否接收,方法 立即傳回,而transfer方法是必須等到消費者消費了才傳回。 對于帶有時間限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,試圖把生産者傳入 的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再傳回,如果超 時還沒消費元素,則傳回false,如果在逾時時間内消費了元素,則傳回true。
- LinkedBlockingDeque:LinkedBlockingDeque是一個由連結清單結構組成的雙向阻塞隊列。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊 時,也就減少了一半的競争。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、 擷取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、擷取或移除雙 端隊列的最後一個元素。另外,插入方法add等同于addLast,移除方法remove等效于 removeFirst。但是take方法卻等同takeFirst
3.阻塞隊列實作原理
JDK通過通知模式實作。所謂通知模式,就是當生産者往滿的隊列裡添加元素時會阻塞住生産者,當消費者消費了一個隊列中的元素後,會通知生産者目前隊列可用。通過檢視JDK源碼 發現ArrayBlockingQueue使用了Condition來實作。也就是通過await/signal來實作。
final Object[] items;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
int count;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}