天天看點

不允許還有Java程式員不了解BlockingQueue阻塞隊列的實作原理

作者:一燈架構

我們平時開發中好像很少使用到BlockingQueue(阻塞隊列),比如我們想要存儲一組資料的時候會使用ArrayList,想要存儲鍵值對資料會使用HashMap,在什麼場景下需要用到BlockingQueue呢?

1. BlockingQueue的應用場景

當我們處理完一批資料之後,需要把這批資料發給下遊方法接着處理,但是下遊方法的處理速率不受控制,可能時快時慢。如果下遊方法的處理速率較慢,會拖慢目前方法的處理速率,這時候該怎麼辦呢?

你可能想到使用線程池,是個辦法,不過需要建立很多線程,還要考慮下遊方法支不支援并發,如果是CPU密集任務,可能多線程比單線程處理速度更慢,因為需要頻繁上下文切換。

這時候就可以考慮使用BlockingQueue,BlockingQueue最典型的應用場景就是上面這種生産者-消費者模型。生産者往隊列中放資料,消費者從隊列中取資料,中間使用BlockingQueue做緩沖隊列,也就解決了生産者和消費者速率不同步的問題。

不允許還有Java程式員不了解BlockingQueue阻塞隊列的實作原理

你可能聯想到了消息隊列(MessageQueue),消息隊列相當于分布式阻塞隊列,而BlockingQueue相當于本地阻塞隊列,隻作用于本機器。對應的是分布式緩存(比如:Redis、Memcache)和本地緩存(比如:Guava、Caffeine)。

另外很多架構中都有BlockingQueue的影子,比如線程池中就用到BlockingQueue做任務的緩沖。消息隊列中發消息、拉取消息的方法也都借鑒了BlockingQueue,使用起來很相似。

今天就一塊深入剖析一下Queue的底層源碼。

2. BlockingQueue的用法

BlockingQueue的用法非常簡單,就是放資料和取資料。

/**
 * @apiNote BlockingQueue示例
 * @author 一燈架構
 */
public class Demo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 建立隊列,設定容量是10
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        // 2. 往隊列中放資料
        queue.put(1);
        // 3. 從隊列中取資料
        Integer result = queue.take();
    }
}
           

為了滿足不同的使用場景,BlockingQueue設計了很多的放資料和取資料的方法。

操作 抛出異常 傳回特定值 阻塞 阻塞一段時間
放資料 add offer put offer(e, time, unit)
取資料 remove poll take poll(time, unit)
取資料(不删除) element() peek() 不支援 不支援

這幾組方法的不同之處就是:

  1. 當隊列滿了,再往隊列中放資料,add方法抛異常,offer方法傳回false,put方法會一直阻塞(直到有其他線程從隊列中取走資料),offer方法阻塞指定時間然後傳回false。
  2. 當隊列是空,再從隊列中取資料,remove方法抛異常,poll方法傳回null,take方法會一直阻塞(直到有其他線程往隊列中放資料),poll方法阻塞指定時間然後傳回null。
  3. 當隊列是空,再去隊列中檢視資料(并不删除資料),element方法抛異常,peek方法傳回null。

工作中使用最多的就是offer、poll阻塞指定時間的方法。

3. BlockingQueue實作類

BlockingQueue常見的有下面5個實作類,主要是應用場景不同。

  • ArrayBlockingQueue
  • 基于數組實作的阻塞隊列,建立隊列時需指定容量大小,是有界隊列。
  • LinkedBlockingQueue
  • 基于連結清單實作的阻塞隊列,預設是無界隊列,建立可以指定容量大小
  • SynchronousQueue
  • 一種沒有緩沖的阻塞隊列,生産出的資料需要立刻被消費
  • PriorityBlockingQueue
  • 實作了優先級的阻塞隊列,基于資料顯示,是無界隊列
  • DelayQueue
  • 實作了延遲功能的阻塞隊列,基于PriorityQueue實作的,是無界隊列

4. BlockingQueue源碼解析

BlockingQueue的5種子類實作方式大同小異,這次就以最常用的ArrayBlockingQueue做源碼解析。

4.1 ArrayBlockingQueue類屬性

先看一下ArrayBlockingQueue類裡面有哪些屬性:

// 用來存放資料的數組
final Object[] items;

// 下次取資料的數組下标位置
int takeIndex;

// 下次放資料的數組下标位置
int putIndex;

// 目前已有元素的個數
int count;

// 獨占鎖,用來保證存取資料安全
final ReentrantLock lock;

// 取資料的條件
private final Condition notEmpty;

// 放資料的條件
private final Condition notFull;
           

ArrayBlockingQueue中4組存取資料的方法實作也是大同小異,本次以put和take方法進行解析。

4.2 put方法源碼解析

不允許還有Java程式員不了解BlockingQueue阻塞隊列的實作原理

無論是放資料還是取資料都是從隊頭開始,逐漸往隊尾移動。

// 放資料,如果隊列已滿,就一直阻塞,直到有其他線程從隊列中取走資料
public void put(E e) throws InterruptedException {
    // 校驗元素不能為空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
  	// 加鎖,加可中斷的鎖
    lock.lockInterruptibly();
    try {
        // 如果隊列已滿,就一直阻塞,直到被喚醒
        while (count == items.length)
            notFull.await();
      	// 如果隊列未滿,就往隊列添加元素
        enqueue(e);
    } finally {
      	// 結束後,别忘了釋放鎖
        lock.unlock();
    }
}

// 實際往隊列添加資料的方法
private void enqueue(E x) {
    // 擷取數組
    final Object[] items = this.items;
    // putIndex 表示本次插入的位置
    items[putIndex] = x;
    // ++putIndex 計算下次插入的位置
    // 如果本次插入的位置,正好是隊尾,下次插入就從 0 開始
    if (++putIndex == items.length)
        putIndex = 0;
  	// 元素數量加一
    count++;
    // 喚醒因為隊列空等待的線程
    notEmpty.signal();
}
           

源碼中有個有意思的設計,添加元素的時候如果已經到了隊尾,下次就從隊頭開始添加,相當于做成了一個循環隊列。

像下面這樣:

不允許還有Java程式員不了解BlockingQueue阻塞隊列的實作原理

4.3 take方法源碼

// 取資料,如果隊列為空,就一直阻塞,直到有其他線程往隊列中放資料
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  	// 加鎖,加可中斷的鎖
    lock.lockInterruptibly();
    try {
        // 如果隊列為空,就一直阻塞,直到被喚醒
        while (count == 0)
            notEmpty.await();
        // 如果隊列不為空,就從隊列取資料
        return dequeue();
    } finally {
      	// 結束後,别忘了釋放鎖
        lock.unlock();
    }
}

// 實際從隊列取資料的方法
private E dequeue() {
  	// 擷取數組
    final Object[] items = this.items;
    // takeIndex 表示本次取資料的位置,是上一次取資料時計算好的
    E x = (E) items[takeIndex];
    // 取完之後,就把隊列該位置的元素删除
    items[takeIndex] = null;
    // ++takeIndex 計算下次取資料的位置
    // 如果本次取資料的位置,正好是隊尾,下次就從 0 開始取資料
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素數量減一
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 喚醒被隊列滿所阻塞的線程
    notFull.signal();
    return x;
}
           

4.4 總結

  1. ArrayBlockingQueue基于數組實作的阻塞隊列,建立隊列時需指定容量大小,是有界隊列。
  2. ArrayBlockingQueue底層采用循環隊列的形式,保證數組位置可以重複使用。
  3. ArrayBlockingQueue存取都采用ReentrantLock加鎖,保證線程安全,在多線程環境下也可以放心使用。
  4. 使用ArrayBlockingQueue的時候,預估好隊列長度,保證生産者和消費者速率相比對。
我是「一燈架構」,如果本文對你有幫助,歡迎各位小夥伴點贊、評論和關注,感謝各位老鐵,我們下期見

繼續閱讀