我們為什麼要招進階程式員呢?因為進階程式員寫的 bug 可能更少,在調用 api 的時候,犯錯的機率更小。但是并不意味這進階程式員就不犯錯。今天我們就一起來分享一個由于 ArrayBlockingQueue 使用不當,導緻公司損失幾百萬的案例!
根據 ArrayBlockingQueue 的名字我們都可以看出,它是一個隊列,并且是一個基于數組的阻塞隊列。
ArrayBlockingQueue 是一個有界隊列,有界也就意味着,它不能夠存儲無限多數量的對象。是以在建立 ArrayBlockingQueue 時,必須要給它指定一個隊列的大小。
我們先來熟悉一下 ArrayBlockingQueue 中的幾個重要的方法。
- add(E e):把 e 加到 BlockingQueue 裡,即如果 BlockingQueue 可以容納,則傳回 true,否則報異常
- offer(E e):表示如果可能的話,将 e 加到 BlockingQueue 裡,即如果 BlockingQueue 可以容納,則傳回 true,否則傳回 false
- put(E e):把 e 加到 BlockingQueue 裡,如果 BlockQueue 沒有空間,則調用此方法的線程被阻斷直到 BlockingQueue 裡面有空間再繼續
- poll(time):取走 BlockingQueue 裡排在首位的對象,若不能立即取出,則可以等 time 參數規定的時間,取不到時傳回 null
- take():取走 BlockingQueue 裡排在首位的對象,若 BlockingQueue 為空,阻斷進入等待狀态直到 Blocking 有新的對象被加入為止
-
remainingCapacity():剩餘可用的大小。等于初始容量減去目前的 size
我們再來看一下 ArrayBlockingQueue 使用場景。
- 先進先出隊列(隊列頭的是最先進隊的元素;隊列尾的是最後進隊的元素)
- 有界隊列(即初始化時指定的容量,就是隊列最大的容量,不會出現擴容,容量滿,則阻塞進隊操作;容量空,則阻塞出隊操作)
-
隊列不支援空元素
ArrayBlockingQueue 進隊操作采用了加鎖的方式保證并發安全。源代碼裡面有一個 while() 判斷:
public void put(E e) throws InterruptedException {
checkNotNull(e); // 非空判斷
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 擷取鎖
try {
while (count == items.length) {
// 一直阻塞,知道隊列非滿時,被喚醒
notFull.await();
}
enqueue(e); // 進隊
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
// 阻塞,知道隊列不滿
// 或者逾時時間已過,傳回false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
通過源碼分析,我們可以發現下面的規律:
- 阻塞調用方式 put(e)或 offer(e, timeout, unit)
- 阻塞調用時,喚醒條件為逾時或者隊列非滿(是以,要求在出隊時,要發起一個喚醒操作)
-
進隊成功之後,執行notEmpty.signal()喚起被阻塞的出隊線程
出隊的源碼類似,我就不貼了。ArrayBlockingQueue 隊列我們可以在建立線程池時進行使用。
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(2));
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2));
if (blockingQueue.remainingCapacity() < 1) {
//todo
}
blockingQueue.put(...)