天天看點

怎麼了解Condition?

在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,使用過的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨占鎖的實作。它繼承自Dong Lea的 AbstractQueuedSynchronizer(同步器),确切的說是ReentrantLock的一個内部類繼承了AbstractQueuedSynchronizer,ReentrantLock隻不過是代理了該類的一些方法,可能有人會問為什麼要使用内部類在包裝一層? 我想是安全的關系,因為AbstractQueuedSynchronizer中有很多方法,還實作了共享鎖,Condition(稍候再細說)等功能,如果直接使ReentrantLock繼承它,則很容易出現AbstractQueuedSynchronizer中的API被無用的情況。

言歸正傳,今天,我們讨論下Condition工具類的實作。

ReentrantLock和Condition的使用方式通常是這樣的:

  1. public static void main(String[] args) {

  2. final ReentrantLock reentrantLock = new ReentrantLock();

  3. final Condition condition = reentrantLock.newCondition();

  4. Thread thread = new Thread((Runnable) () -> {

  5. try {

  6. reentrantLock.lock();

  7. System.out.println("我要等一個新信号" + this);

  8. condition.wait();

  9. }

  10. catch (InterruptedException e) {

  11. e.printStackTrace();

  12. }

  13. System.out.println("拿到一個信号!!" + this);

  14. reentrantLock.unlock();

  15. }, "waitThread1");

  16. thread.start();

  17. Thread thread1 = new Thread((Runnable) () -> {

  18. reentrantLock.lock();

  19. System.out.println("我拿到鎖了");

  20. try {

  21. Thread.sleep(3000);

  22. }

  23. catch (InterruptedException e) {

  24. e.printStackTrace();

  25. }

  26. condition.signalAll();

  27. System.out.println("我發了一個信号!!");

  28. reentrantLock.unlock();

  29. }, "signalThread");

  30. thread1.start();

  31. }

運作後,結果如下:

  1. 我要等一個新信号lock.ReentrantLockTest$1@a62fc3

  2. 我拿到鎖了

  3. 我發了一個信号!!

  4. 拿到一個信号!!

可以看到,

Condition的執行方式,是當線上程1中調用await方法後,線程1将釋放鎖,并且将自己沉睡,等待喚醒,

線程2擷取到鎖後,開始做事,完畢後,調用Condition的signal方法,喚醒線程1,線程1恢複執行。

以上說明Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),隻有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,進而重新争奪鎖。

那,它是怎麼實作的呢?

首先還是要明白,reentrantLock.newCondition() 傳回的是Condition的一個實作,該類在AbstractQueuedSynchronizer中被實作,叫做newCondition()

  1. public Condition newCondition() {

  2. return sync.newCondition();

  3. }

它可以通路AbstractQueuedSynchronizer中的方法和其餘内部類(AbstractQueuedSynchronizer是個抽象類,至于他怎麼能通路,這裡有個很奇妙的點,後面我專門用demo說明 )

現在,我們一起來看下Condition類的實作,還是從上面的demo入手,

為了友善書寫,我将AbstractQueuedSynchronizer縮寫為AQS

當await被調用時,代碼如下:

  1. public final void await() throws InterruptedException {

  2. if (Thread.interrupted())

  3. throw new InterruptedException();

  4. Node node = addConditionWaiter(); // 将目前線程包裝下後,

  5. // 添加到Condition自己維護的一個連結清單中。

  6. int savedState = fullyRelease(node);// 釋放目前線程占有的鎖,從demo中看到,

  7. // 調用await前,目前線程是占有鎖的

  8. int interruptMode = 0;

  9. while (!isOnSyncQueue(node)) {// 釋放完畢後,周遊AQS的隊列,看目前節點是否在隊列中,

  10. // 不在 說明它還沒有競争鎖的資格,是以繼續将自己沉睡。

  11. // 直到它被加入到隊列中,聰明的你可能猜到了,

  12. // 沒有錯,在singal的時候加入不就可以了?

  13. LockSupport.park(this);

  14. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  15. break;

  16. }

  17. // 被喚醒後,重新開始正式競争鎖,同樣,如果競争不到還是會将自己沉睡,等待喚醒重新開始競争。

  18. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  19. interruptMode = REINTERRUPT;

  20. if (node.nextWaiter != null)

  21. unlinkCancelledWaiters();

  22. if (interruptMode != 0)

  23. reportInterruptAfterWait(interruptMode);

  24. }

回到上面的demo,鎖被釋放後,線程1開始沉睡,這個時候線程因為線程1沉睡時,會喚醒AQS隊列中的頭結點,所是以線程2會開始競争鎖,并擷取到,等待3秒後,線程2會調用signal方法,“發出”signal信号,signal方法如下:

  1. public final void signal() {

  2. if (!isHeldExclusively())

  3. throw new IllegalMonitorStateException();

  4. Node first = firstWaiter; // firstWaiter為condition自己維護的一個連結清單的頭結點,

  5. // 取出第一個節點後開始喚醒操作

  6. if (first != null)

  7. doSignal(first);

  8. }

說明下,其實Condition内部維護了等待隊列的頭結點和尾節點,該隊列的作用是存放等待signal信号的線程,該線程被封裝為Node節點後存放于此。

  1. public class ConditionObject implements Condition, java.io.Serializable {

  2. private static final long serialVersionUID = 1173984872572414699L;

  3. /** First node of condition queue. */

  4. private transient Node firstWaiter;

  5. /** Last node of condition queue. */

  6. private transient Node lastWaiter;

關鍵的就在于此,我們知道AQS自己維護的隊列是目前等待資源的隊列,AQS會在資源被釋放後,依次喚醒隊列中從前到後的所有節點,使他們對應的線程恢複執行。直到隊列為空。

而Condition自己也維護了一個隊列,該隊列的作用是維護一個等待signal信号的隊列,兩個隊列的作用是不同,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:

線程1調用reentrantLock.lock時,線程被加入到AQS的等待隊列中。

線程1調用await方法被調用時,該線程從AQS中移除,對應操作是鎖的釋放。

接着馬上被加入到Condition的等待隊列中,以為着該線程需要signal信号。

線程2,因為線程1釋放鎖的關系,被喚醒,并判斷可以擷取鎖,于是線程2擷取鎖,并被加入到AQS的等待隊列中。

線程2調用signal方法,這個時候Condition的等待隊列中隻有線程1一個節點,于是它被取出來,并被加入到AQS的等待隊列中。 注意,這個時候,線程1 并沒有被喚醒。

signal方法執行完畢,線程2調用reentrantLock.unLock()方法,釋放鎖。這個時候因為AQS中隻有線程1,于是,AQS釋放鎖後按從頭到尾的順序喚醒線程時,線程1被喚醒,于是線程1回複執行。

直到釋放所整個過程執行完畢。

可以看到,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實作的,Condition作為一個條件類,很好的自己維護了一個等待信号的隊列,并在适時的時候将結點加入到AQS的等待隊列中來實作的喚醒操作。

看到這裡,signal方法的代碼應該不難了解了。

取出頭結點,然後doSignal

  1. public final void signal() {

  2. if (!isHeldExclusively()) {

  3. throw new IllegalMonitorStateException();

  4. }

  5. Node first = firstWaiter;

  6. if (first != null) {

  7. doSignal(first);

  8. }

  9. }

  10. private void doSignal(Node first) {

  11. do {

  12. if ((firstWaiter = first.nextWaiter) == null) // 修改頭結點,完成舊頭結點的移出工作

  13. lastWaiter = null;

  14. first.nextWaiter = null;

  15. } while (!transferForSignal(first) && // 将老的頭結點,加入到AQS的等待隊列中

  16. (first = firstWaiter) != null);

  17. }

  18. final boolean transferForSignal(Node node) {

  19. /*

  20. * If cannot change waitStatus, the node has been cancelled.

  21. */

  22. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

  23. return false;

  24. /*

  25. * Splice onto queue and try to set waitStatus of predecessor to

  26. * indicate that thread is (probably) waiting. If cancelled or attempt

  27. * to set waitStatus fails, wake up to resync (in which case the

  28. * waitStatus can be transiently and harmlessly wrong).

  29. */

  30. Node p = enq(node);

  31. int ws = p.waitStatus;

  32. // 如果該結點的狀态為cancel 或者修改waitStatus失敗,則直接喚醒。

  33. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

  34. LockSupport.unpark(node.thread);

  35. return true;

  36. }

可以看到,正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個判斷是不會為true的,是以,不會在這個時候喚醒該線程。

隻有到發送signal信号的線程調用reentrantLock.unlock()後因為它已經被加到AQS的等待隊列中,是以才會被喚醒。

總結:

本文從代碼的角度說明了Condition的實作方式,其中,涉及到了AQS的很多操作,比如AQS的等待隊列實作獨占鎖功能,不過,這不是本文讨論的重點,等有機會再将AQS的實作單獨分享出來。

原文釋出時間為:2018-10-31

本文作者:Java技術驿站

本文來自雲栖社群合作夥伴“

Java技術驿站

”,了解相關資訊可以關注“

”。