天天看點

AQS同步元件-CountDownLatch解析和案例

CountDownLatch原理

CountDownLatch是通過一個計數器來實作的,計數器的初始化值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就相應得減1。當計數器到達0時,表示所有的線程都已完成任務,然後在閉鎖上等待的線程就可以恢複執行任務。CountDownLatch可以起到阻塞線程,并保證線程在滿足某種特定的情況後繼續執行。

AQS同步元件-CountDownLatch解析和案例

CountDownLatch 的兩種典型用法

  • 某一線程在開始運作前等待n個線程執行完畢。将 CountDownLatch 的計數器初始化為n :new CountDownLatch(n),每當一個任務線程執行完畢,就将計數器減1 countdownlatch.countDown(),當計數器的值變為0時,在CountDownLatch上 await()的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個元件加載完畢,之後再繼續執行。即可以通過初始化,定義線程個數。
  • 實作多個線程開始執行任務的最大并行性。注意是并行性,不是并發,強調的是多個線程在某一時刻同時開始執行。類似于賽跑,将多個線程放到起點,等待發令槍響,然後同時開跑。做法是初始化一個共享的 CountDownLatch 對象,将其計數器初始化為 1 :new CountDownLatch(1),多個線程在開始執行任務前首先 coundownlatch.await(),當主線程調用 countDown() 時,計數器變為0,多個線程同時被喚醒。

源碼分析

/**
     * 構造器中的計數值(count)實際上就是閉鎖需要等待的線程數量。這個值隻能被設定一次,
     *而且CountDownLatch沒有提供任何機制去重新設定這個計數值。
     *
     * @param count 線上程可以通過 {@link #await} 之前必須調用 {@link #countDown} 的次數
     * @throws IllegalArgumentException 如果給定參數小于0則抛出異常
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
           
/**
     * 使目前線程等待,直到計數器為零,除非目前線程被中斷
     * 1.目前計數為零,則此方法立即傳回
     * 2.如果目前計數大于零,則目前線程将因線程排程目的而被禁用并處于休眠狀态,
     * 直到發生如下可能:
     *	 由于調用了 {@link #countDown} 方法,計數達到零
     *   線程等待被中斷。
     * @throws 如果目前線程在等待時被中斷
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
           
/**    
	 * 可以設定等待時間,超過這個時間就會執行,不會等到計數器變為0,但是
     * 之前給定的線程還是會執行完
     *
     * @param timeout 等待時間長度
     * @param unit 等待時間機關
     * @return 
     * @throws 如果目前線程在等待時被中斷
     */
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
           

與CountDownLatch的第一次互動是主線程等待其他線程。主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。

其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。是以當N個線程都調用了這個方法,count的值等于0,然後主線程就能通過await()方法,恢複執行自己的任務。

注意:

  • CountDownLatch的構造函數 CountDownLatch countDownLatch = new CountDownLatch(7); //7表示需要等待執行完畢的線程數量。
  • 在每一個線程執行完畢之後,都需要執行 countDownLatch.countDown() 方法,不然計數器就不會準确;
  • 隻有所有的線程執行完畢之後,才會執行 countDownLatch.await() 之後的 代碼;
  • CountDownLatch 阻塞的是主線程;

CountDownLatch 的使用示例

@Slf4j
public class CountDownLatchExample1 {
    /**
     * 線程數量
     */
    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    // 表示一個請求已經完成
                    countDownLatch.countDown();
                }
            });
        }
        //使目前線程等待,直到計數器為零,除非目前線程被中斷
        countDownLatch.await();
        //當這200個請求被處理完成之後,才會執行
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        // 模拟請求的耗時操作
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
           

上面的代碼中,我們定義了請求的數量為200,當這200個請求被處理完成之後,才會執行System.out.println("finish");。

//可以設定等待時間,超過這個時間就會執行,不會等到計數器變為0,但是之前給定的線程還是會執行完
countDownLatch.await(20, TimeUnit.MILLISECONDS);
           

上面代碼中其他跟第一個代碼一緻,使用了wait設定等待一定時間後繼續執行方法。

/**
 * CountDownLatch 模拟并發調用多個任務
 *
 * @author zjq
 */
@Slf4j
public class CountDownLatchExample3 {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(2) {
            @Override
            public void await() throws InterruptedException {
                super.await();
                log.info("其他線程執行完畢後主線程執行的内容");
                log.info("threadName:{},", Thread.currentThread().getName() + " count down is ok");
            }
        };

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    log.info(Thread.currentThread().getName() + "任務已完成");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //計數器減1
                    countDownLatch.countDown();
                }
            }
        }, "thread111");

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    log.info(Thread.currentThread().getName() + "任務已完成");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //計數器減1
                    countDownLatch.countDown();
                }
            }
        }, "thread222");


        thread1.start();
        thread2.start();

        countDownLatch.await();
        log.info("====everything is end====");
    }

}
           

上述代碼執行輸出結果:

[thread111] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - thread111任務已完成
[thread222] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - thread222任務已完成
[main] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - 其他線程執行完畢後主線程執行的内容
[main] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - threadName:main count down is ok,
[main] INFO com.zjq.concurrency.aqs.CountDownLatchExample3 - ====everything is end====
           
/**
 * CountDownLatch 模拟多個任務并發執行完畢後等待主線程發令同時執行後續操作
 *
 * @author zjq
 */
@Slf4j
public class CountDownLatchExample4 {

    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    //所有線程阻塞在這,等待主線程号令
                    log.info(Thread.currentThread().getName() + "已準備完畢!!");
                    countDownLatch.await();
                    log.info("【" + Thread.currentThread().getName() + "】" + "開始執行……");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        // 主線程準備發令
        Thread.sleep(2000);
        log.info(Thread.currentThread().getName() + "發号施令,給我沖!!");
        // 主線程:執行發令
        countDownLatch.countDown();
    }

}
           

實作最大并行性輸出結果:

[Thread-3] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-3已準備完畢!!
[Thread-0] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-0已準備完畢!!
[Thread-2] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-2已準備完畢!!
[Thread-1] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-1已準備完畢!!
[Thread-4] INFO com.zjq.aqs.CountDownLatchExample4 - Thread-4已準備完畢!!
[main] INFO com.zjq.aqs.CountDownLatchExample4 - main發号施令,給我沖!!
[Thread-3] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-3】開始執行……
[Thread-1] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-1】開始執行……
[Thread-0] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-0】開始執行……
[Thread-2] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-2】開始執行……
[Thread-4] INFO com.zjq.aqs.CountDownLatchExample4 - 【Thread-4】開始執行……
           

CountDownLatch 的不足

CountDownLatch是一次性的,計數器的值隻能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當CountDownLatch使用完畢後,它不能再次被使用。

CountDownLatch應用場景

(1)實作最大的并行性:有時我們想同時啟動多個線程,實作最大程度的并行性。例 如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,并 讓所有線程都在這個鎖上等待,那麼我們可以很輕松地完成測試。我們隻需調用 一次 countDown()方法就可以讓所有的等待線程同時恢複執行。

(2)開始執行前等待n個線程完成各自任務:例如應用程式啟動類要確定在處理使用者 請求前,所有N個外部系統已經啟動和運作了。

(3)死鎖檢測:一個非常友善的使用場景是,你可以使用n個線程通路共享資源,在每次測試階段的線程數目是不同的,并嘗試産生死鎖。