天天看點

【轉載】ArrayBlockingQueue

我們為什麼要招進階程式員呢?因為進階程式員寫的 bug 可能更少,在調用 api 的時候,犯錯的機率更小。但是并不意味這進階程式員就不犯錯。今天我們就一起來分享一個由于 ArrayBlockingQueue 使用不當,導緻公司損失幾百萬的案例!

根據 ArrayBlockingQueue 的名字我們都可以看出,它是一個隊列,并且是一個基于數組的阻塞隊列。

ArrayBlockingQueue 是一個有界隊列,有界也就意味着,它不能夠存儲無限多數量的對象。是以在建立 ArrayBlockingQueue 時,必須要給它指定一個隊列的大小。

我們先來熟悉一下 ArrayBlockingQueue 中的幾個重要的方法。

  • add(E e):把 e 加到 BlockingQueue 裡,即如果 BlockingQueue 可以容納,則傳回 true,否則報異常
  • offer(E e):表示如果可能的話,将 e 加到 BlockingQueue 裡,即如果 BlockingQueue 可以容納,則傳回 true,否則傳回 false
  • put(E e):把 e 加到 BlockingQueue 裡,如果 BlockQueue 沒有空間,則調用此方法的線程被阻斷直到 BlockingQueue 裡面有空間再繼續
  • poll(time):取走 BlockingQueue 裡排在首位的對象,若不能立即取出,則可以等 time 參數規定的時間,取不到時傳回 null
  • take():取走 BlockingQueue 裡排在首位的對象,若 BlockingQueue 為空,阻斷進入等待狀态直到 Blocking 有新的對象被加入為止
  • remainingCapacity():剩餘可用的大小。等于初始容量減去目前的 size

    我們再來看一下 ArrayBlockingQueue 使用場景。

  • 先進先出隊列(隊列頭的是最先進隊的元素;隊列尾的是最後進隊的元素)
  • 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現擴容,容量滿,則阻塞進隊操作;容量空,則阻塞出隊操作)
  • 隊列不支援空元素

    ArrayBlockingQueue 進隊操作采用了加鎖的方式保證并發安全。源代碼裡面有一個 while() 判斷:

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 非空判斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 擷取鎖
    try {
        while (count == items.length) {
            // 一直阻塞,知道隊列非滿時,被喚醒
            notFull.await();
        }
        enqueue(e); // 進隊
    } finally {
        lock.unlock();
    }
}
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
        // 阻塞,知道隊列不滿
        // 或者逾時時間已過,傳回false
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}      

通過源碼分析,我們可以發現下面的規律:

  • 阻塞調用方式 put(e)或 offer(e, timeout, unit)
  • 阻塞調用時,喚醒條件為逾時或者隊列非滿(是以,要求在出隊時,要發起一個喚醒操作)
  • 進隊成功之後,執行notEmpty.signal()喚起被阻塞的出隊線程

    出隊的源碼類似,我就不貼了。ArrayBlockingQueue 隊列我們可以在建立線程池時進行使用。

new ThreadPoolExecutor(1, 1,
  0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<Runnable>(2));
 
new ThreadPoolExecutor(1, 1,
  0L, TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue<Runnable>(2));      
if (blockingQueue.remainingCapacity() < 1) {
    //todo
}
blockingQueue.put(...)      

繼續閱讀