天天看點

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

Condition隊列原理分析

  • 前言
  • 初識Condition
  • Condition使用示例
  • Condition原理分析
  • condition.wait()源碼解讀
    • AQS#await()
      • AQS#addConditionWaiter()
        • AQS#unlinkCancelledWaiters()
      • AQS#fullyRelease(Node)
      • AQS#isOnSyncQueue(Node)
        • AQS#findNodeFromTail(Node)
  • condition.signal()源碼解讀
    • AQS#signal()
      • AQS#doSignal(Node)
        • AQS#transferForSignal(Node)
    • 回到AQS#await()
      • AQS#checkInterruptWhileWaiting(Node)
        • AQS#checkInterruptWhileWaiting(Node)
    • 繼續回到AQS#await()
  • 總結

前言

每一個Java對象都擁有一組螢幕方法(定義在java.lang.Object上),主要包括wait()、 wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關鍵字配合,可以實作線程之間的通信(等待/通知)機制。

在前一篇文章中我們介紹了Lock對象的實作類ReentrantLock和AQS隊列實作原理,而Lock也有自己對應的等待/通知機制Condition隊列,Condition接口也提供了類似Object的螢幕方法,與Lock配合可以實作等待/通知模式,主要通過方法await()和singal()實作。

在學習本篇文章之前,建議先去學一下上一篇文章介紹的ReentrantLock和AQS隊列實作原理。因為本文的内容也離不開AQS和Node對象。

初識Condition

Condition和Lock一樣,也是JUC内的一個接口。Condition接口定義了等待/通知兩種類型的方法,目前線程調用這些方法時,需要提前擷取到 Condition對象關聯的鎖。Condition對象是由Lock對象(調用Lock對象的newCondition()方法)建立出來的,換句話說,Condition是依賴Lock對象的。

Condition的實作類ConditionObject也是AQS類中的一個内内部類,也依賴于Node對象。

Condition使用示例

Condition的使用也非常簡單,下面是一個簡單的使用示例:

package com.zwx.concurrent.lock;

import java.util.Locale;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionDemo {

    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(new ConditionAwait(lock,condition)).start();
        Thread.sleep(1000);
        new Thread(new ConditionSingal(lock,condition)).start();
    }
}

class ConditionAwait implements Runnable{
    private Lock lock;
    private Condition condition;

    public ConditionAwait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        System.out.println("await begin");
        try {
            lock.lock();
            condition.await();
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        System.out.println("await end");
    }
}

class ConditionSingal implements Runnable{
    private Lock lock;
    private Condition condition;

    public ConditionSingal(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        System.out.println("signal begin");
        try {
            lock.lock();
            condition.signal();
        }finally {
            lock.unlock();
        }
        System.out.println("signal end");
    }
}
           

運作之後,輸出結果為:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這個效果就是和wait(),nodity()一樣的,那麼Condition中的等待通知機制是如何實作的呢?

Condition原理分析

Condition接口的實作類ConditionObject是一個多線程協調通信的工具類,可以讓線程一起等待某個條件(condition),隻有滿足條件時,線程才會被喚醒。

和上一篇文章介紹的AQS同步隊列類似,Condition也是一個依賴Node對象建構的FIFO隊列。

Condition隊列,稱之為等待隊列,和AQS隊列不同的是,Condition等待隊列不會維護prev和next,維護的隻是一個單項清單,通過firstWaiter和lastWaiter實作頭尾節點,然後除了lastWaiter節點,其餘每個節點會有一個nextWaiter指向下一個節點,Condition隊列大緻示意圖如下:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

condition.wait()源碼解讀

接下來讓我們進入源碼層面開始剖析condition的實作原理。上文的示例中,當我們調用condition.wait()時,我們進入AbstractQueuedSynchronizer類中的await()方法。

AQS#await()

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

第一步是檢測是否被中斷,這個就不用多說,我們看下面的addConditionWaiter()方法:

AQS#addConditionWaiter()

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

為了便于了解,我們還是把Node對象貼出來看一看:

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;//表示目前線程狀态是取消的
        static final int SIGNAL    = -1;//表示目前線程正在等待鎖
        static final int CONDITION = -2;//Condition隊列初始化Node節點時的預設狀态
        static final int PROPAGATE = -3;//CountDownLatch等工具中使用到,暫時用不到
        volatile int waitStatus;//Node節點中線程的狀态,AQS隊列中預設為0
        volatile Node prev;//目前節點的前一個節點
        volatile Node next;//目前節點的後一個節點
        volatile Thread thread;//目前節點封裝的線程資訊
        Node nextWaiter;//Condition隊列維護
        final boolean isShared() {//暫時用不到
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {//擷取目前節點的上一個節點
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {
        }
        Node(Thread thread, Node mode) {//構造一個節點:addWaiter方法中會使用,此時waitStatus預設等于0
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { //構造一個節點:Condition中會使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
           

需要說明的是,AQS隊列中初始化Node節點的時候不會傳入狀态,是以預設為0,然後我們之前分析的時候知道,中途會被改為1,然後線程異常時候會有3出現,是以AQS隊列中的Node節點實際上隻會有-1,0,1三種狀态,而Condition隊列,初始化的時候調用的是另一個構造器,直接傳入了-2狀态,是以不會有0這個預設狀态,故而Condition隊列中隻會有-2和1兩種狀态。

這裡删除無效節點的方法我們後面再分析,我們現在假設有線程A和線程B,線程A進來的時候因為Condition隊列還沒被初始化,是以執行的是1879和1882兩行代碼,這時候就建構出了這樣的一個Condition隊列:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這時候因為隻有一個節點,是以firstWaiter和lastWaiter都是指向同一個節點,而ThreadA節點中這時候nextWaiter是空的,因為這時候還沒有下一個節點。

這時候線程B也進來,那麼就會加入到已經建構好的對象(注意,這兩個線程必須共用一個Lock對象,否則會建構不同的Condition隊列),ThreadB進來就會執行1881和1882兩行代碼,最終得到下面的Condition隊列:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

Condition建構好了,先不管Node節點狀态是怎麼變成1(cancle)的,我們假如線程B的節點狀态變成1了,然後進入unlinkCancelledWaiters()方法看看是怎麼移除無效節點的,當ThreadB狀态為1,得到如下Condition隊列(和上圖唯一的差別就是ThreadB所在Node狀态變成了1):

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

AQS#unlinkCancelledWaiters()

這個方法的邏輯也不算難,隻要記住兩個屬性:

一個是t,t是需要循環的節點,第一次是firstwaiter,循環完了之後就會把nextWaiter指派給t繼續循環(1933和1945兩行代碼);

另一個是trail,用來記錄已經循環過的節點,循環的時候如果沒有取消的節點,那就是把t循環完之後指派給trail,然後繼續循環

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這裡我們還是繼續示範一下,第一次循環肯定肯定走的是1944行代碼和1945行代碼,因為firstWaiter肯定不為空,狀态也等于Node.CONDITION,循環結束之後會得到如下結果:t=ThreadB,trail=firstWaiter;

然後繼續循環,這時候因為t狀态是1,是以if條件成立,進入1935行開始執行清除無效節點的邏輯,t.nextWaiter = null;因為目前ThreadB是尾節點,是以這種情況這句話是不起什麼作用的,針對非尾節點,才會有作用。

又因為trail=firstWaiter不等于null,是以會執行1939行代碼(else分支),這時候因為ThreadB線程已經沒有下一個節點了,是以1939行相當于:trail.nextWaiter = null;因為trail=firstWaiter,是以等價于:firstWaiter.nextWaiter=null,于是得到下面的最新Condition隊列:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

然後執行lastWaiter = trail;等價于lastWaiter = firster;得到如下Condition隊列:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

可以看到ThreadB這個無效節點已經被清除了。

忘掉這個清除無效節點邏輯,回到我們的正常邏輯,隊列建構完成之後,await()方法會繼續往下面執行:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

接下來回去執行釋放鎖fullyRelease(Node)的邏輯,因為線程await()方法本來就是要把目前鎖讓給另一個線程,是以肯定要釋放鎖,要不然其他線程不可能獲得鎖。

AQS#fullyRelease(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這裡首先會擷取到目前的狀态,然後把狀态傳入elease()方法,前面介紹ReentrantLock的時候,lock.unlock()也會調用這個release(arg)方法,隻不過unlock()是固定傳的1,也就是說如果有重入調用一次隻會state-1,而這裡是直接全部被減去。

這裡就不在介紹release(arg)方法了,沒有了解過的可以看我前面介紹ReentrantLock和AQS的文章。

這裡如果釋放鎖成功之後,又會繼續回到我們的await()方法:

這時候會繼續去執行while循環中的isOnSyncQueue方法,這個方法的意思是判斷一下目前線程所在的Node是不是在AQS同步隊列,那麼為什麼要有這個判斷?

大家注意了,這是在并發場景下,是以也可能會有其他線程已經把線程B喚醒了,喚醒之後并不是說就能直接獲得鎖,而是會去争搶鎖,那麼争搶鎖失敗了就會加入到AQS同步隊列當中,是以這裡要有這個判斷,如果不在AQS同步隊列,那就可以把目前線程挂起了。

AQS#isOnSyncQueue(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這裡有一個點需要特别指出的是,Condition隊列的節點,當被其他線程調用了singal()方法喚醒的時候,就需要去争搶鎖,而争搶鎖失敗就有可能被加入到AQS同步隊列,是以這裡才會有prev和next屬性的判斷

還有一個點如果大家不記得之前構造AQS同步隊列的邏輯可能就不太好了解,為了便于大家了解,我把上文介紹AQS同步隊列中的enq代碼片段貼過來解釋一下就很好了解了:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

上面代碼中如果597行成功,而598行的CAS失敗,那麼這時候node.prev!=null,但是他替換tail節點失敗了,是以等于是沒有加入到AQS同步隊列,是以上面即使node.prev!=null,仍然需要從tail節點周遊一下來确定。

AQS#findNodeFromTail(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這段代碼應該很好了解,就不多做解釋了。

回到await()主方法:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

到這裡,我們的線程B進來的時候肯定是不會在AQS同步隊列中的,搜易進入下一行,目前線程被park()挂起。挂起之後需要等到其他線程調用singal()方法喚醒。

condition.signal()源碼解讀

上文的示例中,當我們調用condition.signal()時,我們進入AbstractQueuedSynchronizer類中的signal()方法。

AQS#signal()

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這個方法比較簡單,隻是做了個簡單的判斷,我們進入doSignal(Node)方法看看具體是如何喚醒其他線程的。

AQS#doSignal(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

循環體中主要是判斷目前Condition隊列中第二個節點是否可用,如果可以用,就剔除掉。

而主要的邏輯在while條件當中的transferForSignal(Node),這個就是singal操作的核心代碼了,主要就是将Condition隊列中的Node轉移到AQS同步隊列當中去競争鎖。

這裡經過一次do操作之後實際上已經把原先的firstWaiter節點移除了,因為線程被喚醒後需要加入到AQ同步隊列當中,先把Node移出Condition,後面再調用transferForSignal方法加入AQS同步隊列:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

注意了,線程被sigal喚醒後并不是說就能直接獲得鎖,還是需要通過競争才可以獲得鎖,是以需要将其轉移到AQS同步隊列去争搶鎖。

AQS#transferForSignal(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

這裡注釋上都寫明了大緻意思,應該能看的懂,期中enq方法就是将Node節點加入到AQS同步隊列的邏輯,而1710到1712行代碼不要也是可以的,因為我們在lock.lock()和lock.unlock()的時候都有剔除無效節點的操作,這裡這麼做的考慮之一,是可以提升一定的性能,我們假設這個AQS同步隊列當中原先隻有一個節點(除了head哨兵節點),那麼這時候p(即原先的tail)節點是無效節點,這時候重新喚醒目前節點去搶占鎖,而這時候之前持有鎖的線程恰巧釋放了鎖,那麼他就有可能直接搶占成功了。

回到AQS#await()

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

上面我們的線程被挂在了上面的2062行,但是要注意,這裡被喚醒有兩種情況:

  • 被singal()方法喚醒
  • 被interrupt()中斷

    是以喚醒之後第一件事就是要判斷到底是被interrupt()喚醒的還是被singal()喚醒的。

AQS#checkInterruptWhileWaiting(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

transferAfterCancelledWait(Node)方法主要就是判斷到底是情況2還是情況3。

AQS#checkInterruptWhileWaiting(Node)

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

上面我們可以知道線程恢複到底是先interrupt()還是先singal(),傳回之後回到之前的方法

繼續回到AQS#await()

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

到這裡我們的真個流程分析基本上結束了,後面的acquireQueued方法就是搶占鎖了,搶占鎖的時候如果被中斷了才會傳回true,是以這裡的判斷針對的就是如果搶占鎖被中斷了,而上面的interruptMode=0的情況,我們需要改為REINTERRUPT。再往後就是清除取消的節點,以及根據interruptMode來響應中斷了,reportInterruptAfterWait方法也非常簡單:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

總結

Condition隊列和AQS同步隊列中的節點共用的是Node對象,通過不同狀态來區分,而一個Node同一時間隻能存在于一個隊列,一個Node從Condition隊列移出加入到AQS同步隊列的流程圖如下:

【并發程式設計系列6】Condition隊列原理及await和singal(等待/喚醒)機制源碼分析前言初識ConditionCondition使用示例Condition原理分析condition.wait()源碼解讀condition.signal()源碼解讀總結

後面将會繼續分析JUC中的其他工具的實作原理,感興趣的 請關注我,和孤狼一起學習進步。

繼續閱讀