1.簡介
AbstractQueuedSynchronizer (抽象隊列同步器,以下簡稱 AQS)出現在 JDK 1.5 中,由大師 Doug Lea 所創作。AQS 是很多同步器的基礎架構,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 實作的。除此之外,我們還可以基于 AQS,定制出我們所需要的同步器。
AQS 的使用方式通常都是通過内部類繼承 AQS 實作同步功能,通過繼承 AQS,可以簡化同步器的實作。如前面所說,AQS 是很多同步器實作的基礎架構。弄懂 AQS 對了解 Java 并發包裡的元件大有裨益,這也是我學習 AQS 并寫出這篇文章的緣由。另外,需要說明的是,AQS 本身并不是很好了解,細節很多。在看的過程中藥有一定的耐心,做好看多遍的準備。好了,其他的就不多說了,開始進入正題吧。
2.原理概述
在 AQS 内部,通過維護一個
FIFO 隊列
來管理多線程的排隊工作。在公平競争的情況下,無法擷取同步狀态的線程将會被封裝成一個節點,置于隊列尾部。入隊的線程将會通過自旋的方式擷取同步狀态,若在有限次的嘗試後,仍未擷取成功,線程則會被阻塞住。大緻示意圖如下:

當頭結點釋放同步狀态後,且後繼節點對應的線程被阻塞,此時頭結點
線程将會去喚醒後繼節點線程。後繼節點線程恢複運作并擷取同步狀态後,會将舊的頭結點從隊列中移除,并将自己設為頭結點。大緻示意圖如下:
3.重要方法介紹
本節将介紹三組重要的方法,通過使用這三組方法即可實作一個同步元件。
第一組方法是用于通路/設定同步狀态的,如下:
方法 | 說明 |
---|---|
int getState() | 擷取同步狀态 |
void setState() | 設定同步狀态 |
boolean compareAndSetState(int expect, int update) | 通過 CAS 設定同步狀态 |
第二組方需要由同步元件覆寫。如下:
boolean tryAcquire(int arg) | 獨占式擷取同步狀态 |
boolean tryRelease(int arg) | 獨占式釋放同步狀态 |
int tryAcquireShared(int arg) | 共享式擷取同步狀态 |
boolean tryReleaseShared(int arg) | 共享式私房同步狀态 |
boolean isHeldExclusively() | 檢測目前線程是否擷取獨占鎖 |
第三組方法是一組模闆方法,同步元件可直接調用。如下:
void acquire(int arg) | 獨占式擷取同步狀态,該方法将會調用 tryAcquire 嘗試擷取同步狀态。擷取成功則傳回,擷取失敗,線程進入同步隊列等待。 |
void acquireInterruptibly(int arg) | 響應中斷版的 acquire |
boolean tryAcquireNanos(int arg,long nanos) | 逾時+響應中斷版的 acquire |
void acquireShared(int arg) | 共享式擷取同步狀态,同一時刻可能會有多個線程獲得同步狀态。比如讀寫鎖的讀鎖就是就是調用這個方法擷取同步狀态的。 |
void acquireSharedInterruptibly(int arg) | 響應中斷版的 acquireShared |
boolean tryAcquireSharedNanos(int arg,long nanos) | 逾時+響應中斷版的 acquireShared |
boolean release(int arg) | |
boolean releaseShared(int arg) | 共享式釋放同步狀态 |
上面列舉了一堆方法,看似繁雜。但稍微理一下,就會發現上面諸多方法無非就兩大類:一類是獨占式擷取和釋放共享狀态,另一類是共享式擷取和釋放同步狀态。至于這兩類方法的實作細節,我會在接下來的章節中講到,繼續往下看吧。
4.源碼分析
4.1 節點結構
在并發的情況下,AQS 會将未擷取同步狀态的線程将會封裝成節點,并将其放入同步隊列尾部。同步隊列中的節點除了要儲存線程,還要儲存等待狀态。不管是獨占式還是共享式,在擷取狀态失敗時都會用到節點類。是以這裡我們要先看一下節點類的實作,為後面的源碼分析進行簡單鋪墊。源碼如下:
static final class Node { /** 共享類型節點,标記節點在共享模式下等待 */ static final Node SHARED = new Node(); /** 獨占類型節點,标記節點在獨占模式下等待 */ static final Node EXCLUSIVE = null; /** 等待狀态 - 取消 */ static final int CANCELLED = 1; /** * 等待狀态 - 通知。某個節點是處于該狀态,當該節點釋放同步狀态後, * 會通知後繼節點線程,使之可以恢複運作 */ static final int SIGNAL = -1; /** 等待狀态 - 條件等待。表明節點等待在 Condition 上 */ static final int CONDITION = -2; /** * 等待狀态 - 傳播。表示無條件向後傳播喚醒動作,詳細分析請看第五章 */ static final int PROPAGATE = -3; /** * 等待狀态,取值如下: * SIGNAL, * CANCELLED, * CONDITION, * PROPAGATE, * 0 * * 初始情況下,waitStatus = 0 */ volatile int waitStatus; /** * 前驅節點 */ volatile Node prev; /** * 後繼節點 */ volatile Node next; /** * 對應的線程 */ volatile Thread thread; /** * 下一個等待節點,用在 ConditionObject 中 */ Node nextWaiter; /** * 判斷節點是否是共享節點 */ final boolean isShared() { return nextWaiter == SHARED; } /** * 擷取前驅節點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } /** addWaiter 方法會調用該構造方法 */ Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } /** Condition 中會用到此構造方法 */ Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
4.2 獨占模式分析
4.2.1 擷取同步狀态
獨占式擷取同步狀态時通過 acquire 進行的,下面來分析一下該方法的源碼。如下:
/** * 該方法将會調用子類複寫的 tryAcquire 方法擷取同步狀态, * - 擷取成功:直接傳回 * - 擷取失敗:将線程封裝在節點中,并将節點置于同步隊列尾部, * 通過自旋嘗試擷取同步狀态。如果在有限次内仍無法擷取同步狀态, * 該線程将會被 LockSupport.park 方法阻塞住,直到被前驅節點喚醒 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** 向同步隊列尾部添加一個節點 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 嘗試以快速方式将節點添加到隊列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速插入節點失敗,調用 enq 方法,不停的嘗試插入節點 enq(node); return node; } /** * 通過 CAS + 自旋的方式插入節點到隊尾 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 設定頭結點,初始情況下,頭結點是一個空節點 if (compareAndSetHead(new Node())) tail = head; } else { /* * 将節點插入隊列尾部。這裡是先将新節點的前驅設為尾節點,之後在嘗試将新節點設為尾節 * 點,最後再将原尾節點的後繼節點指向新的尾節點。除了這種方式,我們還先設定尾節點, * 之後再設定前驅和後繼,即: * * if (compareAndSetTail(t, node)) { * node.prev = t; * t.next = node; * } * * 但但如果是這樣做,會導緻一個問題,即短時内,隊列結構會遭到破壞。考慮這種情況, * 某個線程在調用 compareAndSetTail(t, node)成功後,該線程被 CPU 切換了。此時 * 設定前驅和後繼的代碼還沒帶的及執行,但尾節點指針卻設定成功,導緻隊列結構短時内會 * 出現如下情況: * * +------+ prev +-----+ +-----+ * head | | <---- | | | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * tail 節點完全脫離了隊列,這樣導緻一些隊列周遊代碼出錯。如果先設定 * 前驅,在設定尾節點。及時線程被切換,隊列結構短時可能如下: * * +------+ prev +-----+ prev +-----+ * head | | <---- | | <---- | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * 這樣并不會影響從後向前周遊,不會導緻周遊邏輯出錯。 * * 參考: * https://www.cnblogs.com/micrari/p/6937995.html */ node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * 同步隊列中的線程在此方法中以循環嘗試擷取同步狀态,在有限次的嘗試後, * 若仍未擷取鎖,線程将會被阻塞,直至被前驅節點的線程喚醒。 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 循環擷取同步狀态 for (;;) { final Node p = node.predecessor(); /* * 前驅節點如果是頭結點,表明前驅節點已經擷取了同步狀态。前驅節點釋放同步狀态後, * 在不出異常的情況下, tryAcquire(arg) 應傳回 true。此時節點就成功擷取了同 * 步狀态,并将自己設為頭節點,原頭節點出隊。 */ if (p == head && tryAcquire(arg)) { // 成功擷取同步狀态,設定自己為頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* * 如果擷取同步狀态失敗,則根據條件判斷是否應該阻塞自己。 * 如果不阻塞,CPU 就會處于忙等狀态,這樣會浪費 CPU 資源 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { /* * 如果在擷取同步狀态中出現異常,failed = true,cancelAcquire 方法會被執行。 * tryAcquire 需同步元件開發者覆寫,難免不了會出現異常。 */ if (failed) cancelAcquire(node); } } /** 設定頭節點 */ private void setHead(Node node) { // 僅有一個線程可以成功擷取同步狀态,是以這裡不需要進行同步控制 head = node; node.thread = null; node.prev = null; } /** * 該方法主要用途是,當線程在擷取同步狀态失敗時,根據前驅節點的等待狀态,決定後續的動作。比如前驅 * 節點等待狀态為 SIGNAL,表明目前節點線程應該被阻塞住了。不能老是嘗試,避免 CPU 忙等。 * ————————————————————————————————————————————————————————————————— * | 前驅節點等待狀态 | 相應動作 | * ————————————————————————————————————————————————————————————————— * | SIGNAL | 阻塞 | * | CANCELLED | 向前周遊, 移除前面所有為該狀态的節點 | * | waitStatus < 0 | 将前驅節點狀态設為 SIGNAL, 并再次嘗試擷取同步狀态 | * ————————————————————————————————————————————————————————————————— */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; /* * 前驅節點等待狀态為 SIGNAL,表示目前線程應該被阻塞。 * 線程阻塞後,會在前驅節點釋放同步狀态後被前驅節點線程喚醒 */ if (ws == Node.SIGNAL) return true; /* * 前驅節點等待狀态為 CANCELLED,則以前驅節點為起點向前周遊, * 移除其他等待狀态為 CANCELLED 的節點。 */ if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 等待狀态為 0 或 PROPAGATE,設定前驅節點等待狀态為 SIGNAL, * 并再次嘗試擷取同步狀态。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 調用 LockSupport.park 阻塞自己 LockSupport.park(this); return Thread.interrupted(); } /** * 取消擷取同步狀态 */ private void cancelAcquire(Node node) { if (node == null) return; node.thread = null; // 前驅節點等待狀态為 CANCELLED,則向前周遊并移除其他為該狀态的節點 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 記錄 pred 的後繼節點,後面會用到 Node predNext = pred.next; // 将目前節點等待狀态設為 CANCELLED node.waitStatus = Node.CANCELLED; /* * 如果目前節點是尾節點,則通過 CAS 設定前驅節點 prev 為尾節點。設定成功後,再利用 CAS 将 * prev 的 next 引用置空,斷開與後繼節點的聯系,完成清理工作。 */ if (node == tail && compareAndSetTail(node, pred)) { /* * 執行到這裡,表明 pred 節點被成功設為了尾節點,這裡通過 CAS 将 pred 節點的後繼節點 * 設為 null。注意這裡的 CAS 即使失敗了,也沒關系。失敗了,表明 pred 的後繼節點更新 * 了。pred 此時已經是尾節點了,若後繼節點被更新,則是有新節點入隊了。這種情況下,CAS * 會失敗,但失敗不會影響同步隊列的結構。 */ compareAndSetNext(pred, predNext, null); } else { int ws; // 根據條件判斷是喚醒後繼節點,還是将前驅節點和後繼節點連接配接到一起 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) /* * 這裡使用 CAS 設定 pred 的 next,表明多個線程同時在取消,這裡存在競争。 * 不過此處沒針對 compareAndSetNext 方法失敗後做一些處理,表明即使失敗了也 * 沒關系。實際上,多個線程同時設定 pred 的 next 引用時,隻要有一個能設定成 * 功即可。 */ compareAndSetNext(pred, predNext, next); } else { /* * 喚醒後繼節點對應的線程。這裡簡單講一下為什麼要喚醒後繼線程,考慮下面一種情況: * head node1 node2 tail * ws=0 ws=1 ws=-1 ws=0 * +------+ prev +-----+ prev +-----+ prev +-----+ * | | <---- | | <---- | | <---- | | * | | ----> | | ----> | | ----> | | * +------+ next +-----+ next +-----+ next +-----+ * * 頭結點初始狀态為 0,node1、node2 和 tail 節點依次入隊。node1 自旋過程中調用 * tryAcquire 出現異常,進入 cancelAcquire。head 節點此時等待狀态仍然是 0,它 * 會認為後繼節點還在運作中,所它在釋放同步狀态後,不會去喚醒後繼等待狀态為非取消的 * 節點 node2。如果 node1 再不喚醒 node2 的線程,該線程面臨無法被喚醒的情況。此 * 時,整個同步隊列就回全部阻塞住。 */ unparkSuccessor(node); } node.next = node; // help GC } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; /* * 通過 CAS 将等待狀态設為 0,讓後繼節點線程多一次 * 嘗試擷取同步狀态的機會 */ if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; /* * 這裡如果 s == null 處理,是不是表明 node 是尾節點?答案是不一定。原因之前在分析 * enq 方法時說過。這裡再啰嗦一遍,新節點入隊時,隊列瞬時結構可能如下: * node1 node2 * +------+ prev +-----+ prev +-----+ * head | | <---- | | <---- | | tail * | | ----> | | | | * +------+ next +-----+ +-----+ * * node2 節點為新入隊節點,此時 tail 已經指向了它,但 node1 後繼引用還未設定。 * 這裡 node1 就是 node 參數,s = node1.next = null,但此時 node1 并不是尾 * 節點。是以這裡不能從前向後周遊同步隊列,應該從後向前。 */ for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒 node 的後繼節點線程 LockSupport.unpark(s.thread); }
到這裡,獨占式擷取同步狀态的分析就講完了。如果僅分析擷取同步狀态的大緻流程,那麼這個流程并不難。但若深入到細節之中,還是需要思考思考。這裡對獨占式擷取同步狀态的大緻流程做個總結,如下:
- 調用 tryAcquire 方法嘗試擷取同步狀态
- 擷取成功,直接傳回
- 擷取失敗,将線程封裝到節點中,并将節點入隊
- 入隊節點在 acquireQueued 方法中自旋擷取同步狀态
- 若節點的前驅節點是頭節點,則再次調用 tryAcquire 嘗試擷取同步狀态
- 擷取成功,目前節點将自己設為頭節點并傳回
- 擷取失敗,可能再次嘗試,也可能會被阻塞。這裡簡單認為會被阻塞。
上面的步驟對應下面的流程圖:
上面流程圖參考自《Java并發程式設計》第128頁圖 5-5,這裡進行了重新繪制,并做了一定的修改。
4.2.2 釋放同步狀态
相對于擷取同步狀态,釋放同步狀态的過程則要簡單的多,這裡簡單羅列一下步驟:
- 調用 tryRelease(arg) 嘗試釋放同步狀态
- 根據條件判斷是否應該喚醒後繼線程
就兩個步驟,下面看一下源碼分析。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; /* * 這裡簡單列舉條件分支的可能性,如下: * 1. head = null * head 還未初始化。初始情況下,head = null,當第一個節點入隊後,head 會被初始 * 為一個虛拟(dummy)節點。這裡,如果還沒節點入隊就調用 release 釋放同步狀态, * 就會出現 h = null 的情況。 * * 2. head != null && waitStatus = 0 * 表明後繼節點對應的線程仍在運作中,不需要喚醒 * * 3. head != null && waitStatus < 0 * 後繼節點對應的線程可能被阻塞了,需要喚醒 */ if (h != null && h.waitStatus != 0) // 喚醒後繼節點,上面分析過了,這裡不再贅述 unparkSuccessor(h); return true; } return false; }
4.3 共享模式分析
與獨占模式不同,共享模式下,同一時刻會有多個線程擷取共享同步狀态。共享模式是實作讀寫鎖中的讀鎖、CountDownLatch 和 Semaphore 等同步元件的基礎,搞懂了,再去了解一些共享同步元件就不難了。
4.3.1 擷取同步狀态
共享類型的節點擷取共享同步狀态後,如果後繼節點也是共享類型節點,目前節點則會喚醒後繼節點。這樣,多個節點線程即可同時擷取共享同步狀态。
public final void acquireShared(int arg) { // 嘗試擷取共享同步狀态,tryAcquireShared 傳回的是整型 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 這裡和前面一樣,也是通過有限次自旋的方式擷取同步狀态 for (;;) { final Node p = node.predecessor(); /* * 前驅是頭結點,其類型可能是 EXCLUSIVE,也可能是 SHARED. * 如果是 EXCLUSIVE,線程無法擷取共享同步狀态。 * 如果是 SHARED,線程則可擷取共享同步狀态。 * 能不能擷取共享同步狀态要看 tryAcquireShared 具體的實作。比如多個線程競争讀寫 * 鎖的中的讀鎖時,均能成功擷取讀鎖。但多個線程同時競争信号量時,可能就會有一部分線 * 程因無法競争到信号量資源而阻塞。 */ if (p == head) { // 嘗試擷取共享同步狀态 int r = tryAcquireShared(arg); if (r >= 0) { // 設定頭結點,如果後繼節點是共享類型,喚醒後繼節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 這個方法做了兩件事情: * 1. 設定自身為頭結點 * 2. 根據條件判斷是否要喚醒後繼節點 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 設定頭結點 setHead(node); /* * 這個條件分支由 propagate > 0 和 h.waitStatus < 0 兩部分組成。 * h.waitStatus < 0 時,waitStatus = SIGNAL 或 PROPAGATE。這裡僅依賴 * 條件 propagate > 0 判斷是否喚醒後繼節點是不充分的,至于原因請參考第五章 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; /* * 節點 s 如果是共享類型節點,則應該喚醒該節點 * 至于 s == null 的情況前面分析過,這裡不在贅述。 */ if (s == null || s.isShared()) doReleaseShared(); } } /** * 該方法用于在 acquires/releases 存在競争的情況下,確定喚醒動作向後傳播。 */ private void doReleaseShared() { /* * 下面的循環在 head 節點存在後繼節點的情況下,做了兩件事情: * 1. 如果 head 節點等待狀态為 SIGNAL,則将 head 節點狀态設為 0,并喚醒後繼節點 * 2. 如果 head 節點等待狀态為 0,則将 head 節點狀态設為 PROPAGATE,保證喚醒能夠正 * 常傳播下去。關于 PROPAGATE 狀态的細節分析,後面會講到。 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } /* * ws = 0 的情況下,這裡要嘗試将狀态從 0 設為 PROPAGATE,保證喚醒向後 * 傳播。setHeadAndPropagate 在讀到 h.waitStatus < 0 時,可以繼續喚醒 * 後面的節點。 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
到這裡,共享模式下擷取同步狀态的邏輯就分析完了,不過我這裡隻做了簡單分析。相對于獨占式擷取同步狀态,共享式的情況更為複雜。獨占模式下,隻有一個節點線程可以成功擷取同步狀态,也隻有擷取已同步狀态節點線程才可以釋放同步狀态。但在共享模式下,多個共享節點線程可以同時獲得同步狀态,在一些線程擷取同步狀态的同時,可能還會有另外一些線程正在釋放同步狀态。是以,共享模式更為複雜。這裡我的腦力跟不上了,沒法面面俱到的分析,見諒。
最後說一下共享模式下擷取同步狀态的大緻流程,如下:
- 擷取共享同步狀态
- 若擷取失敗,則生成節點,并入隊
- 如果前驅為頭結點,再次嘗試擷取共享同步狀态
- 擷取成功則将自己設為頭結點,如果後繼節點是共享類型的,則喚醒
- 若失敗,将節點狀态設為 SIGNAL,再次嘗試。若再次失敗,線程進入等待狀态
4.3.2 釋放共享狀态
釋放共享狀态主要邏輯在 doReleaseShared 中,doReleaseShared 上節已經分析過,這裡就不贅述了。共享節點線程在擷取同步狀态和釋放同步狀态時都會調用 doReleaseShared,是以 doReleaseShared 是多線程競争集中的地方。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
5.PROPAGATE 狀态存在的意義
AQS 的節點有幾種不同的狀态,這個在 4.1 節介紹過。在這幾個狀态中,PROPAGATE 的用途可能是最不好了解的。網上包括一些書籍關于該狀态的叙述基本都是一句帶過,也就是 PROPAGATE 字面意義,即向後傳播喚醒動作。至于怎麼傳播,鮮有資料說明過。不過,好在最終我還是找到了一篇詳細叙述了 PROPAGATE 狀态的文章。在部落格園上,博友 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀 對 PROPAGATE,以及其他的一些細節進行了說明,很有深度。在欽佩之餘,不由得感歎作者思考的很深入。在征得他的同意後,我将在本節中引用他文章中對 PROPAGATE 狀态說明的部分,并進行一定的補充說明。這裡感謝作者 活在夢裡 的精彩分享,若不參考他的文章,我的這篇文章内容會比較空洞。好了,其他的不多說了,繼續往下分析。
在本節中,将會說明兩個個問題,如下:
- PROPAGATE 狀态用在哪裡,以及怎樣向後傳播喚醒動作的?
- 引入 PROPAGATE 狀态是為了解決什麼問題?
這兩個問題将會在下面兩節中分别進行說明。
5.1 利用 PROPAGATE 傳播喚醒動作
PROPAGATE 狀态是用來傳播喚醒動作的,那麼它是在哪裡進行傳播的呢?答案是在
setHeadAndPropagate
方法中,這裡再來看看 setHeadAndPropagate 方法的實作:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
大家注意看 setHeadAndPropagate 方法中那個長長的判斷語句,其中有一個條件是
h.waitStatus < 0
,當 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,這個條件就會成立。那麼 PROPAGATE 狀态是在何時被設定的呢?答案是在
doReleaseShared
方法中,如下:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {...} // 如果 ws = 0,則将 h 狀态設為 PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } ... } }
再回到 setHeadAndPropagate 的實作,該方法既然引入了
h.waitStatus < 0
這個條件,就意味着僅靠條件
propagate > 0
判斷是否喚醒後繼節點線程的機制是不充分的。至于為啥不充分,請繼續往看下看。
5.2 引入 PROPAGATE 所解決的問題
PROPAGATE 的引入是為了解決一個 BUG -- JDK-6801020,複現這個 BUG 的代碼如下:
import java.util.concurrent.Semaphore; public class TestSemaphore { private static Semaphore sem = new Semaphore(0); private static class Thread1 extends Thread { @Override public void run() { sem.acquireUninterruptibly(); } } private static class Thread2 extends Thread { @Override public void run() { sem.release(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10000000; i++) { Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread2(); Thread t4 = new Thread2(); t1.start(); t2.start(); t3.start(); t4.start(); t1.join(); t2.join(); t3.join(); t4.join(); System.out.println(i); } } }
根據 BUG 的描述消息可知 JDK 6u11,6u17 兩個版本受到影響。那麼,接下來再來看看引起這個 BUG 的代碼 -- JDK 6u17 中 setHeadAndPropagate 和 releaseShared 兩個方法源碼,如下:
private void setHeadAndPropagate(Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0) { /* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */ Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } } // 和 release 方法的源碼基本一樣 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
下面來簡單說明 TestSemaphore 這個類的邏輯。這個類持有一個數值為 0 的信号量對象,并建立了4個線程,線程 t1 和 t2 用于擷取信号量,t3 和 t4 則是調用 release() 方法釋放信号量。在一般情況下,TestSemaphore 這個類的代碼都可以正常執行。但當有極端情況出現時,可能會導緻同步隊列挂掉。這裡演繹一下這個極端情況,考慮某次循環時,隊列結構如下:
- 時刻1:線程 t3 調用 unparkSuccessor 方法,head 節點狀态由 SIGNAL(-1) 變為 ,并喚醒線程 t1。此時信号量數值為1。
- 時刻2:線程 t1 恢複運作,t1 調用 Semaphore.NonfairSync 的 tryAcquireShared,傳回 。然後線程 t1 被切換,暫停運作。
- 時刻3:線程 t4 調用 releaseShared 方法,因 head 的狀态為 ,是以 t4 不會調用 unparkSuccessor 方法。
- 時刻4:線程 t1 恢複運作,t1 成功擷取信号量,調用 setHeadAndPropagate。但因為 propagate = 0,線程 t1 無法調用 unparkSuccessor 喚醒線程 t2,t2 面臨無線程喚醒的情況。因為 t2 無法退出等待狀态,是以 t2.join 會阻塞主線程,導緻程式挂住。
下面再來看一下修複 BUG 後的代碼,根據 BUG 詳情頁顯示,該 BUG 在 JDK 1.7 中被修複。這裡找一個 JDK 7 較早版本(JDK 7u10)的代碼看一下,如下:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
在按照上面的代碼演繹一下邏輯,如下:
- ,并喚醒線程t1。此時信号量數值為1。
- 時刻3:線程 t4 調用 releaseShared 方法,檢測到
,t4 将頭節點等待狀态由 設為h.waitStatus = 0
。PROPAGATE(-3)
- 時刻4:線程 t1 恢複運作,t1 成功擷取信号量,調用 setHeadAndPropagate。因 propagate = 0,
條件不滿足。而propagate > 0
,是以條件h.waitStatus = PROPAGATE(-3)
成立。進而,線程 t1 可以喚醒線程 t2,完成喚醒動作的傳播。h.waitStatus < 0
到這裡關于狀态 PROPAGATE 的内容就講完了。最後,簡單總結一下本章開頭提的兩個問題。
問題一:PROPAGATE 狀态用在哪裡,以及怎樣向後傳播喚醒動作的?
答:PROPAGATE 狀态用在 setHeadAndPropagate。當頭節點狀态被設為 PROPAGATE 後,後繼節點成為新的頭結點後。若
propagate > 0
條件不成立,則根據條件
h.waitStatus < 0
成立與否,來決定是否喚醒後繼節點,即向後傳播喚醒動作。
問題二:引入 PROPAGATE 狀态是為了解決什麼問題?
答:引入 PROPAGATE 狀态是為了解決并發釋放信号量所導緻部分請求信号量的線程無法被喚醒的問題。
聲明:
本章内容是在博友 活在夢裡 的文章 AbstractQueuedSynchronizer源碼解讀 基礎上,進行了一定的補充說明。本章所參考的觀點已經過原作者同意,為避免抄襲嫌疑,特此聲明。
6.總結
到這裡,本文就差不多結束了。如果大家從頭看到尾,到這裡就可以放松一下了。寫到這裡,我也可以放松一下了。這篇文章總共花費了我十多天的空閑時間,确實不容易。本來我隻打算講一下基本原理,但知道後來看到本文多次推薦的那篇文章。那篇文章給我的第一感覺是,作者很厲害。第二感覺是,我也要寫出一篇較為深入的 AQS 分析文章。雖然寫出來也不能怎麼樣,水準也不會是以提高多少,也不會去造個類似的輪子。但是寫完後,确實感覺很有成就感。本文的最後,來說一下如何學習 AQS 原理。AQS 的大緻原理不是很難了解,是以一開始不建議糾結細節,應該先弄懂它的大緻原理。在此基礎上,再去分析一些細節,分析細節時,要從多線程的角度去考慮。比如,有點地方 CAS 失敗後要重試,有的不用重試。總體來說 AQS 的大緻原理容易了解,細節部分比較複雜。很多細節要在腦子裡演繹一遍,好好思考才能想通,有點燒腦。另外因為文章篇幅的問題,關于 AQS ConditionObject 部分的分析将會放在下一篇文章中進行。
最後,再向 AQS 的作者 Doug Lea 緻以崇高的敬意。僅盡力弄懂 AQS 的原理都很難了,可想而知,實作 AQS 的難度有多大。
限于本人的能力,加之深入分析 AQS 本身就比較有難度。是以文中難免會有錯誤出現,如果不慎翻車,請見諒。也歡迎在評論區指明這些錯誤,感謝。
參考
- AbstractQueuedSynchronizer源碼解讀 - 活在夢裡
- 《Java并發程式設計的藝術》 - 方騰飛 / 魏鵬 / 程曉明
本文在知識共享許可協定 4.0 下釋出,轉載需在明顯位置處注明出處
作者:coolblog.xyz
本文同步釋出在我的個人部落格:http://www.coolblog.xyz/?r=cb
本作品采用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協定進行許可。