概述
上一篇文章
jdk11源碼--ReentrantLock之Condition源碼分析中分析了ReentrantLock和Condition的源碼,那麼接下來看一下Condition在JDK中的具體應用。
ArrayBlockingQueue底層就是使用Condition來實作的。
BlockingQueue
BlockingQueue 阻塞隊列,該類是一個接口,平時我們熟知的ArrayBlockingQueue,LinkedBlockingQueue等都是該接口的實作。
BlockingQueue 之是以說是阻塞的,是因為他可以在隊列為空的時候,擷取元素的線程會阻塞,直到有新的元素添加進來。當隊列滿時,添加元素的線程會阻塞,直到有線程從隊列中取走了元素。這也是注明的==生産者消費者==問題。
當有面試官問你==生産者消費者==問題時,直接将ArrayBlockingQueue源碼分析講一下也是可以的。
看一下BlockingQueue的類圖,BlockingQueue是繼承自Queue,而queue繼承自collection。

BlockingQueue 對插入、删除、擷取原色的操作提供了四種不同的方法用于不同的場景中使用,這些方法總結在下表中:
抛出異常Throws exception | Special value | ==Blocks== | Times out | |
---|---|---|---|---|
插入資料Insert | add(e) //隊列滿時抛異常 | offer(e) //隊列滿傳回false,不阻塞 | ==put(e)== //隊列滿時阻塞,直到隊列未滿時再插入 | offer(e, time, unit) //指定直接内可以插入傳回true,指定時間内不能插入,傳回false |
擷取資料Remove | remove()//隊列為空,抛異常 | poll()//隊列為空傳回null,不阻塞 | ==take()== //當隊列為空時會阻塞,一直等到隊列不為空時再傳回隊首值 | poll(time, unit) //在指定時間内,隊列都是空,則傳回null,否則傳回對首的值 |
Examine | element() | peek() |
本文我們重點關注 put和take方法,因為這兩個方法時阻塞的。
ArrayBlockingQueue
ArrayBlockingQueue是BlockingQueue的一個實作。他是FIFO先進先出隊列。
ArrayBlockingQueue類圖:
重要屬性:
/** 隊列集合,數組存儲!! */
final Object[] items;
/** take, poll, peek or remove 方法擷取元素的下标位置 */
int takeIndex;
/** put, offer, or add 方法添加元素的下标位置 */
int putIndex;
/** 隊列中的元素數量 */
int count;
//并發控制
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
items是使用數組存儲的,這也是ArrayBlockingQueue名稱的由來。
并發控制使用經典的雙Condition 算法,上面定義了兩個Condition ,一個notEmpty,一個notFull。下面來逐行源碼具體分析一下。
ArrayBlockingQueue構造方法
//capacity: 數組初始容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//fair:是否是公平鎖
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
在構造函數中會指定初始容量以及鎖的類型,預設是非公平鎖。
數組items一旦确定下來,後續就不會再更改大小。
put
put方法添加元素到隊尾,當隊列滿時阻塞。
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;//擷取鎖lock
lock.lockInterruptibly();//加鎖,響應中斷
try {
while (count == items.length)
notFull.await();//如果items滿了,那麼notFull阻塞
enqueue(e);//将元素添加到隊列末尾
} finally {
lock.unlock();
}
}
//将元素添加到隊列末尾, ++putIndex,count++,
//該方法隻允許在lock加鎖後操作
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();//喚醒阻塞的擷取元素的線程
}
整個過程還是比較簡單的,首先加鎖,注意這裡的鎖允許中斷傳回。然後,如果隊列滿了(
count == items.length
),那麼就無法繼續添加元素了,添加元素的線程就需要await等待(
notFull.await();
),該線程添加到notFull的condition隊列上,直到被take方法喚醒(後面會講)後,繼續添加元素到隊尾(
enqueue(e);
)。
enqueue方法中,添加元素到putIndex 的位置,然後對putIndex 加1操作,由于是數組,是以這裡進行了越界處理,越界後從0開始繼續計算。count元素的數量進行加1操作,同時喚醒notEmpty condition隊列上阻塞的線程。
讀者可以思考一下這裡為什麼沒有進行這個校驗:putIndex 在加一以後是否會與現有隊列中已經存在的元素重合而覆寫掉現有元素?答案下一節揭曉。
take
take:從隊首擷取元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加鎖,響應中斷
try {
while (count == 0)
notEmpty.await();//如果隊列中空,那麼目前線程需要添加到notEmpty的condition隊列中阻塞,直到有新的元素添加進來
return dequeue();
} finally {
lock.unlock();
}
}
//從對首擷取元素,
//該方法隻允許在lock加鎖後操作
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;//設定takeIndex 下一次應該擷取的位置
count--;//隊列總數量-1
if (itrs != null)
itrs.elementDequeued();//疊代器相關
notFull.signal();//喚醒notFull condition隊列上的線程
return e;
}
take過程也很簡單,先看一下隊列是否為空,如果為空,則無法擷取元素,将目前線程添加到notEmpty的condition隊列。否則從takeIndex 的位置讀取元素并且設定takeIndex ,count 的值。
上一節提到了這個問題:
putIndex 在加一以後是否會與現有隊列中已經存在的元素重合而覆寫掉現有元素?
其實也很簡單,enqueue和dequeue都是需要加鎖以後才調用的,是以是線程安全的,count可以準确表示隊列中的有效長度,takeIndex 和putIndex 也都沒有并發問題,他們每次添加或者讀取時都會判斷count的值,來确認隊列是否滿或者空。如此一來,當然不會出現添加過多覆寫現有元素的情況。
總結
首先回顧一下上一篇
中畫的condition結構圖嗎,以及condition與ReentrantLock之間的關系。
然後再次基礎上,畫一下ArrayBlockingQueue中的condition關系圖:
總體來講,ArrayBlockingQueue包含一個ReentrantLock和兩個condition:notEmpty和notFull。
put可take操作都需要加鎖,都是線程安全的。
當隊列滿時,put操作需阻塞等待,目前線程添加到notFull的condition隊列中;添加成功時,需喚醒notEmpty隊列中的線程。
當隊列空時,take操作需阻塞等待,目前線程添加到notEmpty的condition隊列中;擷取成功時,需喚醒notFull隊列中的線程。