天天看點

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

文章目錄

  • 腦圖
  • CountDownLatch閉鎖
  • 示例
  • Thread#join()
  • CountDownLatch
  • CountDownLatch示例二 await一直等待其他線程執行完
  • CountDownLatch示例三 await指定等待時間
  • 小結
  • 代碼

腦圖

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖
并發程式設計-16AQS同步元件之CountDownLatch 閉鎖
并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

CountDownLatch閉鎖

  • Java 5.0 在 java.util.concurrent 包中提供了多種并發容器類來改進同步容器的性能。
  • CountDownLatch 一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待 ,即CountDownLatch允許一個或多個線程等待其他線程完成操作。
  • 閉鎖可以延遲線程的進度直到其到達終止狀态,閉鎖可以用來確定某些活動直到其他活動都完成才繼續執行:
  • 確定某個計算在其需要的所有資源都被初始化之後才繼續執行;
  • 確定某個服務在其依賴的所有其他服務都已經啟動之後才啟動;
  • 等待直到某個操作所有參與者都準備就緒再繼續執行。

示意圖

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

示例

假設有個需求: 解析一個Excel裡多個sheet的資料,此時可以考慮使用多線程,每個線程解析一個sheet裡的資料,等到所有的sheet都解析完之後,程式需要提示解析完成。在這個需求中,要實作主線程等待所有線程完成sheet的解析操作。

Thread#join()

package com.artisan.example.aqs;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JoinTest {

  public static void main(String[] args) throws InterruptedException {
    
    // 線程1 處理sheet A

    Thread t1 = new Thread(() -> {
      try {
        log.info(Thread.currentThread().getName() + " parse sheetA start");
        // 休眠1S,模拟業務耗時
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      log.info(Thread.currentThread().getName() + " parse sheetA finish");
    }, "t1");

    
    // 線程2 處理sheet B
    Thread t2 = new Thread(() -> {
      try {
        log.info(Thread.currentThread().getName() + " parse sheetB start");
        // 休眠1S,模拟業務耗時
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      log.info(Thread.currentThread().getName() + " parse sheetB finish");
    }, "t2");
    
    
    
    
    // 開啟線程
    t1.start();
    t2.start();
    
    // join用于讓目前執行線程等待join線程執行結束。其實作原理是不停檢查join線程是否存
    // 活,如果join線程存活則讓目前線程永遠等待。
    t1.join();
    t2.join();
    
    // 直到join線程中止後,線程的this.notifyAll()方法會被調用.
    // 調用notifyAll()方法是在JVM裡實作的,JDK裡看不到

    log.info("t1 t2 完成,繼續其他操作");

  }

}      

輸出:

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

CountDownLatch

package com.artisan.example.aqs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CountDownLatchDemo {

  public static void main(String[] args) throws InterruptedException {

//    ExecutorService executorService = Executors.newCachedThreadPool();
    CountDownLatch countDownLatch = new CountDownLatch(2);
//    線程池的寫法 (推薦)
//    executorService.execute(() -> {
//      try {
//        // 休眠1S,模拟業務耗時
//        Thread.sleep(1000);
//      } catch (InterruptedException e) {
//        e.printStackTrace();
//      }
//      countDownLatch.countDown();
//      log.info("parse sheetA");
//    });
//
//    executorService.execute(() -> {
//      try {
//        // 休眠1S,模拟業務耗時
//        Thread.sleep(1000);
//      } catch (InterruptedException e) {
//        e.printStackTrace();
//      }
//      log.info("parse sheetB");
//      countDownLatch.countDown();
//
//    });

    
    Thread t1 = new Thread(() -> {
      try {
        log.info(Thread.currentThread().getName() + " parse sheetA start");
        // 休眠1S,模拟業務耗時
        Thread.sleep(1000);
        // 調用CountDownLatch的countDown方法時,計數器N就會減1,CountDownLatch的await方法
        // 會阻塞目前線程,直到計數器N變成零
        countDownLatch.countDown();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      log.info(Thread.currentThread().getName() + " parse sheetA finish");
    }, "t1");

    Thread t2 = new Thread(() -> {
      try {
        log.info(Thread.currentThread().getName() + " parse sheetB start");
        // 休眠1S,模拟業務耗時
        Thread.sleep(1000);
        // 調用CountDownLatch的countDown方法時,計數器N就會減1,CountDownLatch的await方法
        // 會阻塞目前線程,直到計數器N變成零
        countDownLatch.countDown();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      log.info(Thread.currentThread().getName() + " parse sheetB finish");
    }, "t2");
    
    
    // 開啟線程
    t1.start();
    t2.start();
    
    // 主線程調用await方法 ,等待t1 t2 完成後繼續操作。 即計數器N變為0。 等于0時候,調用await方法時不會
    // 阻塞目前線程
    countDownLatch.await();
    
    log.info("t1 t2 完成,繼續其他操作");

  }

}      

輸出:

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完 成,這裡就傳入N。

當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法 會阻塞目前線程,直到N變成零。

由于countDown方法可以用在任何地方,是以這裡說的N個點,可以是N個線程,也可以是1個線程裡的N個執行步驟。

用在多個線程時,隻需要把這個CountDownLatch的引用傳遞到線程裡即可。

CountDownLatch示例二 await一直等待其他線程執行完

再來看個我們之前一直使用的例子

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

輸出:

14:41:27.453 [pool-1-thread-22] INFO com.artisan.example.aqs.CountDownLatchExample1 - 21
14:41:27.453 [pool-1-thread-5] INFO 
......
......
......
14:41:27.510 [pool-1-thread-177] INFO com.artisan.example.aqs.CountDownLatchExample1 - 176
14:41:27.610 [main] INFO com.artisan.example.aqs.CountDownLatchExample1 - finish      

可以看到finish最後輸出,即等待其他線程全部處理完成後,目前主線程才繼續執行。

CountDownLatch示例三 await指定等待時間

如果有某個線程處理得比較慢,我們不可能讓主線程一直等待,是以可以使用另外一個帶指定時間的await方法——​

​await(long time,TimeUnit unit)​

​,這個方法等待特定時間後,就會不再阻塞目前線程。join也有類似的方法

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

日志輸出:

并發程式設計-16AQS同步元件之CountDownLatch 閉鎖

小結

  • 計數器必須大于等于0,隻是等于0時候,計數器就是零,調用await方法時不會阻塞目前線程。
  • CountDownLatch不可能重新初始化或者修改CountDownLatch對象的内部計數器的值。如果需要重置,可以使用CyclicBarrier 同步屏障
  • 一個線程調用countDown方法happen-before,另外一個線程調用await

代碼