背景:
一個BlockingQueue的是一個這樣的隊列,每個插入操作都必須等待另一個删除操作,反過來也一樣。一個同步隊列沒有内部容量這個概念。你不能使用peek操作,因為一個元素僅在你試着删除它的時候才能夠被取得。你不能插入一個元素(任何方法),直到另一個線程試着删除它。你不能疊代它們,因為沒有東西可以被疊代。queue的頭元素head是第一個進入隊列中的元素,并且插入線程正在為它等待。如果隊列中沒有像以上的元素存在,那麼調用poll的方法會傳回null。對于
Collection的其他方法(比如contains),SynchronousQueue表現得像一個空的集合。它不允許null入隊。
這個隊列類似于CSP和Ada中使用的會合信道。它們适合于切換的設計,比如一個線程中的對象必須同步等待另一個線程中運作的對象進而傳遞一些資訊/事件/任務。
這個類支援可選的公平政策進而制訂生産者和等待者的等待順序。預設情況下,這個順序是沒有保證的,使用true可以確定隊列是FIFO的。
這個類以及它的疊代器實作了某些Collection和Iterator中的方法。
算法:
這個算法實作了雙重棧和雙重隊列算法。
(LIfo)棧用作非公平模式,(Fifo)隊列用于公平模式。這兩者的性能相似。Fifo經常能在競争情況下提供更高的吞吐量,但是Lifo能夠在一般應用中維持更高的線程局部性。
雙重隊列(以及相似的棧)在任何時刻都是持有“資料”--item通過put操作提供或者“請求”--slo通過take操作提供,或者為空。一個調用試着“滿足”(一個請求的調用得到資料或者一個資料的調用比對了請求)結果是出隊了一個模式互補的節點。最有趣的的地方在于任何操作都能夠明确目前隊列處于哪種模式,以及表現得好像不需要鎖。
隊列和棧都擴充了抽象的Transferer接口,它定義了唯一一個transfer方法用于put或者take。這些方法統一在一個方法下是因為在雙重資料結構中,put和take方法是對稱的兩種方法,是以幾乎所有代碼可以被組合。結果transfer方法是比較長的,但是這樣相對于把他們分成幾乎重複的幾部分代碼還是更好的。
這個隊列和棧共享了很多相似的概念但是很少的具體細節。為了簡單性,他們保持不同進而在後續可以分開演化。
這個同步隊列的算法不同于以前的算法,包括消除的機制。主要的不同在于:
- 其他的算法使用位-掩碼的方式,但是現在的節點直接使用了模式位,進而導緻了其他的不同。
- 同步隊列必須等待直到被其他線程來“滿足”。
- 提供了逾時和中斷的支援,包括從連結清單中清理節點/線程進而避免垃圾資料的持有和記憶體消耗。
阻塞操作主要通過LockSupport的park/unpark方法來實作,而那些很可能被下一個調用滿足的節點會首先自旋一定次數(僅在多核處理器上)。在非常繁忙的同步隊列中,自旋能夠顯著地提升吞吐量。但是在一般情況下,這個自旋次數也是足夠小進而不那麼明顯。
清理操作在隊列和棧中的方式是不同的。隊列中,我們幾乎使用O(1)的時間來清除被取消的節點。但是如果它正被作為tail元素,那麼就必須等待後續的清除操作來清理它。棧中,我們需要潛在的O(n)時間來周遊,進而确認我們能夠清除這個節點,但是這個操作可以和其他線程共同作用于這個棧。
當垃圾回收關注于大部分節點的回收問題時,複雜的非阻塞算法會試着“忘記”資料、其他節點的引用,以及線程會持有這些資料在一段長的阻塞時間内。在這種情況下給一個節點中的值設定null将會違背主要的算法,我們會代替使用将引用指向節點自身的方式。這個情況不怎麼會在棧中發生(因為阻塞線程不會挂在舊的頭節點上),但是隊列中的節點引用必須積極地忘記,進而避免已經結束的節點的可達性。
實作:
實作的重點在于transfer這個方法:
- 不能放入null元素,否則抛出NullPointerException異常。
- 不限時版本(timed==false),傳回非null為成功插入,傳回null抛出InterruptedException異常。
- 限時版本(timed==true),傳回非null為成功插入,傳回null的确情況下,若處于中斷狀态則抛出InterruptedException異常,否則表明逾時。
Transferer接口分為queue和stack兩個版本, 我們先分析queue版本(源碼):
queue
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
這裡的基本算法是循環試着執行下列行動之一:
- 如果隊列是空的或者擁有和目前調用同樣的模式,試着在隊列中增加等待節點,等待這個節點被滿足(或者取消),然後傳回相比對的資料。
- 如果隊列不為空以及隊列擁有和目前調用相補的模式,試着去通過CAS操作來改變等待節點的item域,彈出隊列,以及傳回相比對的資料。
在每一種情況中,都會檢查并且幫助遞增head和tail域,在其他的線程相對停滞和緩慢的情況下。 一開始對于null的檢查是為了避免看到沒有初始化的head和tail值。在目前的SynchronousQueue中這種情況不會發生,除非調用者使用的是非volatile/final的Transferer。 這個檢查存在的原因是,放在循環的頂部快于将它在循環中穿插地放置。
詳情如下:
- 取得目前調用的模式isData,進入循環,首先排除head和tail都為null的情況。
- 當隊列為空(head=tail)或者隊尾節點的模式和目前調用模式相同的情況下:假如tail元素已經改變則更新重來,假如目前是限時版本并且時間參數不大于0則傳回null,建立新節點并且以CAS方式入隊,更新tail節點,然後等待相補的調用的來臨(awaitFulfill),假如傳回的資料是節點本身(說明是中斷或者超市)則做清理,并傳回null,否則假如節點還在隊列中則更新head元素并且修改目前節點的item和waiter域,然後傳回獲得的x和e中非空的元素(此處必定是1空1非空)。
- 否則,隊列中有相補模式的節點,則試着取得隊首元素即head.next,嘗試滿足他(fulfill),假如失敗則重試,成功之後會遞增head的值,然後喚醒(unpark)等待線程以及傳回相比對的資料。
接着我們來看阻塞操作awaitFulfill:
/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
這裡的s是根據調用構造的節點,e為傳遞的資料(null是請求,否則資料),timed為限時标示,nanos時間量(納秒):
- 首先我們根據timed取得結束時間,假如非限時則為0,然後取得線程對象以及自旋次數:隊首元素&非限時(maxUntimedSpins最大);隊首元素&限時(maxTimedSpins次之);否則為0。
- 進入循環:假如中斷則嘗試取消節點,假如item變化則傳回資料,假如限時版本并且逾時則嘗試取消并重試或者更新nanos,遞減自旋次數直到0----->設定節點中的線程對象----->調用非限時挂起版本或者根據門檻值判斷是否調用限時挂起版本。
注意:以上循環在有相補的調用發生時總是會傳回對應的資料,在被中斷或者逾時處理成功情況下會傳回目前節點本身。
我們最後看transfer方法中的清理操作:
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
方法中的pred為s入隊時的前驅,這個方法中注釋裡有如下說明:
在任一時刻,隻有一個連結清單中的節點不能被删除---最後一個插入的節點。為了遷就這個原則,如果我們無法删除s,那麼我們就儲存它的前驅節點為“cleanMe”,首先删除之前儲存的版本。是以至少s或者之前儲存的節點能夠被删除,是以最後總是能夠被删除!
詳情如下:
- 首先既然s已經被取消,則設定它的等待者waiter為null,進入循環(條件:pred.next為s推斷出s不為head,s為head的話不需要删除了)。
- 取得head和tail,探測他們是否停滞,過于停滞時更新他們的值。
- 當s節點不為隊尾節點時候(嘗試更新前驅pred,或者當s處于OffList時傳回)。
- s節點為尾元素:假如cleanMe不為空,則說明有之前并未删除的尾節點,那麼則嘗試删除之前的cleanMe之後的節點,否則嘗試設定目前的pred為cleanMe,等待下一次的删除,然後傳回。
這裡的難點在于: 當我們取得cleanMe時,如果不為空,并不了解是否有其他線程進過同樣的操作,是以我們首先要判斷d=dp.nex不為空(若為空則修改cleanMe然後傳回)、d != dp(否則說明dp已經offList)、d.isCancelled(否則說明之前的節點已經被删除、以及d不為尾節點&&dn=d.next不為空&&dn != d(否則d已經offList),以上情況下才可以修改dp的next為dn進而删除了之前存儲的待删除尾節點,并且修改cleanMe的值從dp變為null。假如cleanMe為空,則可以嘗試設定自己的pred節點,在競争失敗的情況下可以重試是可以取得進展的,因為任何時候隻有一個節點能夠作為tail。 是以我們會嘗試清理之前被删除的尾節點,以及嘗試設定自己的前驅為cleanMe節點。
if (dp == pred)
這一行在我看來是沒必要的,因為永遠不可能出現。
我們接着分析stack:
stack
/**
* Puts or takes an item.
*/
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
這裡的Node實際上擁有三種狀态:REQUEST/DATA/FULFILLING,基本算法是循環試着執行下列3種行動之一:
- 如果棧為空或者包含相同模式的節點,試着入棧然後等待比對,傳回比對資料,在取消時傳回null。
- 如果包含相補的節點,試着入棧一個fulfill模式的節點,比對相對應的等待節點,彈出這兩個節點,傳回比對資料。這裡的比對或者取消連結操作可能沒有被實際執行,因為第三種行動:
- 如果棧頂元素為fulfill模式的節點,嘗試幫助它執行match以及pop操作,然後再重試。這裡的代碼和fulfill的行為幾乎相同的,隻不過不傳回資料。
詳情如下:
- 取得目前調用的模式mode,REQUEST或則DATA,然後進入循環。
- 取得棧頂元素h,當隊列為空或者棧頂元素模式與目前模式相同時:首先排除限時調用和nanos不大于0的情況,然後傳回null。否則連結到h,并嘗試入棧,成功之後通過awaitFulfill等待相補調用的來臨,然後根據傳回SNode節點是否為s本身來判斷是否需要清理操作,假如擷取了資料那麼便協助更新head,以及根據目前模式傳回資料。
- 目前棧頂元素與目前調用模式不同,那麼假如目前棧頂元素模式不為fulfill時(通過 (mode & FULFILLING) != 0來判斷),進入循環,s為目前棧頂元素,m為下一個待比對元素(必定不為null),嘗試取得mn(m.next),試着使用m.tryMatch(s),來完成m和s的比對,并且傳遞s到了m.match(注意,這一步可能也會由另一種情況完成),成功之後改變head值,以及傳回目前資料或者比對資料。假如失敗(意味着最後探測到的match不為s,唯一的場景為等待線程中斷或者逾時),則重新設定s的next值為mn,然後重試。
- 目前棧頂元素的模式為fulfill時,嘗試取得棧頂元素h和next節點m,然後試着比對m和h的值,這裡的作用幾乎和上一種情況中類似,隻不過不傳回資料,以及隻協助一次。
接着我們來看阻塞操作awaitFulfill:
/**
* Spins/blocks until node s is matched by a fulfill operation.
*
* @param s the waiting node
* @param timed true if timed wait
* @param nanos timeout value
* @return matched node, or s if cancelled
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
這個操作事實上和隊列版本中的類似,首先來解釋下注釋中的内容:當一個節點/線程試着去阻塞,它會在設定waiter域之後至少檢查一次狀态,然後才會調用parking(阻塞),這樣子可以通過waiter進而和它的滿足者協作進而確定不會丢失信号。如果目前調用的幾點位于棧頂,那麼在park之前會首先嘗試自旋,這樣可以在生産者和消費者非常接近時避免阻塞。但是這個隻在多核處理器下才會有用。從代碼中的檢查情況可以看出,在優先級上,中斷狀态--->正式傳回---->逾時。(是以最後一個檢查是用來探測逾時的)除了非限時的同步隊列。{poll/offer}方法不會檢查中斷以及等待太久,是以對于中斷和逾時的判斷被放置于transfer方法中,這樣要好于調用awaitFulfill。詳情如下(與隊列版本中類似):取得目前的結束時間,目前線程,以及自旋次數。然後進入循環。首先判斷是否中斷,判斷限時版本下的時間流逝,判斷自旋,以及根據目前節點s所處的位置來設定自旋次數。設定線程對象(用于喚醒)。最後根據是否限時來阻塞目前線程,限時版本下會根據門檻值來判斷是否需要阻塞。最後我們來看進行中斷和逾時情況下的清理操作clean:
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
* it. But we can stop when we see any node known to
* follow s. We use s.next unless it too is cancelled, in
* which case we try the node one past. We don't check any
* further because we don't want to doubly traverse just to
* find sentinel.
*/
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
這裡的s為被取消了的節點,這裡的注釋有如下說明:
最壞情況下我們需要周遊整個棧才能取消s的連結。如果有其他的取消操作同時在進行,我們可能看不到s,因為它已經被其他的線程删除了。但是我們可以觀察跟随s之後的節點,如果這個節點也是取消狀态,那麼我們會使用下一個節點,我們不會再檢查,因為不想要周遊兩遍僅僅是為了找到哨兵節點。
詳情如下:
- 設定s節點的item和waiter都為null,因為已經不需要了,并且它的狀态可以由match為this來判斷。
- 取得s的下一個節點past,假如past也是取消的,那麼再取下一節點。
- 從頭p=head開始,逐漸斷開past之前的那些被取消的節點。
- 再從p開始删除嵌在棧中的節點,知道棧為空或者找到哨兵節點(past)。