我們平時開發中好像很少使用到BlockingQueue(阻塞隊列),比如我們想要存儲一組資料的時候會使用ArrayList,想要存儲鍵值對資料會使用HashMap,在什麼場景下需要用到BlockingQueue呢?
1. BlockingQueue的應用場景
當我們處理完一批資料之後,需要把這批資料發給下遊方法接着處理,但是下遊方法的處理速率不受控制,可能時快時慢。如果下遊方法的處理速率較慢,會拖慢目前方法的處理速率,這時候該怎麼辦呢?
你可能想到使用線程池,是個辦法,不過需要建立很多線程,還要考慮下遊方法支不支援并發,如果是CPU密集任務,可能多線程比單線程處理速度更慢,因為需要頻繁上下文切換。
這時候就可以考慮使用BlockingQueue,BlockingQueue最典型的應用場景就是上面這種生産者-消費者模型。生産者往隊列中放資料,消費者從隊列中取資料,中間使用BlockingQueue做緩沖隊列,也就解決了生産者和消費者速率不同步的問題。
你可能聯想到了消息隊列(MessageQueue),消息隊列相當于分布式阻塞隊列,而BlockingQueue相當于本地阻塞隊列,隻作用于本機器。對應的是分布式緩存(比如:Redis、Memcache)和本地緩存(比如:Guava、Caffeine)。
另外很多架構中都有BlockingQueue的影子,比如線程池中就用到BlockingQueue做任務的緩沖。消息隊列中發消息、拉取消息的方法也都借鑒了BlockingQueue,使用起來很相似。
今天就一塊深入剖析一下Queue的底層源碼。
2. BlockingQueue的用法
BlockingQueue的用法非常簡單,就是放資料和取資料。
/**
* @apiNote BlockingQueue示例
* @author 一燈架構
*/
public class Demo {
public static void main(String[] args) throws InterruptedException {
// 1. 建立隊列,設定容量是10
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 2. 往隊列中放資料
queue.put(1);
// 3. 從隊列中取資料
Integer result = queue.take();
}
}
為了滿足不同的使用場景,BlockingQueue設計了很多的放資料和取資料的方法。
操作 | 抛出異常 | 傳回特定值 | 阻塞 | 阻塞一段時間 |
放資料 | add | offer | put | offer(e, time, unit) |
取資料 | remove | poll | take | poll(time, unit) |
取資料(不删除) | element() | peek() | 不支援 | 不支援 |
這幾組方法的不同之處就是:
- 當隊列滿了,再往隊列中放資料,add方法抛異常,offer方法傳回false,put方法會一直阻塞(直到有其他線程從隊列中取走資料),offer方法阻塞指定時間然後傳回false。
- 當隊列是空,再從隊列中取資料,remove方法抛異常,poll方法傳回null,take方法會一直阻塞(直到有其他線程往隊列中放資料),poll方法阻塞指定時間然後傳回null。
- 當隊列是空,再去隊列中檢視資料(并不删除資料),element方法抛異常,peek方法傳回null。
工作中使用最多的就是offer、poll阻塞指定時間的方法。
3. BlockingQueue實作類
BlockingQueue常見的有下面5個實作類,主要是應用場景不同。
- ArrayBlockingQueue
- 基于數組實作的阻塞隊列,建立隊列時需指定容量大小,是有界隊列。
- LinkedBlockingQueue
- 基于連結清單實作的阻塞隊列,預設是無界隊列,建立可以指定容量大小
- SynchronousQueue
- 一種沒有緩沖的阻塞隊列,生産出的資料需要立刻被消費
- PriorityBlockingQueue
- 實作了優先級的阻塞隊列,基于資料顯示,是無界隊列
- DelayQueue
- 實作了延遲功能的阻塞隊列,基于PriorityQueue實作的,是無界隊列
4. BlockingQueue源碼解析
BlockingQueue的5種子類實作方式大同小異,這次就以最常用的ArrayBlockingQueue做源碼解析。
4.1 ArrayBlockingQueue類屬性
先看一下ArrayBlockingQueue類裡面有哪些屬性:
// 用來存放資料的數組
final Object[] items;
// 下次取資料的數組下标位置
int takeIndex;
// 下次放資料的數組下标位置
int putIndex;
// 目前已有元素的個數
int count;
// 獨占鎖,用來保證存取資料安全
final ReentrantLock lock;
// 取資料的條件
private final Condition notEmpty;
// 放資料的條件
private final Condition notFull;
ArrayBlockingQueue中4組存取資料的方法實作也是大同小異,本次以put和take方法進行解析。
4.2 put方法源碼解析
無論是放資料還是取資料都是從隊頭開始,逐漸往隊尾移動。
// 放資料,如果隊列已滿,就一直阻塞,直到有其他線程從隊列中取走資料
public void put(E e) throws InterruptedException {
// 校驗元素不能為空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加鎖,加可中斷的鎖
lock.lockInterruptibly();
try {
// 如果隊列已滿,就一直阻塞,直到被喚醒
while (count == items.length)
notFull.await();
// 如果隊列未滿,就往隊列添加元素
enqueue(e);
} finally {
// 結束後,别忘了釋放鎖
lock.unlock();
}
}
// 實際往隊列添加資料的方法
private void enqueue(E x) {
// 擷取數組
final Object[] items = this.items;
// putIndex 表示本次插入的位置
items[putIndex] = x;
// ++putIndex 計算下次插入的位置
// 如果本次插入的位置,正好是隊尾,下次插入就從 0 開始
if (++putIndex == items.length)
putIndex = 0;
// 元素數量加一
count++;
// 喚醒因為隊列空等待的線程
notEmpty.signal();
}
源碼中有個有意思的設計,添加元素的時候如果已經到了隊尾,下次就從隊頭開始添加,相當于做成了一個循環隊列。
像下面這樣:
4.3 take方法源碼
// 取資料,如果隊列為空,就一直阻塞,直到有其他線程往隊列中放資料
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖,加可中斷的鎖
lock.lockInterruptibly();
try {
// 如果隊列為空,就一直阻塞,直到被喚醒
while (count == 0)
notEmpty.await();
// 如果隊列不為空,就從隊列取資料
return dequeue();
} finally {
// 結束後,别忘了釋放鎖
lock.unlock();
}
}
// 實際從隊列取資料的方法
private E dequeue() {
// 擷取數組
final Object[] items = this.items;
// takeIndex 表示本次取資料的位置,是上一次取資料時計算好的
E x = (E) items[takeIndex];
// 取完之後,就把隊列該位置的元素删除
items[takeIndex] = null;
// ++takeIndex 計算下次取資料的位置
// 如果本次取資料的位置,正好是隊尾,下次就從 0 開始取資料
if (++takeIndex == items.length)
takeIndex = 0;
// 元素數量減一
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒被隊列滿所阻塞的線程
notFull.signal();
return x;
}
4.4 總結
- ArrayBlockingQueue基于數組實作的阻塞隊列,建立隊列時需指定容量大小,是有界隊列。
- ArrayBlockingQueue底層采用循環隊列的形式,保證數組位置可以重複使用。
- ArrayBlockingQueue存取都采用ReentrantLock加鎖,保證線程安全,在多線程環境下也可以放心使用。
- 使用ArrayBlockingQueue的時候,預估好隊列長度,保證生産者和消費者速率相比對。
我是「一燈架構」,如果本文對你有幫助,歡迎各位小夥伴點贊、評論和關注,感謝各位老鐵,我們下期見