阻塞隊列——手寫生産者消費者模式、線程池原理面試題真正的答案
文章收錄在 GitHub JavaKeeper ,N線網際網路開發必備技能兵器譜
隊列和阻塞隊列
隊列
隊列(
Queue
)是一種經常使用的集合。
Queue
實際上是實作了一個先進先出(FIFO:First In First Out)的有序表。和 List、Set一樣都繼承自 Collection。它和
List
的差別在于,
List
可以在任意位置添加和删除元素,而
Queue
隻有兩個操作:
- 把元素添加到隊列末尾;
- 從隊列頭部取出元素。
超市的收銀台就是一個隊列:

我們常用的 LinkedList 就可以當隊列使用,實作了Dequeue接口,還有 ConcurrentLinkedQueue,他們都屬于非阻塞隊列。
阻塞隊列
阻塞隊列,顧名思義,首先它是一個隊列,而一個阻塞隊列在資料結構中所起的作用大緻如下
線程 1 往阻塞隊列中添加元素,而線程 2 從阻塞隊列中移除元素
- 當阻塞隊列是空時,從隊列中擷取元素的操作将會被阻塞。
- 當阻塞隊列是滿時,從隊列中添加元素的操作将會被阻塞。
試圖從空的阻塞隊列中擷取元素的線程将會阻塞,直到其他的線程往空的隊列插入新的元素,同樣,試圖往已滿的阻塞隊列添加新元素的線程同樣也會阻塞,直到其他的線程從列中移除一個或多個元素或者完全清空隊列後繼續新增。
類似我們去海底撈排隊,海底撈爆滿情況下,阻塞隊列相當于用餐區,用餐區滿了的話,就阻塞在候客區等着,可以用餐的話 put 一波去用餐,吃完就 take 出去。
為什麼要用阻塞隊列,有什麼好處嗎
在多線程領域:所謂阻塞,是指在某些情況下會挂起線程(即阻塞),一旦條件滿足,被挂起的線程又會自動被喚醒。
那為什麼需要 BlockingQueue 呢
好處是我們不需要關心什麼時候需要阻塞線程,什麼時候需要喚醒線程,因為這些 BlockingQueue 都包辦了。
在 concurrent 包釋出以前,多線程環境下,我們每個程式員都必須自己去實作這些細節,尤其還要兼顧效率和線程安全,這會給我們的程式帶來不小的複雜性。現在有了阻塞隊列,我們的操作就從手動擋換成了自動擋。
Java 裡的阻塞隊列
Collection的子類除了我們熟悉的 List 和 Set,還有一個 Queue,阻塞隊列 BlockingQueue 繼承自 Queue。
BlockingQueue 是個接口,需要使用它的實作之一來使用 BlockingQueue,
java.util.concurrent
包下具有以下 BlockingQueue 接口的實作類:
JDK 提供了 7 個阻塞隊列。分别是
- ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列
- LinkedBlockingQueue :一個由連結清單結構組成的有界阻塞隊列
- PriorityBlockingQueue :一個支援優先級排序的無界阻塞隊列
- DelayQueue:一個使用優先級隊列實作的無界阻塞隊列
- SynchronousQueue:一個不存儲元素的阻塞隊列
- LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列(實作了繼承于 BlockingQueue的 TransferQueue)
- LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列
BlockingQueue 核心方法
相比 Queue 接口,BlockingQueue 有四種形式的 API。
方法類型 | 抛出異常 | 傳回特殊值 | 一直阻塞 | 逾時退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除(取出) | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek() | 不可用 |
以 ArrayBlockingQueue 來看下 Java 阻塞隊列提供的常用方法
- 抛出異常:
- 當阻塞隊列滿時,再往隊列裡 add 插入元素會抛出
異常;java.lang.IllegalStateException: Queue full
- 當隊列為空時,從隊列裡 remove 移除元素時會抛出
異常 。NoSuchElementException
- element(),傳回隊列頭部的元素,如果隊列為空,則抛出一個
異常NoSuchElementException
- 當阻塞隊列滿時,再往隊列裡 add 插入元素會抛出
- 傳回特殊值:
- offer(),插入方法,成功傳回 true,失敗傳回 false;
- poll(),移除方法,成功傳回出隊列的元素,隊列裡沒有則傳回 null
- peek() ,傳回隊列頭部的元素,如果隊列為空,則傳回 null
- 一直阻塞:
- 當阻塞隊列滿時,如果生産線程繼續往隊列裡 put 元素,隊列會一直阻塞生産線程,直到拿到資料,或者響應中斷退出;
- 當阻塞隊列空時,消費線程試圖從隊列裡 take 元素,隊列也會一直阻塞消費線程,直到隊列可用。
- 逾時退出:
- 當阻塞隊列滿時,隊列會阻塞生産線程一定時間,如果超過一定的時間,生産線程就會退出,傳回 false
- 當阻塞隊列空時,隊列會阻塞消費線程一定時間,如果超過一定的時間,消費線程會退出,傳回 null
BlockingQueue 實作類
逐個分析下這 7 個阻塞隊列,常用的幾個順便探究下源碼。
ArrayBlockingQueue
ArrayBlockingQueue,一個由數組實作的有界阻塞隊列。該隊列采用先進先出(FIFO)的原則對元素進行排序添加的。
ArrayBlockingQueue 為有界且固定,其大小在構造時由構造函數來決定,确認之後就不能再改變了。
ArrayBlockingQueue 支援對等待的生産者線程和使用者線程進行排序的可選公平政策,但是在預設情況下不保證線程公平的通路,在構造時可以選擇公平政策(
fair = true
)。公平性通常會降低吞吐量,但是減少了可變性和避免了“不平衡性”。(ArrayBlockingQueue 内部的阻塞隊列是通過 ReentrantLock 和 Condition 條件隊列實作的, 是以 ArrayBlockingQueue 中的元素存在公平和非公平通路的差別)
所謂公平通路隊列是指阻塞的所有生産者線程或消費者線程,當隊列可用時,可以按照阻塞的先後順序通路隊列,即先阻塞的生産者線程,可以先往隊列裡插入元素,先阻塞的消費者線程,可以先從隊列裡擷取元素,可以保證先進先出,避免饑餓現象。
源碼解讀
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 通過數組來實作的隊列
final Object[] items;
//記錄隊首元素的下标
int takeIndex;
//記錄隊尾元素的下标
int putIndex;
//隊列中的元素個數
int count;
//通過ReentrantLock來實作同步
final ReentrantLock lock;
//有2個條件對象,分别表示隊列不為空和隊列不滿的情況
private final Condition notEmpty;
private final Condition notFull;
//疊代器
transient Itrs itrs;
//offer方法用于向隊列中添加資料
public boolean offer(E e) {
// 可以看出添加的資料不支援null值
checkNotNull(e);
final ReentrantLock lock = this.lock;
//通過重入鎖來實作同步
lock.lock();
try {
//如果隊列已經滿了的話直接就傳回false,不會阻塞調用這個offer方法的線程
if (count == items.length)
return false;
else {
//如果隊列沒有滿,就調用enqueue方法将元素添加到隊列中
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//多了個等待時間的 offer方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//擷取可中斷鎖
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//等待設定的時間
nanos = notFull.awaitNanos(nanos);
}
//如果等待時間過了,隊列有空間的話就會調用enqueue方法将元素添加到隊列
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
//将資料添加到隊列中的具體方法
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//通過循環數組實作的隊列,當數組滿了時下标就變成0了
if (++putIndex == items.length)
putIndex = 0;
count++;
//激活因為notEmpty條件而阻塞的線程,比如調用take方法的線程
notEmpty.signal();
}
//将資料從隊列中取出的方法
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//将對應的數組下标位置設定為null釋放資源
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//激活因為notFull條件而阻塞的線程,比如調用put方法的線程
notFull.signal();
return x;
}
//put方法和offer方法不一樣的地方在于,如果隊列是滿的話,它就會把調用put方法的線程阻塞,直到隊列裡有空間
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//因為後面調用了條件變量的await()方法,而await()方法會在中斷标志設定後抛出InterruptedException異常後退出,
// 是以在加鎖時候先看中斷标志是不是被設定了,如果設定了直接抛出InterruptedException異常,就不用再去擷取鎖了
lock.lockInterruptibly();
try {
while (count == items.length)
//如果隊列滿的話就阻塞等待,直到notFull的signal方法被調用,也就是隊列裡有空間了
notFull.await();
//隊列裡有空間了執行添加操作
enqueue(e);
} finally {
lock.unlock();
}
}
//poll方法用于從隊列中取資料,不會阻塞目前線程
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果隊列為空的話會直接傳回null,否則調用dequeue方法取資料
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//有等待時間的 poll 重載方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//take方法也是用于取隊列中的資料,但是和poll方法不同的是它有可能會阻塞目前的線程
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//當隊列為空時,就會阻塞目前線程
while (count == 0)
notEmpty.await();
//直到隊列中有資料了,調用dequeue方法将資料傳回
return dequeue();
} finally {
lock.unlock();
}
}
//傳回隊首元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
//擷取隊列的元素個數,加了鎖,是以結果是準确的
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
// 此外,還有一些其他方法
//傳回隊列剩餘空間,還能加幾個元素
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
// 判斷隊列中是否存在目前元素o
public boolean contains(Object o){}
// 傳回一個按正确順序,包含隊列中所有元素的數組
public Object[] toArray(){}
// 自動清空隊列中的所有元素
public void clear(){}
// 移除隊列中所有可用元素,并将他們加入到給定的 Collection 中
public int drainTo(Collection<? super E> c){}
// 傳回此隊列中按正确順序進行疊代的,包含所有元素的疊代器
public Iterator<E> iterator()
}
LinkedBlockingQueue
LinkedBlockingQueue 是一個用單向連結清單實作的有界阻塞隊列。此隊列的預設和最大長度為
Integer.MAX_VALUE
。此隊列按照先進先出的原則對元素進行排序。
如果不是特殊業務,LinkedBlockingQueue 使用時,切記要定義容量
new LinkedBlockingQueue(capacity)
,防止過度膨脹。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
// 基于連結清單實作,肯定要有結點類,典型的單連結清單結構
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//容量
private final int capacity;
//目前隊列元素數量
private final AtomicInteger count = new AtomicInteger();
// 頭節點,不存資料
transient Node<E> head;
// 尾節點,便于入隊
private transient Node<E> last;
// take鎖,出隊鎖,隻有take,poll方法會持有
private final ReentrantLock takeLock = new ReentrantLock();
// 出隊等待條件
// 當隊列無元素時,take鎖會阻塞在notEmpty條件上,等待其它線程喚醒
private final Condition notEmpty = takeLock.newCondition();
// 入隊鎖,隻有put,offer會持有
private final ReentrantLock putLock = new ReentrantLock();
// 入隊等待條件
// 當隊列滿了時,put鎖會會阻塞在notFull上,等待其它線程喚醒
private final Condition notFull = putLock.newCondition();
//同樣提供三個構造器
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
// 初始化head和last指針為空值節點
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue() {
// 如果沒傳容量,就使用最大int值初始化其容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(Collection<? extends E> c) {}
//入隊
public void put(E e) throws InterruptedException {
// 不允許null元素
if (e == null) throw new NullPointerException();
//規定給目前put方法預留一個本地變量
int c = -1;
// 建立一個節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put鎖加鎖
putLock.lockInterruptibly();
try {
// 如果隊列滿了,就阻塞在notFull條件上
// 等待被其它線程喚醒
while (count.get() == capacity) {
notFull.await();
}
// 隊列不滿了,就入隊
enqueue(node);
// 隊列長度加1
c = count.getAndIncrement();
// 如果現隊列長度小于容量
// 就再喚醒一個阻塞在notFull條件上的線程
// 這裡為啥要喚醒一下呢?
// 因為可能有很多線程阻塞在notFull這個條件上的
// 而取元素時隻有取之前隊列是滿的才會喚醒notFull
// 為什麼隊列滿的才喚醒notFull呢?
// 因為喚醒是需要加putLock的,這是為了減少鎖的次數
// 是以,這裡索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程
// 說白了,這也是鎖分離帶來的代價
if (c + 1 < capacity)
notFull.signal();
} finally {
// 釋放鎖
putLock.unlock();
}
// 如果原隊列長度為0,現在加了一個元素後立即喚醒notEmpty條件
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 加take鎖
takeLock.lock();
try {
// 喚醒notEmpty條件
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private void enqueue(Node<E> node) {
// 直接加到last後面
last = last.next = node;
}
public boolean offer(E e) {
//用帶過期時間的說明
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//轉換為納秒
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//擷取入隊鎖,支援等待鎖的過程中被中斷
putLock.lockInterruptibly();
try {
//隊列滿了,再看看有沒有逾時
while (count.get() == capacity) {
if (nanos <= 0)
//等待時間逾時
return false;
//進行等待,awaitNanos(long nanos)是AQS中的方法
//在等待過程中,如果被喚醒或逾時,則繼續目前循環
//如果被中斷,則抛出中斷異常
nanos = notFull.awaitNanos(nanos);
}
//進入隊尾
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//說明目前元素後面還能再插入一個
//就喚醒一個入隊條件隊列中阻塞的線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//節點數量為0,說明隊列是空的
if (c == 0)
//喚醒一個出隊條件隊列阻塞的線程
signalNotEmpty();
return true;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果隊列無元素,則阻塞在notEmpty條件上
while (count.get() == 0) {
notEmpty.await();
}
// 否則,出隊
x = dequeue();
// 擷取出隊前隊列的長度
c = count.getAndDecrement();
// 如果取之前隊列長度大于1,則喚醒notEmpty
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果取之前隊列長度等于容量
// 則喚醒notFull
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//隊列為空且已經逾時,直接傳回空
if (nanos <= 0)
return null;
//等待過程中可能被喚醒,逾時,中斷
nanos = notEmpty.awaitNanos(nanos);
}
//進行出隊操作
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果出隊前,隊列是滿的,則喚醒一個被take()阻塞的線程
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
//
}
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
public boolean contains(Object o) {
}
static final class LBQSpliterator<E> implements Spliterator<E> {
}
}
LinkedBlockingQueue 與 ArrayBlockingQueue 對比
- ArrayBlockingQueue 入隊出隊采用一把鎖,導緻入隊出隊互相阻塞,效率低下;
- LinkedBlockingQueue入隊出隊采用兩把鎖,入隊出隊互不幹擾,效率較高;
- 二者都是有界隊列,如果長度相等且出隊速度跟不上入隊速度,都會導緻大量線程阻塞;
- LinkedBlockingQueue 如果初始化不傳入初始容量,則使用最大int值,如果出隊速度跟不上入隊速度,會導緻隊列特别長,占用大量記憶體;
PriorityBlockingQueue
PriorityBlockingQueue 是一個支援優先級的無界阻塞隊列。(雖說是無界隊列,但是由于資源耗盡的話,也會OutOfMemoryError,無法添加元素)
預設情況下元素采用自然順序升序排列。也可以自定義類實作 compareTo() 方法來指定元素排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。但需要注意的是不能保證同優先級元素的順序。PriorityBlockingQueue 是基于最小二叉堆實作,使用基于 CAS 實作的自旋鎖來控制隊列的動态擴容,保證了擴容操作不會阻塞 take 操作的執行。
DelayQueue
DelayQueue 是一個使用優先級隊列實作的延遲無界阻塞隊列。
隊列使用 PriorityQueue 來實作。隊列中的元素必須實作 Delayed 接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。隻有在延遲期滿時才能從隊列中提取元素。我們可以将 DelayQueue 運用在以下應用場景:
- 緩存系統的設計:可以用 DelayQueue 儲存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從 DelayQueue 中擷取元素時,表示緩存有效期到了。
- 定時任務排程。使用 DelayQueue 儲存當天将會執行的任務和執行時間,一旦從 DelayQueue 中擷取到任務就開始執行,從比如 Timer 就是使用 DelayQueue 實作的。
SynchronousQueue
SynchronousQueue 是一個不存儲元素的阻塞隊列,也即是單個元素的隊列。
每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。SynchronousQueue 可以看成是一個傳球手,負責把生産者線程處理的資料直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常适合于傳遞性場景, 比如在一個線程中使用的資料,傳遞給另外一個線程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
Coding
synchronousQueue 是一個沒有資料緩沖的阻塞隊列,生産者線程對其的插入操作 put() 必須等待消費者的移除操作 take(),反過來也一樣。
對應 peek, contains, clear, isEmpty ... 等方法其實是無效的。
但是 poll() 和 offer() 就不會阻塞,舉例來說就是 offer 的時候如果有消費者在等待那麼就會立馬滿足傳回 true,如果沒有就會傳回 false,不會等待消費者到來。
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> queue = new SynchronousQueue<>();
//System.out.println(queue.offer("aaa")); //false
//System.out.println(queue.poll()); //null
System.out.println(queue.add("bbb")); //IllegalStateException: Queue full
new Thread(()->{
try {
System.out.println("Thread 1 put a");
queue.put("a");
System.out.println("Thread 1 put b");
queue.put("b");
System.out.println("Thread 1 put c");
queue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2 get:"+queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2 get:"+queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println("Thread 2 get:"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
Thread 1 put a
Thread 2 get:a
Thread 1 put b
Thread 2 get:b
Thread 1 put c
Thread 2 get:c
不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊列依賴AQS實作并發操作,SynchronousQueue直接使用CAS實作線程的安全通路。
synchronousQueue 提供了兩個構造器(公平與否),内部是通過 Transferer 來實作的,具體分為兩個Transferer,分别是 TransferStack 和 TransferQueue。
TransferStack:非公平競争模式使用的資料結構是後進先出棧(LIFO Stack)
TransferQueue:公平競争模式則使用先進先出隊列(FIFO Queue)
性能上兩者是相當的,一般情況下,FIFO 通常可以支援更大的吞吐量,但 LIFO 可以更大程度的保持線程的本地化。
private transient volatile Transferer<E> transferer;
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
分析 TransferQueue 的實作
//構造函數中會初始化一個出隊的節點,并且首尾都指向這個節點
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
//隊列節點,
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 設定next和item的值,用于進行并發更新, cas 無鎖操作
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
return item == this;
}
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
從 put() 方法和 take() 方法可以看出最終調用的都是 TransferQueue 的 transfer() 方法。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
//transfer方法用于送出資料或者是擷取資料
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
//如果e不為null,就說明是添加資料的入隊操作
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
//如果目前操作和 tail 節點的操作是一樣的;或者頭尾相同(表明隊列中啥都沒有)。
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 如果 t 和 tail 不一樣,說明,tail 被其他的線程改了,重來
if (t != tail) // inconsistent read
continue;
// 如果 tail 的 next 不是空。就需要将 next 追加到 tail 後面了
if (tn != null) { // lagging tail
// 使用 CAS 将 tail.next 變成 tail,
advanceTail(t, tn);
continue;
}
// 時間到了,不等待,傳回 null,插入失敗,擷取也是失敗的
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
LinkedTransferQueue
LinkedTransferQueue 是一個由連結清單結構組成的無界阻塞 TransferQueue 隊列。
LinkedTransferQueue采用一種預占模式。意思就是消費者線程取元素時,如果隊列不為空,則直接取走資料,若隊列為空,那就生成一個節點(節點元素為null)入隊,然後消費者線程被等待在這個節點上,後面生産者線程入隊時發現有一個元素為null的節點,生産者線程就不入隊了,直接就将元素填充到該節點,并喚醒該節點等待的線程,被喚醒的消費者線程取走元素,從調用的方法傳回。我們稱這種節點操作為“比對”方式。
隊列實作了 TransferQueue 接口重寫了 tryTransfer 和 transfer 方法,這組方法和 SynchronousQueue 公平模式的隊列類似,具有比對的功能
LinkedBlockingDeque
LinkedBlockingDeque 是一個由連結清單結構組成的雙向阻塞隊列。
所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競争。相比其他的阻塞隊列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結尾的方法,表示插入,擷取(peek)或移除雙端隊列的第一個元素。以 Last 單詞結尾的方法,表示插入,擷取或移除雙端隊列的最後一個元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。
在初始化 LinkedBlockingDeque 時可以設定容量防止其過渡膨脹,預設容量也是 Integer.MAX_VALUE。另外雙向阻塞隊列可以運用在“工作竊取”模式中。
阻塞隊列使用場景
我們常用的生産者消費者模式就可以基于阻塞隊列實作;
線程池中活躍線程數達到 corePoolSize 時,線程池将會将後續的 task 送出到 BlockingQueue 中;
生産者消費者模式
JDK API文檔的 BlockingQueue 給出了一個典型的應用
面試題:一個初始值為 0 的變量,兩個線程對齊交替操作,一個+1,一個-1,5 輪
public class ProdCounsume_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 0; i <= 5; i++) {
shareData.increment();
}
}, "T1").start();
new Thread(() -> {
for (int i = 0; i <= 5; i++) {
shareData.decrement();
}
}, "T1").start();
}
}
//線程操作資源類
class ShareData {
private int num = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (num != 0) {
//等待,不能生産
condition.await();
}
//幹活
num++;
System.out.println(Thread.currentThread().getName() + "\t" + num);
//喚醒
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (num == 0) {
//等待,不能生産
condition.await();
}
//幹活
num--;
System.out.println(Thread.currentThread().getName() + "\t" + num);
//喚醒
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
線程池
線程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任務的阻塞隊列,被送出但尚未被執行的任務
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
線程池在内部實際也是建構了一個生産者消費者模型,将線程和任務兩者解耦,并不直接關聯,進而良好的緩沖任務,複用線程。
不同的線程池實作用的是不同的阻塞隊列,newFixedThreadPool 和 newSingleThreadExecutor 用的是LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。
參考與感謝
https://www.liaoxuefeng.com/SynchronousQueue源碼
https://juejin.im/post/5ae754c7f265da0ba76f8534