在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,使用過的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨占鎖的實作。它繼承自Dong Lea的 AbstractQueuedSynchronizer(同步器),确切的說是ReentrantLock的一個内部類繼承了AbstractQueuedSynchronizer,ReentrantLock隻不過是代理了該類的一些方法,可能有人會問為什麼要使用内部類在包裝一層? 我想是安全的關系,因為AbstractQueuedSynchronizer中有很多方法,還實作了共享鎖,Condition(稍候再細說)等功能,如果直接使ReentrantLock繼承它,則很容易出現AbstractQueuedSynchronizer中的API被無用的情況。
言歸正傳,今天,我們讨論下Condition工具類的實作。
ReentrantLock和Condition的使用方式通常是這樣的:
-
public static void main(String[] args) {
-
final ReentrantLock reentrantLock = new ReentrantLock();
-
final Condition condition = reentrantLock.newCondition();
-
Thread thread = new Thread((Runnable) () -> {
-
try {
-
reentrantLock.lock();
-
System.out.println("我要等一個新信号" + this);
-
condition.wait();
-
}
-
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
System.out.println("拿到一個信号!!" + this);
-
reentrantLock.unlock();
-
}, "waitThread1");
-
thread.start();
-
Thread thread1 = new Thread((Runnable) () -> {
-
reentrantLock.lock();
-
System.out.println("我拿到鎖了");
-
try {
-
Thread.sleep(3000);
-
}
-
catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
condition.signalAll();
-
System.out.println("我發了一個信号!!");
-
reentrantLock.unlock();
-
}, "signalThread");
-
thread1.start();
-
}
運作後,結果如下:
-
我要等一個新信号lock.ReentrantLockTest$1@a62fc3
-
我拿到鎖了
-
我發了一個信号!!
-
拿到一個信号!!
可以看到,
Condition的執行方式,是當線上程1中調用await方法後,線程1将釋放鎖,并且将自己沉睡,等待喚醒,
線程2擷取到鎖後,開始做事,完畢後,調用Condition的signal方法,喚醒線程1,線程1恢複執行。
以上說明Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),隻有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,進而重新争奪鎖。
那,它是怎麼實作的呢?
首先還是要明白,reentrantLock.newCondition() 傳回的是Condition的一個實作,該類在AbstractQueuedSynchronizer中被實作,叫做newCondition()
-
public Condition newCondition() {
-
return sync.newCondition();
-
}
它可以通路AbstractQueuedSynchronizer中的方法和其餘内部類(AbstractQueuedSynchronizer是個抽象類,至于他怎麼能通路,這裡有個很奇妙的點,後面我專門用demo說明 )
現在,我們一起來看下Condition類的實作,還是從上面的demo入手,
為了友善書寫,我将AbstractQueuedSynchronizer縮寫為AQS
當await被調用時,代碼如下:
-
public final void await() throws InterruptedException {
-
if (Thread.interrupted())
-
throw new InterruptedException();
-
Node node = addConditionWaiter(); // 将目前線程包裝下後,
-
// 添加到Condition自己維護的一個連結清單中。
-
int savedState = fullyRelease(node);// 釋放目前線程占有的鎖,從demo中看到,
-
// 調用await前,目前線程是占有鎖的
-
int interruptMode = 0;
-
while (!isOnSyncQueue(node)) {// 釋放完畢後,周遊AQS的隊列,看目前節點是否在隊列中,
-
// 不在 說明它還沒有競争鎖的資格,是以繼續将自己沉睡。
-
// 直到它被加入到隊列中,聰明的你可能猜到了,
-
// 沒有錯,在singal的時候加入不就可以了?
-
LockSupport.park(this);
-
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
-
break;
-
}
-
// 被喚醒後,重新開始正式競争鎖,同樣,如果競争不到還是會将自己沉睡,等待喚醒重新開始競争。
-
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
-
interruptMode = REINTERRUPT;
-
if (node.nextWaiter != null)
-
unlinkCancelledWaiters();
-
if (interruptMode != 0)
-
reportInterruptAfterWait(interruptMode);
-
}
回到上面的demo,鎖被釋放後,線程1開始沉睡,這個時候線程因為線程1沉睡時,會喚醒AQS隊列中的頭結點,所是以線程2會開始競争鎖,并擷取到,等待3秒後,線程2會調用signal方法,“發出”signal信号,signal方法如下:
-
public final void signal() {
-
if (!isHeldExclusively())
-
throw new IllegalMonitorStateException();
-
Node first = firstWaiter; // firstWaiter為condition自己維護的一個連結清單的頭結點,
-
// 取出第一個節點後開始喚醒操作
-
if (first != null)
-
doSignal(first);
-
}
說明下,其實Condition内部維護了等待隊列的頭結點和尾節點,該隊列的作用是存放等待signal信号的線程,該線程被封裝為Node節點後存放于此。
-
public class ConditionObject implements Condition, java.io.Serializable {
-
private static final long serialVersionUID = 1173984872572414699L;
-
/** First node of condition queue. */
-
private transient Node firstWaiter;
-
/** Last node of condition queue. */
-
private transient Node lastWaiter;
關鍵的就在于此,我們知道AQS自己維護的隊列是目前等待資源的隊列,AQS會在資源被釋放後,依次喚醒隊列中從前到後的所有節點,使他們對應的線程恢複執行。直到隊列為空。
而Condition自己也維護了一個隊列,該隊列的作用是維護一個等待signal信号的隊列,兩個隊列的作用是不同,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:
線程1調用reentrantLock.lock時,線程被加入到AQS的等待隊列中。
線程1調用await方法被調用時,該線程從AQS中移除,對應操作是鎖的釋放。
接着馬上被加入到Condition的等待隊列中,以為着該線程需要signal信号。
線程2,因為線程1釋放鎖的關系,被喚醒,并判斷可以擷取鎖,于是線程2擷取鎖,并被加入到AQS的等待隊列中。
線程2調用signal方法,這個時候Condition的等待隊列中隻有線程1一個節點,于是它被取出來,并被加入到AQS的等待隊列中。 注意,這個時候,線程1 并沒有被喚醒。
signal方法執行完畢,線程2調用reentrantLock.unLock()方法,釋放鎖。這個時候因為AQS中隻有線程1,于是,AQS釋放鎖後按從頭到尾的順序喚醒線程時,線程1被喚醒,于是線程1回複執行。
直到釋放所整個過程執行完畢。
可以看到,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實作的,Condition作為一個條件類,很好的自己維護了一個等待信号的隊列,并在适時的時候将結點加入到AQS的等待隊列中來實作的喚醒操作。
看到這裡,signal方法的代碼應該不難了解了。
取出頭結點,然後doSignal
-
public final void signal() {
-
if (!isHeldExclusively()) {
-
throw new IllegalMonitorStateException();
-
}
-
Node first = firstWaiter;
-
if (first != null) {
-
doSignal(first);
-
}
-
}
-
private void doSignal(Node first) {
-
do {
-
if ((firstWaiter = first.nextWaiter) == null) // 修改頭結點,完成舊頭結點的移出工作
-
lastWaiter = null;
-
first.nextWaiter = null;
-
} while (!transferForSignal(first) && // 将老的頭結點,加入到AQS的等待隊列中
-
(first = firstWaiter) != null);
-
}
-
final boolean transferForSignal(Node node) {
-
/*
-
* If cannot change waitStatus, the node has been cancelled.
-
*/
-
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
-
return false;
-
/*
-
* Splice onto queue and try to set waitStatus of predecessor to
-
* indicate that thread is (probably) waiting. If cancelled or attempt
-
* to set waitStatus fails, wake up to resync (in which case the
-
* waitStatus can be transiently and harmlessly wrong).
-
*/
-
Node p = enq(node);
-
int ws = p.waitStatus;
-
// 如果該結點的狀态為cancel 或者修改waitStatus失敗,則直接喚醒。
-
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
-
LockSupport.unpark(node.thread);
-
return true;
-
}
可以看到,正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個判斷是不會為true的,是以,不會在這個時候喚醒該線程。
隻有到發送signal信号的線程調用reentrantLock.unlock()後因為它已經被加到AQS的等待隊列中,是以才會被喚醒。
總結:
本文從代碼的角度說明了Condition的實作方式,其中,涉及到了AQS的很多操作,比如AQS的等待隊列實作獨占鎖功能,不過,這不是本文讨論的重點,等有機會再将AQS的實作單獨分享出來。
原文釋出時間為:2018-10-31
本文作者:Java技術驿站
本文來自雲栖社群合作夥伴“
Java技術驿站”,了解相關資訊可以關注“
”。