1. 概述
在AbstractQueuedSynchronizer中,有兩個FIFO隊列,一個是同步隊列,用來排隊申請同步狀态,還有一個是條件等待隊列,當調用了await()系列的方法後,就會在等待隊列尾部插入一個節點,通知喚醒的時候會把這個節點從等待隊列轉移到同步隊列。
本文主要描述條件等待隊列以及等待、通知機制的實作,關于同步隊列的相關操作和實作分析,可以在這篇部落格了解。
2. 分析
ReentrantLock可以實作條件等待,我們可以先調用newCondition()方法生成一個條件對象ConditionObject,然後調用ConditionObject的await()方法即可實作條件等待。
一路跟蹤源碼到AbstractQueuedSynchronizer内部類ConditionObject的await()方法。
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在等待隊列的尾部添加一個節點
Node node = addConditionWaiter();
//釋放同步隊列中目前節點持有的同步狀态,在這裡會将同步隊列中對應的節點移除
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果不在同步隊列裡,阻塞目前線程,等待喚醒
LockSupport.park(this);
//線程被喚醒,是否把這個節點添加到同步隊列中,檢查目前線程的interrupt中斷标記
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//在同步隊列中進行自旋等待同步狀态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清除無效的等待節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//進行中斷标記
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
首先進行的是addConditionWaiter()方法。
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
這個方法中,先取到最後一個等待節點t,如果t不為null或者t的狀态不為CONDITION,觸發一次“删除取消等待的節點”操作,也就是unlinkCancelledWaiters()方法,隻要節點的狀态不為CONDITION,該節點就可以被認為取消等待了(等待結束)。删除結束,就新建立一個CONDITION狀态的節點加入到隊列末尾。注意條件等待隊列和同步隊列中的不同,條件等待隊列中的節點隻知道它的下一個節點,并不知道它的上一個節點。
到這裡addConditionWaiter()方法結束,此時已經增加了一個節點到條件等待隊列,并且将新增加的節點傳回。再回到await()方法,接下來需要同步隊列中目前持有同步狀态的節點(頭部節點,對應了目前線程)進行資源的釋放,也就是fullyRelease()方法,進入fullyRelease(),其實就是調用了release()方法,并傳回了釋放前的加鎖的次數。release()方法解析參見這篇部落格。
再傳回到await()方法,接下來是一個while循環,判斷這個節點是否在同步隊列上,如果不在,就進入循環體,通過LockSupport.park()阻塞線程,當線程被喚醒,執行while中的if判斷。
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
如果目前線程被中斷,則調用transferAfterCancelledWait決定外層await()方法是應該抛出InterruptedException異常還是重新調用中斷方法。transferAfterCancelledWait()方法中,如果CAS設定成功,說明中斷之前沒有調用signal()方法(因為signal方法會對節點狀态進行CAS設定,從CONDITION到0),将目前節點添加到同步隊列;否則,循環檢查目前節點是否在同步隊列中,如果不在,就把CPU讓出來,直到被放到同步隊列上時,中止循環,傳回false。
然後回到await()方法,調用acquireQueued(),acquireQueued()中進行自旋等待同步資源(這篇部落格),讓被喚醒的節點進行排隊,并進行中斷标記,如果有下一個條件等待者,這裡會觸發一次“取消等待節點清理”操作。再最後根據中斷标記決定是抛出中斷異常,還是僅僅調用中斷方法。
接着在來看signal()方法,signal()方法喚醒等待的線程後,while中LockSupport.park(this);之後的代碼才可以執行。
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
從signal()方法跟蹤到doSignal()方法,這裡主要進行的操作是,将等待隊列中的第一個等待者移動到同步隊列,當成一個全新的尾節點來進行處理,具體流程也是增加到尾節點、然後進行自旋擷取(這篇部落格),并且這裡将firstWaiter指向原本firstWaiter的下一個等待者,使原本的firstWaiter成為了一個垃圾對象(可被回收)。
signalAll方法也很好了解,直接将所有的等待者喚醒。