AQS是AbstractQueneSynchronizer抽象類
- 封裝了Node節點,Node是AQS的靜态内部類,Node節點有兩個有參構造方法Node(thread , node){}建立雙向連結清單AQS隊列,Node( thread , int waitstatus){}建立的單向連結清單,等待隊列
- 封裝了ConditionObject,CoditionObject是AQS的内部類,實作了Condition接口
- 包含屬性,head(AQS頭節點)、tail(AQS尾節點)、state()、exclusiveOwnableThread(父類屬性)等等
Condition核心方法await();、signal();
使用生産者,消費者模型,對condition進行分析
生産者Producer
public class Producer implements Runnable {
private Queue<String> queue;
private int maxSize;
private Lock lock ;
private Condition condition;
public Producer(Queue<String> queue, int maxSize, Lock lock, Condition condition) {
super();
this.queue = queue;
this.maxSize = maxSize;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
int i = 0;
while (true) {
i++;
lock.lock();
while (queue.size()==maxSize) {
System.out.println("消息已滿,請消費消息....");
try {
condition.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("生産第"+i+"條消息");
queue.add(i+"");
condition.signal();
lock.unlock();
}
}
消費者Consumer
public class Consumer implements Runnable {
private Queue<String> queue;
private int maxSize;
private Lock lock ;
private Condition condition;
public Consumer(Queue<String> queue, int maxSize, Lock lock, Condition condition) {
super();
this.queue = queue;
this.maxSize = maxSize;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
int i = 0;
while (true) {
i++;
lock.lock();
while (queue.isEmpty()) {
System.out.println("消息已清空,等待生産消息");
try {
condition.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("消費第"+i+"條消息");
queue.remove();
condition.signal();
lock.unlock();
}
}
}
線程啟動
public static void main(String[] args) {
Queue< String> queue= new LinkedList<String>();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
int maxSize = 6;
Consumer consumer = new Consumer(queue, maxSize, lock, condition);
Producer producer = new Producer(queue, maxSize, lock, condition);
Thread pThread = new Thread(producer);
Thread cThread = new Thread(consumer);
pThread.start();
cThread.start();
}
消費者消費消息,隊裡無消息,滿足條件,進入await()方法,使得目前線程釋放所有資源,然後阻塞,檢視源碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//傳回Node單向連結清單節點,将目前線程加入等待隊列
int savedState = fullyRelease(node);//釋放目前的鎖,得到鎖的狀态,并喚醒 AQS 隊列中的一個線程
int interruptMode = 0;
//如果目前節點沒有在同步隊列上,即還沒有被 signal,則将目前線程阻塞
while (!isOnSyncQueue(node)) {//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了
LockSupport.park(this);//通過 park 挂起目前線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 當這個線程醒來,會嘗試拿鎖, 當 acquireQueued 傳回 false 就是拿到鎖了.
// interruptMode != THROW_IE -> 表示這個線程沒有成功将 node 入隊,但 signal 執行了 enq 方法讓其入隊了.
// 将這個變量設定成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一個等待者不是 null, 則進行清理,清理 Condition 隊列上的節點.
// 如果是 null ,就沒有什麼好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果線程被中斷了,需要抛出異常.或者什麼都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter();//傳回Node單向連結清單節點,将目前線程加入等待隊列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如 果 lastWaiter 不 等 于 空 并 且waitStatus 不等于 CONDITION 時,把沖好這個節點從連結清單中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//建構一個 Node,waitStatus=CONDITION。這裡的連結清單是一個單向的,是以相比 AQS 來說會簡單很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
執行完 addConditionWaiter 這個方法之後,就會産生一個 condition 隊列
fullRelease,釋放鎖,如果目前鎖存在多次重入,那麼在這個方法中隻需要釋放一次就會把所有的重入次數歸零
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();//獲得重入的次數
if (release(savedState)) {//釋放鎖并且喚醒下一個同步隊列中的線程
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue(node)//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
正常情況下await()做了這三個基本操作
首先建立Node節點,将目前線程封裝到一個單向連結清單等待隊列中
然後釋放所有資源
最後阻塞處在等待隊列中的目前線程,等待被喚醒
Consumer線程阻塞了,那麼Producer可以搶占鎖了,進入他的同步方法塊,然後Producer調用了signal()方法,喚醒Consumer處于等待隊列的線程,來看看源碼是怎麼實作的
public final void signal() {
if (!isHeldExclusively())//先判斷目前線程是否獲得了鎖,這個判斷比較簡單,直接用獲得鎖的線程和目前線程相比即可
throw new IllegalMonitorStateException();
Node first = firstWaiter;// 拿到 Condition 隊列上第一個節點
if (first != null)
doSignal(first);
}
Condition.doSignal
對 condition 隊列中從首部開始的第一個 condition 狀态的節點,執行 transferForSignal 操作,将 node 從 condition隊列中轉換到 AQS 隊列中,同時修改 AQS 隊列中原先尾節點的狀态
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;// 将 next 節點設定成 null
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
AQS.transferForSignal
該方法先是 CAS 修改了節點狀态,如果成功,就将這個節點放到 AQS 隊列中,然後喚醒這個節點上的線程。
final boolean transferForSignal(Node node) {
//更新節點的狀态為 0,如果更新失敗,隻有一種可能就是節點被 CANCELLED 了,如果是CANCELLED 節點直接傳回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//調用 enq,把目前節點添加到AQS 隊列。并且傳回傳回按目前節點的上一個節點,也就是原tail 節點
int ws = p.waitStatus;
// 如果上一個節點的狀态被取消了, 或者嘗試設定上一個節點的狀态為 SIGNAL 失敗了
//(SIGNAL 表示: 他的 next 節點需要停止阻塞),
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 喚醒節點上的線程.
return true;//如果 node 的 prev 節點已經是signal 狀态,那麼被阻塞線程的喚醒工作由 AQS 隊列來完成
}
執行完 doSignal 以後,會把 condition 隊列中的節點轉移到 aqs 隊列上,這個時候會判斷 Consumer的 prev 節點也就是 head 節點的 waitStatus,如果大于 0 或者設定 SIGNAL 失敗,表示節點被設定成了 CANCELLED 狀态。這個時候會喚醒Consumer這個線程。否則就基于 AQS 隊列的機制來喚醒,也就是等到 Producer 釋放鎖之後來喚醒 Consumer
前面在分析 await 方法時,線程會被阻塞。而通過 signal被喚醒之後又繼續回到上次執行的邏輯
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
如果目前線程被中斷,則調用transferAfterCancelledWait方法判斷後續的處理,應該是抛出InterruptedException還是重新中斷。這裡需要注意的地方是,如果第一次 CAS 失敗了,則不能判斷目前線程是先進行了中斷還是先進行了 signal 方法的調用,可能是先執行了 signal 然後中斷,也可能是先執行了中斷,後執行了 signal,當然,這兩個操作肯定是發生在 CAS 之前。這時需要做的就是等待目前線程的 node被添加到 AQS 隊列後,也就是 enq 方法傳回後,傳回false 告訴checkInterruptWhileWaiting 方法傳回REINTERRUPT(1),後續進行重新中斷。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
簡單來說,該方法的傳回值代表目前線程是否在 park 的時候被中斷喚醒,如果為 true 表示中斷在 signal 調用之前,signal 還未執行,那麼這個時候會根據 await 的語義,在 await 時遇到中斷需要抛出interruptedException,傳回 true 就是告訴checkInterruptWhileWaiting 傳回 THROW_IE(-1)。如果傳回 false,否則表示 signal 已經執行過了,隻需要重新響應中斷即可
final boolean transferAfterCancelledWait(Node node) {
//使用 cas 修改節點狀态,如果還能修改成功,說明線程被中斷時,signal 還沒有被調用。
// 這裡有一個知識點,就是線程被喚醒,并不一定是在 java 層面執行了locksupport.unpark,也可能是調用了線程
//的 interrupt()方法,這個方法會更新一個中斷辨別,并且會喚醒處于阻塞狀态下的線程。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);//如果 cas 成功,則把node 添加到 AQS 隊列
return true;
}
//如果 cas 失敗,則判斷目前 node 是否已經在 AQS 隊列上,如果不在,則讓給其他線程執行
//當 node 被觸發了 signal 方法時,node 就會被加到 aqs 隊列上
while (!isOnSyncQueue(node))//循環檢測 node 是否已經成功添加到 AQS 隊列中。如果沒有,則通過 yield
Thread.yield();
return false;
}
acquireQueued
這個方法在講 aqs 的時候說過,是的目前被喚醒的節點ThreadA 去搶占同步鎖。并且要恢複到原本的重入次數狀态。調用完這個方法之後,AQS 隊列的狀态如下将 head 節點的 waitStatus 設定為-1,Signal 狀态。
reportInterruptAfterWait
根據 checkInterruptWhileWaiting 方法傳回的中斷辨別來進行中斷上報。
如果是 THROW_IE,則抛出中斷異常
如果是 REINTERRUPT,則重新響應中斷
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
流程圖:

await 和 signal 的總結
線程 awaitThread 先通過 lock.lock()方法擷取鎖成功後調用了 condition.await 方法進入等待隊列,而另一個線程 signalThread 通過 lock.lock()方法擷取鎖成功後調用了 condition.signal 或者 signalAll 方法,使得線程awaitThread 能夠有機會移入到同步隊列中,當其他線程釋放 lock 後使得線程 awaitThread 能夠有機會擷取lock,進而使得線程 awaitThread 能夠從 await 方法中退出執行後續操作。如果 awaitThread 擷取 lock 失敗會直接進入到同步隊列。
阻塞:await()方法中,線上程釋放鎖資源之後,如果節點不在 AQS 等待隊列,則阻塞目前線程,如果在等待隊列,則自旋等待嘗試擷取鎖
釋放:signal()後,節點會從 condition 隊列移動到 AQS等待隊列,則進入正常鎖的擷取流程