天天看點

AQS源碼探究_07 CountDownLatch源碼分析

1、CountDownLatch簡介

CountDownLatch,是一個簡單的同步器,它的含義是允許一個或多個線程等待其它線程的操作執行完畢後再執行後續的操作。

CountDownLatch的通常用法和Thread.join()有點類似,等待其它線程都完成後再執行主任務。

2、入門案例分析

案例1:

對于像我一樣的學生來說,CountDwonLatch的實際開發應用很少,甚至有同學沒有接觸過它。但是在并發條件下,這個類的使用還是很常見的,是以先引入2個案例去了解下它的用途:

借助CountDownLatch,控制主線程等待子線程完成再執行

/**
 * date: 2021/5/7 10:01
 * @author csp
 */
public class CountDownLatchTest01 {
    private static final int TASK_COUNT = 8;
    private static final int THREAD_CORE_SIZE = 10;

    public static void main(String[] args) throws InterruptedException {
        // 執行個體化CountDownLatch,指定初始計數值為TASK_COUNT(8)
        CountDownLatch latch = new CountDownLatch(TASK_COUNT);
        // 通過Executors建立一個初始容量為THREAD_CORE_SIZE(10)的線程池
        // (注意:在阿裡巴巴開發手冊中,建議不要使用Executors直接去建立線程池,
        // 要使用其内部調用的ThreadPoolExecutor去手動設定線程池的相關參數,并建立線程池)
        Executor executor = Executors.newFixedThreadPool(THREAD_CORE_SIZE);

        // 依次向線程池中投入8個執行的線程
        for(int i = 0; i < 8; i++) {
            // i -> taskId 任務id
            // latch -> 同步計數器的值
            executor.execute(new WorkerRunnable(i, latch));
        }

        System.out.println("主線程等待所有子任務完成....");
        long mainWaitStartTimeMillis = System.currentTimeMillis();
        latch.await();
        long mainWaitEndTimeMillis = System.currentTimeMillis();
        System.out.println("主線程等待時長:" + (mainWaitEndTimeMillis - mainWaitStartTimeMillis));
    }
    
    /**
     * 工作線程
     */
    static class WorkerRunnable implements Runnable {
        /**
         * 任務id
         */
        private int taskId;

        /**
         * CountDownLatch同步計數器
         */
        private CountDownLatch latch;

        @Override
        public void run() {
            doWorker();
        }

        /**
         * 工作方法
         */
        public void doWorker() {
            System.out.println("任務ID:" + taskId + ",正在執行任務中....");
            try {
                // 休眠5s,模拟正在處理任務
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
            } finally {
                // latch = latch-1 :
                // 計數器的值latch開始是TASK_COUNT,每執行完一個doWorker方法就-1
                // 直到latch值減小為0,才能繼續執行latch.await();之後的方法
                latch.countDown();
            }
            System.out.println("任務ID:" + taskId + ",任務執行結束!");
        }

        public WorkerRunnable(int taskId, CountDownLatch latch) {
            this.taskId = taskId;
            this.latch = latch;
        }
    }
}
      

運作結果如下:

主線程等待所有子任務完成....
任務ID:0,正在執行任務中....
任務ID:1,正在執行任務中....
任務ID:2,正在執行任務中....
任務ID:4,正在執行任務中....
任務ID:5,正在執行任務中....
任務ID:3,正在執行任務中....
任務ID:6,正在執行任務中....
任務ID:7,正在執行任務中....
任務ID:0,任務執行結束!
任務ID:5,任務執行結束!
任務ID:3,任務執行結束!
任務ID:4,任務執行結束!
任務ID:2,任務執行結束!
任務ID:1,任務執行結束!
任務ID:7,任務執行結束!
任務ID:6,任務執行結束!
主線程等待時長:5000
      

案例2:

  • 執行任務的線程,也可能是多對多的關系:本案例就來了解一下,借助CountDownLatch,使主線程控制子線程同時開啟,主線程再去阻塞等待子線程結束!
/**
 * date: 2021/5/7 10:01
 * @author csp
 */
public class CountDownLatchTest02 {

    // 主線程
    public static void main(String[] args) throws InterruptedException {
        // 開始信号:CountDownLatch初始值為1
        CountDownLatch startSignal = new CountDownLatch(1);
        // 結束信号:CountDownLatch初始值為10
        CountDownLatch doneSignal = new CountDownLatch(10);

        // 開啟10個線程,
        for(int i = 0; i < 10; i++) {
            new Thread(new Worker(i, startSignal, doneSignal)).start();
        }

        // 這裡讓主線程休眠500毫秒,確定所有子線程已經啟動,并且阻塞在startSignal栅欄處
        TimeUnit.MILLISECONDS.sleep(500);

        // 因為startSignal 栅欄值為1,是以主線程隻要調用一次countDown()方法
        // 那麼所有調用startSignal.await()阻塞的子線程,就都可以通過栅欄了
        System.out.println("子任務栅欄已開啟...");
        startSignal.countDown();


        System.out.println("等待子任務結束...");
        long startTime = System.currentTimeMillis();
        // 等待所有子任務結束,主線程再繼續往下執行
        doneSignal.await();
        long endTime = System.currentTimeMillis();
        System.out.println("所有子任務已經運作結束,耗時:" + (endTime - startTime));
    }

    /**
     * 工作線程:子線程
     */
    static class Worker implements Runnable {
        /**
         * 開啟信号
         */
        private final CountDownLatch startSignal;
        /**
         * 結束信号
         */
        private final CountDownLatch doneSignal;
        /**
         * 任務id
         */
        private int id;

        @Override
        public void run() {
            try {
                // 為了讓所有線程同時開始任務,我們讓所有線程先阻塞在這裡(相當于一個栅欄)
                // 等到startSignal值被countDown為0時才往下繼續執行:等大家都準備好了,再打開這個門栓
                startSignal.await();
                System.out.println("子任務-" + id + ",開啟時間:" + System.currentTimeMillis());
                // sleep 5秒,模拟線程處理任務
                doWork();
            } catch (InterruptedException e) {
            }finally {
                doneSignal.countDown();
            }
        }

        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(5);
        }

        public Worker(int id, CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.id = id;
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }
    }
}
      

執行結果:

子任務栅欄已開啟...
等待子任務結束...
子任務-9,開啟時間:1620432037554
子任務-8,開啟時間:1620432037554
子任務-3,開啟時間:1620432037554
子任務-4,開啟時間:1620432037554
子任務-1,開啟時間:1620432037554
子任務-0,開啟時間:1620432037554
子任務-5,開啟時間:1620432037554
子任務-7,開啟時間:1620432037554
子任務-2,開啟時間:1620432037554
子任務-6,開啟時間:1620432037554
所有子任務已經運作結束,耗時:5002
      

上面代碼中

startSignal.await();

就相當于一個栅欄,把所有子線程都抵擋在他們的

run

方法,等待主線程執行

startSignal.countDown();

,即關閉栅欄之後,所有子線程在同時繼續執行他們自己的

run

方法,如下圖:

AQS源碼探究_07 CountDownLatch源碼分析

案例3:

/**
 * date: 2021/5/8
 *
 * @author csp
 */
public class CountDownLatchTest03 {
    public static void main(String[] args) {
        // 聲明CountDownLatch計數器,初始值為2
        CountDownLatch latch = new CountDownLatch(2);

        // 任務線程1:
        Thread t1 = new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException ignore) {
            }
            // 休息 5 秒後(模拟線程工作了 5 秒),調用 countDown()
            latch.countDown();
        }, "t1");

        // 任務線程2:
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ignore) {
            }
            // 休息 10 秒後(模拟線程工作了 10 秒),調用 countDown()
            latch.countDown();
        }, "t2");

        // 線程1、2開始執行
        t1.start();
        t2.start();

        // 任務線程3:
        Thread t3 = new Thread(() -> {
            try {
                // 阻塞,等待 state 減為 0
                latch.await();
                System.out.println("線程 t3 從 await 中傳回了");
            } catch (InterruptedException e) {
                System.out.println("線程 t3 await 被中斷");
                Thread.currentThread().interrupt();
            }
        }, "t3");

        // 任務線程4:
        Thread t4 = new Thread(() -> {
            try {
                // 阻塞,等待 state 減為 0
                latch.await();
                System.out.println("線程 t4 從 await 中傳回了");
            } catch (InterruptedException e) {
                System.out.println("線程 t4 await 被中斷");
                Thread.currentThread().interrupt();
            }
        }, "t4");

        // 線程3、4開始執行
        t3.start();
        t4.start();
    }
}
      
線程 t4 從 await 中傳回了
線程 t3 從 await 中傳回了
      

3、源碼分析

Sync内部類

  • CountDownLatch的Sync内部類繼承AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    
    // 傳入初始count次數
    Sync(int count) {
        setState(count);
    }
    
    // 擷取還剩的count次數
    int getCount() {
        return getState();
    }
    
    // 嘗試擷取共享鎖
    protected int tryAcquireShared(int acquires) {
        // 注意,這裡state等于0的時候傳回的是1,也就是說count減為0的時候擷取鎖總是成功
        // state不等于0的時候傳回的是-1,也就是count不為0的時候總是要排隊
        return (getState() == 0) ? 1 : -1;
    }
    
    // 嘗試釋放鎖:
    // 更新 AQS.state 值,每調用一次,state值減一,當state -1 正好為0時,傳回true
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            // 擷取目前state的值(AQS.state)
            int c = getState();
            // 如果state等于0了,說明已釋放鎖,無法再釋放了,這裡傳回false
            if (c == 0)
                return false;
            
            //執行到這裡,說明 state > 0
            // 如果count>0,則将count的值減1
            int nextc = c-1;
            
            // 原子更新state的值:
            // cas成功,說明目前線程執行 tryReleaseShared 方法 c-1之前,沒有其它線程 修改過 state。
            if (compareAndSetState(c, nextc))
                // 減為0的時候傳回true,這時會喚醒後面排隊的線程
                // 說明目前調用 countDown() 方法的線程就是需要觸發 喚醒操作的線程!
                return nextc == 0;
        }
    }
}
      

Sync重寫了

tryAcquireShared()

tryReleaseShared()

方法,并把

count

存到

state

變量中去。這裡要注意一下,上面兩個方法的參數并沒有被用到。

構造方法

// 構造方法需要傳入一個count,也就是初始次數。
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 初始化Sync内部類: 
    this.sync = new Sync(count);
}
      

await()方法

await()方法是等待其它線程完成的方法,它會先嘗試擷取一下共享鎖,如果失敗則進入AQS的隊列中排隊等待被喚醒。

根據上面Sync的源碼,我們知道,state不等于0的時候tryAcquireShared()傳回的是-1,也就是說count未減到0的時候,所有調用await()方法的線程都要排隊。

public void await() throws InterruptedException {
    // 調用AQS的acquireSharedInterruptibly()方法: 
    sync.acquireSharedInterruptibly(1);
}
      

AQS中的

acquireInterruptibly

方法:

// 位于AQS中:可以響應中斷擷取共享鎖的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // CASE1: Thread.interrupted()
    // 條件成立:說明目前調用await方法的線程已經是中斷狀态,直接抛出異常即可~
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // CASE2: tryAcquireShared(arg) < 0 注意:-1表示擷取到了共享鎖,1表示沒有擷取共享鎖
    // 條件成立:說明目前AQS的state是大于0的,此時将線程入隊,然後等待被喚醒
    // 條件不成立:說明AQS的state = 0,此時就不會阻塞線程:
    // 即,對應業務層面來說,執行任務的線程這時已經将latch打破了。然後其他再調用latch.await的線程,就不會在這裡阻塞了
    if (tryAcquireShared(arg) < 0)
        // 采用共享中斷模式
        doAcquireSharedInterruptibly(arg);
}

// 位于AQS中:采用共享中斷模式
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 将調用latch.await()方法的線程包裝成node加入到AQS的阻塞隊列當中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 擷取目前線程節點的前驅節點
            final Node p = node.predecessor();
            // 條件成立,說明目前線程對應的節點為head.next節點
            if (p == head) {
                // head.next節點就有權利擷取共享鎖了..
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire 會給目前線程找一個好爸爸,最終給爸爸節點設定狀态為 signal(-1),傳回true
            // parkAndCheckInterrupt 挂起目前節點對應的線程...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
      

圖解分析:

AQS源碼探究_07 CountDownLatch源碼分析

countDown()方法

countDown()方法,會釋放一個共享鎖,也就是count的次數會減1。

根據上面Sync的源碼,我們知道,tryReleaseShared()每次會把count的次數減1,當其減為0的時候傳回true,這時候才會喚醒等待的線程。

注意,doReleaseShared()是喚醒等待的線程,這個方法我們在前面的章節中分析過了。

public void countDown() {
    // 釋放共享鎖
    sync.releaseShared(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared()
public final boolean releaseShared(int arg) {
    // tryReleaseShared(arg) 嘗試釋放共享鎖,如果成功了,就喚醒排隊的線程:
    // 條件成立:說明目前調用latch.countDown() 方法線程正好是 state - 1 == 0 的這個線程,需要做觸發喚醒await狀态的線程。
    if (tryReleaseShared(arg)) {// Sync的内部成員方法
        // 喚醒等待的線程:
        // 調用countDown()方法的線程 隻有一個線程會進入到這個 if塊 裡面,去調用 doReleaseShared() 喚醒 阻塞狀态的線程的邏輯。
        doReleaseShared();
        return true;
    }
    return false;
}

/**
  * 都有哪幾種路徑會調用到doReleaseShared方法呢?
  * 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 喚醒目前阻塞隊列内的 head.next 對應的線程。
  * 2.被喚醒的線程 -> doAcquireSharedInterruptibly parkAndCheckInterrupt() 喚醒 -> setHeadAndPropagate() -> doReleaseShared()
  */
// AQS.doReleaseShared
private void doReleaseShared() {
    for (;;) {
        // 擷取目前AQS 内的 頭結點
        Node h = head;
        // 條件一:h != null 成立,說明阻塞隊列不為空..
        // 不成立:h == null 什麼時候會是這樣呢?
        // latch建立出來後,沒有任何線程調用過 await() 方法之前,有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯..

        // 條件二:h != tail 成立,說明目前阻塞隊列内,除了head節點以外  還有其他節點。
        // h == tail  -> head 和 tail 指向的是同一個node對象。 什麼時候會有這種情況呢?
        // 1. 正常喚醒情況下,依次擷取到 共享鎖,目前線程執行到這裡時 (這個線程就是 tail 節點。)
        // 2. 第一個調用await()方法的線程 與 調用countDown()且觸發喚醒阻塞節點的線程 出現并發了..
        //   因為await()線程是第一個調用 latch.await()的線程,此時隊列内什麼也沒有,它需要補充建立一個Head節點,然後再次自旋時入隊
        //   在await()線程入隊完成之前,假設目前隊列内 隻有 剛剛補充建立的空元素 head 。
        //   同期,外部有一個調用countDown()的線程,将state 值從1,修改為0了,那麼這個線程需要做 喚醒 阻塞隊列内元素的邏輯..
        //   注意:調用await()的線程 因為完全入隊完成之後,再次回到上層方法 doAcquireSharedInterruptibly 會進入到自旋中,
        //   擷取目前元素的前驅,判斷自己是head.next, 是以接下來該線程又會将自己設定為 head,然後該線程就從await()方法傳回了...
        if (h != null && h != tail) {
            // 執行到if裡面,說明目前head 一定有 後繼節點!

            int ws = h.waitStatus;
            // 目前head狀态 為 signal 說明 後繼節點并沒有被喚醒過呢...
            if (ws == Node.SIGNAL) {
                // 喚醒後繼節點前 将head節點的狀态改為 0
                // 這裡為什麼,使用CAS呢? 回頭說...
                // 當doReleaseShared方法 存在多個線程 喚醒 head.next 邏輯時,
                // CAS 可能會失敗...
                // 案例:
                // t3 線程在if(h == head) 傳回false時,t3 會繼續自旋. 參與到 喚醒下一個head.next的邏輯..
                // t3 此時執行到 CAS WaitStatus(h,Node.SIGNAL, 0) 成功.. t4 在t3修改成功之前,也進入到 if (ws == Node.SIGNAL) 裡面了
                // 但是t4 修改 CAS WaitStatus(h,Node.SIGNAL, 0) 會失敗,因為 t3 改過了...
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 喚醒後繼節點
                unparkSuccessor(h);
            }

            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }

        // 條件成立:
        // 1.說明剛剛喚醒的 後繼節點,還沒執行到 setHeadAndPropagate方法裡面的 設定目前喚醒節點為head的邏輯。
        // 這個時候,目前線程 直接跳出去...結束了..
        // 此時用不用擔心,喚醒邏輯 在這裡斷掉呢?、
        // 不需要擔心,因為被喚醒的線程 早晚會執行到doReleaseShared方法。

        // 2.h == null  latch建立出來後,沒有任何線程調用過 await() 方法之前,
        // 有線程調用latch.countDown()操作 且觸發了 喚醒阻塞節點的邏輯..
        // 3.h == tail  -> head 和 tail 指向的是同一個node對象

        // 條件不成立:
        // 被喚醒的節點 非常積極,直接将自己設定為了新的head,此時 喚醒它的節點(前驅),執行h == head 條件會不成立..
        // 此時 head節點的前驅,不會跳出 doReleaseShared 方法,會繼續喚醒 新head 節點的後繼...
        if (h == head)                   // loop if head changed
            break;
    }
}
      

CountDownLatch.countDown()

執行流程圖:

AQS源碼探究_07 CountDownLatch源碼分析

總結

CountDownLatch表示允許一個或多個線程等待其它線程的操作執行完畢後再執行後續的操作;

CountDownLatch使用AQS的共享鎖機制實作;

CountDownLatch初始化的時候需要傳入次數count;

每次調用countDown()方法count的次數減1;

每次調用await()方法的時候會嘗試擷取鎖,這裡的擷取鎖其實是檢查AQS的state變量的值是否為0;

當count的值(也就是state的值)減為0的時候會喚醒排隊着的線程(這些線程調用await()進入隊列);

文章參考:小劉講源碼、彤哥讀源碼!這裡吹一波劉哥講的源碼付費課,真的是一行一行的解析(TQL),不愧是架構師老油條!