前言
建議先看一下這篇分享,深入了解AbstractQueuedSynchronizer,這篇文章主要介紹了AQS的同步隊列實作,而本篇文章主要介紹AQS條件隊列的實作
在進行線程間的通信時,當我們使用synchronized時,可以用基于Object對象的wait和notify方法實作等待/通知機制,但是在AQS相關類中怎麼實作這種等待/通知機制呢?答案是Condition,Condition是一個接,AbstractQueuedSynchronizer中有一個内部類實作了這個接口
基于Object實作等待/通知機制的相關方法

舉個例子
public class WaitNotify {
// 代碼來自《Java并發程式設計的藝術》
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "notifyThread");
notifyThread.start();
}
static class Wait implements Runnable {
@Override
public void run() {
synchronized (lock) {
// 條件不滿足時,繼續wait,同時釋放了lock的鎖
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true. await @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 條件滿足,完成工作
System.out.println(Thread.currentThread() + " flag is false. running @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock. notify @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
// 暫停5秒
SleepUtils.second(5);
}
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
static class SleepUtils {
public static void second(int n) {
try {
TimeUnit.SECONDS.sleep(n);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複制
Thread[WaitThread,5,main] flag is true. await @ 00:05:55
Thread[notifyThread,5,main] hold lock. notify @ 00:05:55
Thread[notifyThread,5,main] hold lock again. sleep @ 00:06:00
Thread[WaitThread,5,main] flag is false. running @ 00:06:05
複制
這裡有幾個需要注意的點
- 第三行和第四行的順序有可能颠倒,因為是競争擷取鎖的
- wait()方法被執行後,鎖被自動釋放,但notify()方法被執行後,鎖卻不自動釋放 ,必須執行完notify()方法所在的同步synchronized代碼塊後才釋放鎖
基于Condition實作等待/通知機制(包含了Condition接口的所有方法)
Conditon使用例子如下,可以實作條件性的通知
static ReentrantLock lock = new ReentrantLock();
static Condition conditionA = lock.newCondition();
static Condition conditionB = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
Thread waitThreadA = new Thread(new WaitA(), "WaitThreadA");
waitThreadA.start();
Thread waitThreadB = new Thread(new WaitB(), "WaitThreadB");
waitThreadB.start();
TimeUnit.SECONDS.sleep(2);
lock.lock();
try {
conditionA.signal();
} finally {
lock.unlock();
}
}
static class WaitA implements Runnable {
@Override
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread() + " begin await @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
conditionA.await();
System.out.println(Thread.currentThread() + " end await @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
static class WaitB implements Runnable {
@Override
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread() + " begin await @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
conditionB.await();
System.out.println(Thread.currentThread() + " end await @ "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
複制
Thread[WaitThreadA,5,main] begin await @ 00:49:57
Thread[WaitThreadB,5,main] begin await @ 00:49:57
Thread[WaitThreadA,5,main] end await @ 00:49:59
複制
WaitThreadB因為沒有被通知,一直阻塞 ,這裡說一下Condition的大概實作,AQS内部維護着一個同步隊列(雙向連結清單實作),多個條件隊列(單向連結清單實作),條件隊列由AQS的内部類ConditionObject來維護,new一個ConditonObject ,則多一個條件隊列,當一個線程執行await方法是,會把當線程包裝成一個Node節點,放到執行await方法的ConditionObject的條件隊列中,釋放鎖并被阻塞,當執行signal方式時,會把條件隊列的第一個節點移除,并轉移到同步隊列中,擷取到鎖即可繼續執行
源碼
基于jdk1.8.0_20 ,Object的螢幕方法和Condition接口的對比
ConditionObject 是AQS的一個内部類,用來實作條件隊列,屬性如下
public class ConditionObject implements Condition, java.io.Serializable {
// 條件隊列的頭節點
private transient Node firstWaiter;
// 條件隊列的尾節點
private transient Node lastWaiter;
public ConditionObject() { }
// 阻塞過程中不響應中斷,僅設定标志位,讓之後的方法處理
private static final int REINTERRUPT = 1;
// 阻塞過程中響應中斷,并throw InterruptedException
private static final int THROW_IE = -1;
}
複制
假如在阻塞過程中發生了中斷,REINTERRUPT标志了中斷發生在 signalled之後,THROW_IE标志了中斷發生在 signalled之前,進而決定采用那種方式響應中斷
來看await方法
// ConditionObject
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 目前線程加入等待隊列
Node node = addConditionWaiter();
// 釋放鎖
int savedState = fullyRelease(node);
// 标志位
int interruptMode = 0;
// 判斷節點是否在同步隊列中,即是否被喚醒
while (!isOnSyncQueue(node)) {
// 阻塞
LockSupport.park(this);
// 線程被喚醒,線程節點從條件隊列移除,并放到放到同步隊列,或被中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 喚醒之後競争擷取鎖
// 擷取鎖的過程中有中斷,并且标志位不是(響應中斷)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清除等待隊列中不是等待狀态的節點
unlinkCancelledWaiters();
// 阻塞中被中斷過,則進行中斷
if (interruptMode != 0)
// 根據标志位,決定對中斷的處理方式
reportInterruptAfterWait(interruptMode);
}
複制
将目前線程包裝成Node節點,并放入等待隊列
// ConditionObject
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除等待隊列中取消狀态的節點
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 連結清單還沒有初始化
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
複制
釋放鎖
// AbstractQueuedSynchronizer
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 釋放鎖失敗
throw new IllegalMonitorStateException();
}
} finally {
// 釋放鎖失敗後,将目前節點狀态設定為CANCELLED
// 後序會被清理出條件隊列
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
複制
判斷節點是否在同步隊列
// AbstractQueuedSynchronizer
final boolean isOnSyncQueue(Node node) {
// 節點在條件隊列
// 同步隊列中節點的狀态 隻能為0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果後繼節點不為null,則表明節點在同步隊列上
// 因為條件隊列使用的是nextWaiter指向後繼節點的
// 條件隊列上節點的next均為null
if (node.next != null) // If has successor, it must be on queue
return true;
// 走到這一步,說明node.prev!=null && node.next=null
// 但這并不能說明node在同步隊列中,因為節點在入隊過程中
// 是先設定node.prev後設定node.next的(詳見addWaiter方法)
// 有可能CAS設定尾節點失敗,導緻沒有加入隊列
// 是以從尾到頭周遊一遍
return findNodeFromTail(node);
}
複制
// AbstractQueuedSynchronizer
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
複制
檢測線程在等待期間是否發生中斷
// ConditionObject
// Checks for interrupt, returning THROW_IE if interrupted
// before signalled, REINTERRUPT if after signalled, or
// 0 if not interrupted.
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
複制
// AbstractQueuedSynchronizer
final boolean transferAfterCancelledWait(Node node) {
// signalled之前發生中斷,因為signalled之後會将會将節點狀态從CONDITION 設定為0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// signalled之後發生中斷
// 如果節點還沒有被放入同步隊列,則放棄目前CPU資源
// 讓其他任務執行
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
複制
清除條件隊列中取消狀态的節點
// ConditionObject
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 指向尾節點
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 隻有頭節點的狀态不是CONDITION才會執行到這一步
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
// 周遊完連結清單,設定尾節點
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
複制
響應中斷的方式
// ConditionObject
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
// 直接響應中斷
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
複制
// AbstractQueuedSynchronizer
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
複制
來看signal,喚醒等待時間最長的線程
// ConditionObject
public final void signal() {
// 目前線程沒有擷取到鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 喚醒等待隊列中的頭結點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
複制
// ConditionObject
private void doSignal(Node first) {
do {
// 将同步隊列的頭結點,設定為目前頭結點的下一個節點
// 如果頭節點的下一個節點為null,則設定尾節點為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将first節點從條件隊列中移除
first.nextWaiter = null;
// 通知第一個非CANCELLED節點被喚醒,或者周遊完,退出
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
複制
// 将節點從條件隊列放入同步隊列,true為成功
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {
// 通過CAS将節點的狀态從CONDITION設定為0
// 如果設定失敗,說明這個節點狀态為CANCELLED
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入同步隊列,并傳回前繼節點
Node p = enq(node);
int ws = p.waitStatus;
// 前繼節點為CANCELLED狀态,或者設定SIGNAL狀态失敗
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒線程
LockSupport.unpark(node.thread);
return true;
}
複制
signalAll和signal實作類似,差別如下,signal将等待隊列中的一個非CANCELLED節點放到同步隊列,而signalAll是将等待隊列中的所有非CANCELLED節點放到同步隊列中
參考書籍
《Java并發程式設計的藝術》