天天看點

一文讓你深入了解并發工具類CountDownLatch

作者:二進制狂人kxyL

CountDownLatch 概述及使用方式

本篇文章想要講解 JUC 工具類 CountDownLatch,因為 CountDownLatch 提供了簡單有效的線程協調和控制機制,是以實際開發中是比較常用的,是以有必要了解一下 CountDownLatch。

初識 CountDownLatch

CountDownLatch 作為 Java 中的一個同步工具類,用于在多線程間實作協調和控制,允許一個或多個線程等待其他線程完成操作後再繼續執行。

CountDownLatch 内部維護了一個計數器,可以通過構造函數指定初始計數值。當一個線程完成了自己的任務後,可以調用 countDown() 方法将計數值減一。而其他線程可以通過調用 await() 方法等待計數值減為零,然後再繼續執行。

一般情況下,主線程會建立 CountDownLatch 對象,然後傳遞給其他線程。其他線程執行完自己的任務後,調用 countDown() 方法進行計數,主線程調用 await() 方法等待計數值為零。

CountDownLatch 的核心方法

CountDownLatch 提供了四個核心方法來實作線程的協調和控制,核心方法如下:

  1. public CountDownLatch(int count) CountDownLatch 的構造方法,用于建立一個 CountDownLatch 對象,并指定初始計數值(計數值表示需要等待的線程數量)。
  2. public void countDown() 當一個線程完成任務後,可以調用該方法将計數器的值減一(如果計數器的值已經為零,那麼調用該方法沒有任何影響,即計數器的值不會再減,而是一直為零)。
  3. public void await() 當一個線程需要等待其他線程完成任務後再繼續執行時,可以調用該方法進行等待(如果計數器的值已經為零,那麼調用該方法會立即傳回)。 如果在等待過程中,目前線程被中斷,則會抛出 InterruptedException 異常。 需要注意的是調用該方法時,計數器的值應當在所有線程都能夠完成任務後變為零,否則可能導緻線程一直等待或提前繼續執行的問題。
  4. public boolean await(long timeout, TimeUnit unit) 與 await() 方法作用一樣都能使目前線程等待,不同點在于允許設定逾時時間(即如果計數器的值在逾時時間内變為零,那麼方法會傳回 true,否則傳回 false)。 參數中的 timeout 表示逾時時間的數值,unit 表示逾時時間的機關。 如果在等待過程中,目前線程被中斷,則會抛出 InterruptedException 異常。

CountDownLatch 的應用場景

通過上面的介紹,應該能了解到 CountDownLatch 是什麼以及如何使用,接下來通過具體的應用場景來看看 CountDownLatch 都可以在實際開發中起到怎樣的作用。

應用場景一:等待多個線程任務執行完成

場景:如果需要等待多個線程執行完成後,才能進行下一步操作,就可以使用 CountDownLatch 來實作。通過建立一個 CountDownLatch 對象,并将計數器的值初始化為線程數(任務數),每個線程執行完成後,調用 countDown() 方法将計數器減一,主線程通過調用 await() 方法等待所有線程執行完成後執行下一步操作。

示例:有一個主線程需要等待五個子任務(線程)都完成後再進行後續操作(彙總子任務的結果)。

示例代碼:

java複制代碼/**
 * CountDownLatch 示例
 * @author 單程車票
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        // 任務數為5
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            int task = i;
            // 建立線程
            new Thread(() -> {
                try {
                    System.out.println("執行任務" + task + "業務");
                    try { TimeUnit.SECONDS.sleep(1);  } catch (InterruptedException e) {e.printStackTrace();}
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        // 阻塞直到所有任務執行完成或超出逾時時間(30min)
        try {
            countDownLatch.await(30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("子線程任務完成,主線程合并子線程結果");
    }
}
           

示例結果:

一文讓你深入了解并發工具類CountDownLatch
應用場景二:等待外部資源初始化

場景:當多個線程在執行前需要初始化某個系統元件或外部資源(如資料庫連接配接池)時,可以使用 CountDownLatch 實作。通過主線程建立 CountDownLatch 對象,設定計數值為 1。初始化線程在完成資源初始化後調用 countDown() 方法,然後其他線程通過 await() 方法等待初始化完成後再開始使用資源。

示例:有三個線程等待外部資源初始化線程執行完成後再執行各自線程的業務。

示例代碼:

java複制代碼/**
 * CountDownLatch 示例
 * @author 單程車票
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        // 初始計數值為1
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 三個線程等待外部資源線程初始化後在執行
        for (int i = 0; i < 3; i++) {
            int task = i;
            // 建立線程
            new Thread(() -> {
                // 阻塞直到外部資源初始化完成
                try {
                    countDownLatch.await(30, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("外部資源初始化完成,執行任務" + task + "業務");
            }).start();
        }
        // 建立線程進行外部資源初始化
        new Thread(() -> {
            try {
                System.out.println("初始化外部資源");
                try { TimeUnit.SECONDS.sleep(1);  } catch (InterruptedException e) {e.printStackTrace();}
            } finally {
                countDownLatch.countDown();
            }
        }).start();
    }
}
           

示例結果:

一文讓你深入了解并發工具類CountDownLatch
應用場景三:控制線程執行順序

場景:當需要保證多個線程按照特定的順序執行時,可以通過 CountDownLatch 實作。主線程可以根據特定執行順序建立多個 CountDownLatch 對象對應多個線程,每個 CountDownLatch 對象的初始計數值都為 1,保證某一時刻隻有指定順序的線程執行,執行完成後,調用下一個 CountDownLatch 對象的 countDown() 方法喚醒下一個指定順序線程執行。

示例:有三個線程,需要按照 3 1 2 的順序依次執行各自線程的業務。

示例代碼:

java複制代碼/**
 * CountDownLatch 示例
 * @author 單程車票
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        // 初始計數值為1
        CountDownLatch order1 = new CountDownLatch(1);
        CountDownLatch order2 = new CountDownLatch(1);
        CountDownLatch order3 = new CountDownLatch(1);
        // 三個線程按照 3 1 2 的順序執行
        order3.countDown();  // 開啟多個線程順序執行
        // 建立線程1
        new Thread(() -> {
            // 阻塞直到線程3完成
            try {
                order1.await(30, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("執行任務 1 的業務");
            } finally {
                order2.countDown();
            }
        }).start();
        // 建立線程2
        new Thread(() -> {
            // 阻塞直到線程1完成
            try {
                order2.await(30, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("執行任務 2 的業務");
        }).start();
        // 建立線程3
        new Thread(() -> {
            // 阻塞直到主線程開啟順序執行
            try {
                order3.await(30, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("執行任務 3 的業務");
            } finally {
                order1.countDown();
            }
        }).start();
    }
}
           

示例結果:

一文讓你深入了解并發工具類CountDownLatch

CountDownLatch 的源碼分析

通過前兩部分的内容可以了解到 CountDownLatch 的使用方式和應用場景了,可以看到 CountDownLatch 最為核心的兩個方法是 countDown() 和 await()。接下來通過源碼分析來看看這兩個方法是如何實作的。

通過源碼可以看到 CountDownLatch 其實是基于 AQS 實作的(想進一步了解 AQS 的,可以檢視深入了解AbstractQueuedSynchronizer - 掘金 (juejin.cn)), CountDownLatch 内部通過一個靜态内部類 Sync 繼承 AQS 來實作建構同步鎖。下面從 countDown() 和 await() 這兩個方法開始進行源碼分析。

核心方法一:await()

await() 源碼:

一文讓你深入了解并發工具類CountDownLatch

可以看到 await() 方法中調用了 Sync 的 acquireSharedInterruptibly() 方法,但是 Sync 中并沒有實作該方法,是以實際上調用的是 AQS 中的 acquireSharedInterruptibly() 方法,進入方法:

一文讓你深入了解并發工具類CountDownLatch

方法中先判斷線程是否被中斷,如果被中斷則抛出 InterruptedException 異常,通過調用 tryAcquireShared() 方法嘗試搶占共享鎖,這個方法是 AQS 的抽象方法由子類實作,這裡實際上調用的就是 Sync 的 tryAcquireShared() 方法,進入方法:

一文讓你深入了解并發工具類CountDownLatch

該方法調用 getState() 方法擷取目前計數器的值,并判斷是否為 0,若為 0 則傳回 1,不為 0 則傳回 -1。回到上面的 tryAcquireShared() 中可以看到當計數器的值為 0 時則不需要進入等待隊列,當計數器的值不為 0 時,則調用 doAcquireSharedInterruptibly())。進入方法:

一文讓你深入了解并發工具類CountDownLatch

深入方法代碼可以分為以下幾步:

  • 首先通過 addWaiter() 建構一個共享模式的 Node 并加入等待隊列。
  • 然後通過無限循環,判斷目前節點的前驅節點是否是頭節點(前驅節點為頭節點表示意味着具有嘗試資源擷取的機會) 前驅節點是頭節點,則不斷地嘗試擷取資源(即調用 tryAcquireShared() 這個方法前面有提到,用于判斷計數器的值是否為 0),計數值為 0,則表示擷取資源成功,即線程可以運作,是以執行 setHeadAndPropagate() 将目前節點設定為新的頭結點,并設定 p.next=null 等待 GC 回收。 前驅結點不是頭節點,則執行 shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 根據一定條件判斷線程是否應該被阻塞并檢查是否發生中斷,等待後續喚醒。
  • 最後的 finally 通過标志 failed (表示是否擷取資源失敗),如果為 true,則執行 cancelAcquire() 方法取消對資源的擷取,并移出等待隊列。

是以這個方法核心為通過無限循環不斷地嘗試擷取共享資源,擷取成功則将目前節點設定為頭結點,擷取失敗則判斷是否需要阻塞并檢查是否被中斷,如果最後擷取失敗,則放棄擷取資源并移出等待隊列。

到這裡就是 await() 方法的整個實作流程了,底層通過調用 AQS 的 doAcquireSharedInterruptibly() 方法以及 CountDownLatch 實作 AQS 的抽象方法 tryAcquireShared() 實作線程阻塞和喚醒。

核心方法二:countDown()

countDown() 源碼:

一文讓你深入了解并發工具類CountDownLatch

可以看到 countDown() 方法中調用了 Sync 的 releaseShared() 方法,但是 Sync 中并沒有實作該方法,是以實際上調用的是 AQS 中的 releaseShared() 方法,進入方法:

一文讓你深入了解并發工具類CountDownLatch

方法中調用 Sync 實作 AQS 的抽象方法 tryReleaseShared() 來進行判斷,進入方法:

一文讓你深入了解并發工具類CountDownLatch

方法中判斷目前計數器值是否為 0,是則傳回 false 不做任何操作,也就是當計數器值為 0 時調用 CountDownLatch() 方法不會做任何操作。不是 0 則進行計數器值減一,并通過 CAS 操作更新計數器值,如果更新後的值為 0,則調用 AQS 内部的 doReleaseShared() 方法釋放共享資源,否則除了更新計數器值之外不做任何操作。進入 doReleaseShared() 方法:

一文讓你深入了解并發工具類CountDownLatch

doReleaseShared() 方法的目的是在釋放共享資源時,確定喚醒等待的線程,并通過循環和 CAS 操作來處理并發情況和頭節點的變化。

到這裡就是 countDown() 方法的整個實作過程了,底層通過 CountDownLatch 實作 AQS 的抽象方法 tryReleaseShared() 采用 CAS 來完成計數器減一,并通過 AQS 的内部方法 doReleaseShared() 實作釋放資源。

來源:https://juejin.cn/post/7256307512243585080

繼續閱讀