介紹 AQS
AQS(AbstractQueuedSynchronizer)是 Java 并發包中,實作各種同步元件的基礎。比如
- 各種鎖:ReentrantLock、ReadWriteLock、StampedLock
- 各種線程同步工具類:CountDownLatch、CyclicBarrier、Semaphore
- 線程池中的 Worker
Lock 接口的實作基本都是通過聚合了一個 AQS 的子類來完成線程通路控制的。
Doug Lea 曾經介紹過 AQS 的設計初衷。從原理上,一種同步元件往往是可以利用其他的元件實作的,例如可以使用 Semaphore 實作互斥鎖。但是,對某種同步元件的傾向,會導緻複雜、晦澀的實作邏輯,是以,他選擇了将基礎的同步相關操作抽象在 AbstractQueuedSynchronizer 中,利用 AQS 為我們建構同步元件提供了範本。
如何使用 AQS
利用 AQS 實作一個同步元件,我們至少要實作兩類基本的方法,分别是:
- 擷取資源,需要實作 tryAcquire(int arg) 方法
- 釋放資源,需要實作 tryRelease(int arg) 方法
如果需要共享式擷取 / 釋放資源,需要實作對應的 tryAcquireShared(int arg)、tryReleaseShared(int arg)
AQS 使用的是模闆方法設計模式。AQS 方法的修飾符很有規律,其中,使用 protected 修飾的方法為抽象方法,通常需要子類去實作,進而實作不同的同步元件;使用 public 修飾的方法基本可以認為是模闆方法,不建議子類直接覆寫。
通過調用 AQS 的 acquire(int arg) 方法可以擷取資源,該方法會調用 protected 修飾的 tryAcquire(int arg) 方法,是以我們需要在 AQS 的子類中實作 tryAcquire(int arg),tryAcquire(int arg) 方法的作用是:擷取資源。
目前線程擷取資源并執行了相應邏輯之後,就需要釋放資源,使得後續節點能夠繼續擷取資源。通過調用 AQS 的 release(int arg) 方法可以釋放資源,該方法會調用 protected 修飾的 tryRelease(int arg) 方法,是以我們需要在 AQS 的子類中實作 tryRelease(int arg),tryRelease(int arg) 方法的作用是:釋放資源。
AQS 的實作原理
從實作角度分析 AQS 是如何完成線程通路控制。
AQS 的實作原理可以從 同步阻塞隊列、擷取資源時的執行流程、釋放資源時的執行流程 這 3 個方面介紹。
同步阻塞隊列
AQS 依賴内部的同步阻塞隊列(一個 FIFO 雙向隊列)來完成資源的管理。
同步阻塞隊列的工作機制:
- 節點:同步阻塞隊列中的節點(Node)用來儲存擷取資源失敗的線程引用、等待狀态以及前驅和後繼節點,沒有成功擷取資源的線程将會成為節點加入同步阻塞隊列的尾部,同時會阻塞目前線程(Java 線程處于 WAITING 狀态,釋放 CPU 的使用權)。
- 首節點:同步阻塞隊列遵循 FIFO(先進先出),首節點是擷取資源成功的節點,首節點的線程在釋放資源時,将會喚醒後繼節點,使其再次嘗試擷取資源,而後繼節點将會在擷取資源成功時将自己設定為首節點。
static final class Node {
/**
* Marker to indicate a node is waiting in shared mode
*/
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
/**
* Marker to indicate a node is waiting in exclusive mode
*/
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
/**
* waitStatus value to indicate thread has cancelled
*/
static final int CANCELLED = 1;
/**
* waitStatus value to indicate successor's thread needs unparking
*/
static final int SIGNAL = -1;
/**
* waitStatus value to indicate thread is waiting on condition
*/
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// 等待狀态
volatile int waitStatus;
// 前驅節點
volatile AbstractQueuedSynchronizer.Node prev;
// 後繼節點
volatile AbstractQueuedSynchronizer.Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
// 條件等待隊列的後繼節點
AbstractQueuedSynchronizer.Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
AbstractQueuedSynchronizer.Node p = prev;
if (p == null) throw new NullPointerException();
else return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, AbstractQueuedSynchronizer.Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
等待狀态
在節點中用 volatile int waitStatus 屬性表示節點的等待狀态。
節點有如下幾種等待狀态:
- CANCELLED,值為 1,由于在同步阻塞隊列中等待的線程等待逾時或者被中斷,需要從同步阻塞隊列中取消等待,節點進人該狀态将不會變化
- SIGNAL,值為 -1,後繼節點的線程處于等待狀态,而目前節點的線程如果釋放了同步狀态或者被取消,将會通知後繼節點,使後繼節點的線程得以運作
- CONDITION,值為 -2,節點在條件等待隊列中,節點線程等待在 Condition 上,當其他線程對Condition 調用了 signal() 方法後,該節點将會從條件等待隊列轉移到同步阻塞隊列中,加入到對同步狀态的擷取中
- PROPAGATE,值為 -3,表示下一次共享式同步狀态擷取将會無條件地被傳播下去
- INITIAL,值為 0,初始狀态
擷取資源、釋放資源的執行流程,結論先行:
- 在擷取資源時,擷取資源失敗的線程都會被加入到同步阻塞隊列中,并在隊列中進行自旋;移出隊列(或停止自旋)的條件是前驅節點為頭節點且成功擷取了資源。
- 在釋放資源時,AQS 調用 tryRelease(int arg) 方法釋放資源,然後喚醒頭節點的後繼節點。
擷取資源
下面來介紹擷取資源時的執行流程。
調用 AQS 的 acquire(int arg) 方法可以擷取資源。
acquire(int arg) 方法是獨占式擷取資源,它調用流程如下圖所示。
用文字描述 acquire(int arg) 方法的調用流程:首先調用自定義 AQS 實作的 tryAcquire(int arg) 方法,該方法的作用是嘗試擷取資源:
- 如果擷取資源成功,則直接從 acquire(int arg) 方法傳回
- 如果擷取資源失敗,則構造節點,并将該節點加入到同步阻塞隊列的尾部,最後調用 acquireQueued(Node node,int arg) 方法,使得該節點以“死循環”的方式嘗試擷取資源。隻有目前節點的前驅節點是頭節點,才能嘗試擷取資源。
- 如果目前節點的前驅節點是頭節點,并且擷取資源成功,則設定目前節點為頭節點,并從 acquireQueued(Node node,int arg) 方法傳回
- 如果目前節點的前驅節點不是頭節點 或者 擷取資源失敗,則阻塞目前線程,線程被喚醒後繼續執行該循環操作
acquireQueued(Node node,int arg) 方法的調用過程也被稱為“自旋過程”。
自旋是什麼意思是呢?我的了解就是:自旋就是一個死循環,循環執行某個操作序列,直到滿足某個條件才退出循環。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire(int arg) 的主要邏輯是:
首先調用自定義 AQS 實作的 tryAcquire(int arg) 方法,該方法保證線程安全的擷取資源:
- 如果擷取資源成功,則直接從 acquire(int arg) 方法傳回
- 如果擷取資源失敗,則構造同步節點(獨占式 Node.EXCLUSIVE,同一時刻隻能有一個線程成功擷取資源)并通過 addWaiter(Node node) 方法将該節點加入到同步阻塞隊列的尾部,最後調用 acquireQueued(Node node,int arg) 方法,使得該節點以“死循環”的方式擷取資源。如果擷取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠 前驅節點的出隊 或 阻塞線程被中斷 來實作。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在 acquireQueued(final Node node,int arg) 方法中,目前線程在“死循環”中嘗試擷取資源,而隻有前驅節點是頭節點才能夠嘗試擷取資源,這是為什麼?原因有兩個,如下。
- 第一,頭節點是成功擷取到資源的節點,而頭節點的線程釋放了資源之後,将會喚醒其後繼節點,後繼節點的線程被喚醒後需要檢查自己的前驅節點是否是頭節點。
- 第二,維護同步阻塞隊列的 FIFO 原則。
釋放資源
目前線程擷取資源并執行了相應邏輯之後,就需要釋放資源,使得後續節點能夠繼續擷取資源。
下面來介紹釋放資源時的執行流程。
通過調用 AQS 的 release(int arg) 方法可以釋放資源,該方法在釋放資源之後,會喚醒頭節點的後繼節點,進而使後繼節點重新嘗試擷取資源。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release(int arg) 方法執行時,會喚醒頭節點的後繼節點線程, unparkSuccessor(Node node) 方法使用 LockSupport#unpark() 方法來喚醒處于等待狀态的線程。
共享式 擷取 & 釋放 資源
上面講的是獨占式擷取 / 釋放 資源。
共享式擷取與獨占式擷取最主要的差別在于:同一時刻能否有多個線程同時擷取到資源。以檔案的讀寫為例,如果一個程式在對檔案進行讀操作,那麼這一時刻對于該檔案的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式通路,而讀操作可以是共享式通路。
- 共享式通路資源時,其他共享式的通路均被允許,獨占式通路被阻塞
- 獨占式通路資源時,同一時刻其他通路均被阻塞
共享式擷取資源
調用 AQS 的 acquireShared(int arg) 方法可以共享式地擷取資源。
在 acquireShared(int arg) 方法中,AQS 調用 tryAcquireShared(int arg) 方法嘗試擷取資源, tryAcquireShared(int arg) 方法傳回值為 int 類型,當傳回值 >= 0 時,表示能夠擷取到資源。
可以看到,在 doAcquireShared(int arg) 方法的自旋過程中,如果目前節點的前驅為頭節點時,才能嘗試擷取資源,如果擷取資源成功(傳回值 >= 0),則設定目前節點為頭節點,并從自旋過程中退出。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享式釋放資源
調用 releaseShared(int arg) 方法可以釋放資源。該方法在釋放資源之後,會喚醒頭節點的後繼節點,進而使後繼節點重新嘗試擷取資源。
對于能夠支援多個線程同時通路的并發元件(比如 Semaphore),它和獨占式主要差別在于 tryReleaseShared(int arg) 方法必須確定資源安全釋放,因為釋放資源的操作會同時來自多個線程。 確定資源安全釋放一般是通過循環和 CAS 來保證的。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
獨占式逾時擷取資源
調用 AQS 的 doAcquireNanos(int arg,long nanosTimeout) 方法可以逾時擷取資源,即在指定的時間段内擷取資源,如果擷取資源成功則傳回 true,否則傳回 false。
該方法提供了傳統 Java 同步操作(比如 synchronized 關鍵字)所不具備的特性。
在分析該方法的實作前,先介紹一下響應中斷的擷取資源過程。
- 在 Java 5 之前,當一個線程擷取不到鎖而被阻塞在 synchronized 之外時,對該線程進行中斷操作,此時該線程的中斷标志位會被修改,但線程依舊會阻塞在 synchronized 上,等待着擷取鎖。
- 在 Java 5 中,AQS 提供了 acquireInterruptibly(int arg) 方法,這個方法在等待擷取資源時,如果目前線程被中斷,會立刻傳回,并抛出 InterruptedException。
acquire(int arg) 方法對中斷不敏感,也就是由于線程擷取資源失敗後進入同步阻塞隊列中,後續對線程進行中斷操作時,線程不會從同步阻塞隊列中移出。
逾時擷取資源過程可以被視作響應中斷擷取資源過程的“增強版”,doAcquireNanos(int arg,long nanosTimeout) 方法在支援響應中斷的基礎上,增加了逾時擷取的特性。
針對逾時擷取,主要需要計算出需要睡眠的時間間隔 nanosTimeout,為了防止過早通知, nanosTimeout 計算公式為:nanosTimeout -= now - lastTime,其中 now 為目前喚醒時間, lastTime 為上次喚醒時間,如果 nanosTimeout 大于 0 則表示逾時時間未到,需要繼續睡眠 nanosTimeout 納秒,反之,表示已經逾時。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試擷取資源,如果成功擷取資源則從該方法傳回,這個過程和獨占式同步擷取的過程類似,但是在擷取資源失敗的處理上有所不同。
如果目前線程擷取資源失敗,則判斷是否逾時(nanosTimeout 小于等于 0 表示已經逾時),如果沒有逾時,則重新計算逾時間隔 nanosTimeout,然後使目前線程等待 nanosTimeout 納秒(當已到設定的逾時時間,該線程會從 LockSupport.parkNanos(Object blocker,long nanos)方法傳回)。
如果 nanosTimeout 小于等于 spinForTimeoutThreshold(1000 納秒)時,将不會使該線程進行逾時等待,而是進入快速的自旋過程。原因在于,非常短的逾時等待無法做到十分精确,如果這時再進行逾時等待,相反會讓 nanosTimeout 的逾時從整體上表現得反而不精确。是以,在逾時非常短的場景下,AQS 會進入無條件的快速自旋。
獨占式逾時擷取資源的流程如下所示。
從圖中可以看出,獨占式逾時擷取資源 doAcquireNanos(int arg,long nanosTimeout) 和獨占式擷取資源 acquire(int args)在流程上非常相似,其主要差別在于:未擷取到資源時的處理邏輯。
acquire(int args) 在未擷取到資源時,将會使目前線程一直處于等待狀态,而 doAcquireNanos(int arg,long nanosTimeout) 會使目前線程等待 nanosTimeout 納秒,如果目前線程在 nanosTimeout 納秒内沒有擷取到資源,将會從等待邏輯中自動傳回。
Condition 的實作原理
技術是為了解決問題而生的,通過 Condition 我們可以實作等待 / 通知功能。
ConditionObject 是 AQS 的内部類。每個 Condition 對象都包含着一個條件等待隊列,這個條件等待隊列是 Condition 對象實作等待 / 通知功能的關鍵。
下面我們分析 Condition 的實作原理,主要包括:條件等待隊列、等待 和 通知。
下面提到的 Condition 如果不加說明均指的是 ConditionObject。
條件等待隊列
Condition 依賴内部的條件等待隊列(一個 FIFO 雙向隊列)來實作等待 / 通知功能。
條件等待隊列的工作機制:
- 節點:條件等待隊列中的每個節點(Node)都包含一個線程引用,該線程就是在 Condition 對象上等待的線程,如果一個線程調用了 Condition.await()方法,那麼該線程将會釋放資源、構造成為節點加入條件等待隊列的尾部,同時線程狀态變為等待狀态。
事實上,條件等待隊列中的節點定義複用了 AQS 節點的定義,也就是說,同步阻塞隊列和條件等待隊列中節點類型都是 AQS 的靜态内部類 AbstractQueuedSynchronizer.Node。
在 Object 的螢幕模型上,一個對象擁有一個同步阻塞隊列和一個條件等待隊列,而并發包中的 Lock(更确切地說是 AQS)擁有一個同步阻塞隊列和多個條件等待隊列。
等待
下面來介紹讓線程等待的執行流程。
調用 Condition 的 await() 方法(或者以 await 開頭的方法),将會使目前線程釋放資源、構造成為節點加入條件等待隊列的尾部,同時線程狀态變為等待狀态。
如果從隊列(同步阻塞隊列和條件等待隊列)的角度看 await()方法,當調用 await() 方法時,相當于同步阻塞隊列的首節點(擷取到鎖的節點)移動到 Condition 的條件等待隊列中。并且同步阻塞隊列的首節點并不會直接加入條件等待隊列,而是通過 addConditionWaiter() 方法把目前線程構造成一個新的節點,将其加入條件等待隊列中。
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
通知
下面來介紹喚醒等待線程的執行流程。
調用 Condition 的 signal() 方法,将會喚醒在條件等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會将目前節點從條件等待隊列移動到同步阻塞隊列中。
條件等待隊列中的節點被喚醒後,被喚醒的線程以“死循環”的方式嘗試擷取資源。成功擷取資源之後,被喚醒的線程将從先前調用的 await() 方法傳回。
如果被喚醒的線程不是通過其他線程調用 Condition.signal() 方法喚醒,而是對等待線程進行中斷,則會抛出InterruptedException。
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}