天天看點

常用線程同步類CountDownLatch、CyclicBarrier用法引子CountDownLatchCyclicBarrier

引子

随着時代的發展,Object當初的抽象模型部分不适用當下的技術潮流,比如finalize()

方法在JDK9 之後直接被标記為過時方法。而wait()和notify() 同步方式事實上已經

被同步信号、鎖、阻塞集合等取代。

                                                                                                           —— 《碼出高效》

那麼,如何用好線程同步類将是關鍵,以下将詳細給大家介紹幾個線程同步類的詳細使用場景,友善大家在工作中使用。

CountDownLatch

功能講解

常用于監聽某些初始化操作,等待其他子線程初始化執行完畢,主線程再繼續執行。

大體步驟為:

1、向CountDownLatch對象設定一個初始計數值

2、主線程通過CountDownLatch#await()方法進行阻塞

3、其他子線程初始化執行完畢後,調用CountDownLatch#countDown()對計數值減1,直到計數值降為0,此時表示所有資源初始化完畢。

4、主線程的CountDownLatch#await()方法不再阻塞,主線程繼續往下走。

栗子1

這裡通過Future#get()進行子線程是否異常的判斷,如果有子線程異常,則直接往上抛出異常。Future#get()觸發異常可參考blog:https://blog.csdn.net/y124675160/article/details/104399114 中關于CompleteService源碼解析的部分。

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @author tingyu
 * @date 2020/12/12
 */

@Slf4j
public class CountDownLatchDemo {

    private static ThreadFactory threadFactory =
            new ThreadFactoryBuilder().setNameFormat("sourceInitExecutor").build();
    // 根據實際場景進行線程池大小等配置
    // 如果任務多數為排隊執行,對隊列要求高的,建議使用MQ的隊列,将排隊的壓力交給雲端,線程池隻做執行使用
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory);

    public static void main(String[] args) throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(3);
        List<Future<Integer>> sourceFutureList = new ArrayList<>(10);
        sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceA")));
        sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceB")));
        sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceC")));
        // 主線程在此處阻塞,直到所有子線程資源初始化完畢!
        countDownLatch.await(10, TimeUnit.SECONDS);
        sourceFutureList.forEach(future -> {
            try {
                future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        log.info("資源準備完畢,主線程繼續向下執行!");
    }

    static class SourceCallable implements Callable<Integer> {

        static Random random = new Random();

        private CountDownLatch countDownLatch;
        private String name;

        SourceCallable(CountDownLatch countDownLatch, String name) {
            this.countDownLatch = countDownLatch;
            this.name = name;
        }

        @Override
        public Integer call() {
            try {
                // 開始資源初始化
                /********** 模拟資源初始化 ***************/
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                log.info("【{}】資源初始化完畢!", name);
            } catch (Exception e) {
                // 執行一些需要清理的工作
                /********** 模拟資源清理 ***************/
                log.error("【{}】資源初始化異常!", name, e);
                throw new RuntimeException(e);
            }finally {
                countDownLatch.countDown();
            }
            return 1;
        }
    }
}
           

Copy

栗子2

上家公司的時候用CountDownLatch實作過一個功能,詳情可參考:https://www.atatech.org/articles/162136

注意事項

CountDownLatch被設計為隻觸發一次,計數器不能被重置。如果需要能被重置計數器的版本,則可以使用CyclicBarrier。

countDown調用時要預防前面的代碼抛異常,導緻countDown未能執行,進而導緻await一直阻塞。是以可以在異常捕獲内部再調用一次,或者統一在異常捕獲之後進行調用,或者在finally裡面執行。

CyclicBarrier

功能講解

适用于你希望建立一組任務,它們并行地執行工作,然後在進行下一個步驟之前等待,直到所有任務都完成。它使得所有的并行任務都将在栅欄處列隊,是以可以一緻地向前移動。這非常像CountDownLatch,隻是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。

栗子1

在上家公司時,由于機構 DIY 課程定制需要從固定課程複制,而複制需要調用三個小組的微服務,三個微服務分别為 課程建立、講次建立、卷子建立,講次挂在課程上,卷子挂在講次上,關系如下:

常用線程同步類CountDownLatch、CyclicBarrier用法引子CountDownLatchCyclicBarrier

但是後面發現這樣耗時太長,是以改為使用id生成器提前生成好班型id,再通過傳入班型id,同時進行班型、講次、卷子的複制,這時候需要保證3個服務中任何一個出現異常的時候,所有已經建立完成的任務資料需要復原(此處的復原為調用對應微服務提供的資料清除接口進行)。

一開始使用CompleteService + CountDownLatch + 線程池實作,但後面發現實作複雜,并且線程池隻做通信,并沒有發揮真正的作用,是以之後使用CyclicBarrier實作了一個優化版本,以下為優化版本的示例。 (CountDownLatch版可參考blog:https://blog.csdn.net/y124675160/article/details/104399114 其中還有關于ExecutorCompletionService的源碼講解)

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.*;

/**
 * @author tingyu
 * @date 2020/12/12
 */

@Slf4j
public class ClassTypeCreateDemo {

    private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("ClassTypeCreateExecutor").build();
    // 初始化線程池,這裡的線程數等配置根據實際場景進行配置,由于此處多數為調用遠端微服務作業,為IO密集型,可以設定大一點。
    // 如果任務多數為排隊執行,對隊列要求高的,建議使用MQ的隊列,将排隊的壓力交給雲端,線程池隻做執行使用
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 20,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory);

    public static void main(String[] args) throws InterruptedException {

        // 設定任務執行結果的初始值
        Map<String, Boolean> execResultMap = new HashMap<>(16);
        execResultMap.put("ClassType", true);
        execResultMap.put("Lesson", true);
        execResultMap.put("LessonPaper", true);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        // 講次建立
        threadPoolExecutor.submit(new LessonRunnable(cyclicBarrier, "Lesson", execResultMap));
        // 卷子建立
        threadPoolExecutor.submit(new LessonPaperRunnable(cyclicBarrier, "LessonPaper", execResultMap));
        // 課程建立
        Future<Integer> claaTypeFuture =
                threadPoolExecutor.submit(new ClassTypeCallable(cyclicBarrier, "ClassType", execResultMap));

        try {
            Integer classTypeId = claaTypeFuture.get();
            log.info("課程建立成功,課程ID為:{}", classTypeId);
        } catch (ExecutionException e) {
            // 課程建立失敗
            log.error("課程建立失敗!原因為:{}", e.getMessage(), e);
            throw new RuntimeException("課程建立失敗!", e);
        }
    }

    /**
     * 課程建立任務
     */
    static class ClassTypeCallable implements Callable<Integer> {

        static Random random = new Random();

        private CyclicBarrier cyclicBarrier;
        private String name;
        private Map<String, Boolean> execResultMap;

        ClassTypeCallable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.execResultMap = execResultMap;
        }

        @Override
        public Integer call() {

            Integer classTypeId = null;
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                // 模拟課程建立成功後的id指派
                classTypeId = random.nextInt(10000);
                log.info("課程建立成功!");
            } catch (Exception e) {
                log.error("課程建立異常!", e);
                // 将ClassType的處理狀态設定為false
                execResultMap.put(name, false);
            }

            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                log.warn("【{}】調用await異常!", name, e);
            }

            Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
            entries.forEach(entry -> {
                if(!entry.getValue()) {
                    log.warn("【{}】任務建立異常,【{}】進行復原!", entry.getKey(), name);
                    /********** 資料清理的操作模拟 ****************/
                    throw new RuntimeException("【" + entry.getKey() + "】任務建立異常!");
                }
            });
            return classTypeId;
        }
    }

    /**
     * 講次建立任務
     */
    static class LessonRunnable implements Runnable {

        static Random random = new Random();

        private CyclicBarrier cyclicBarrier;
        private String name;
        private Map<String, Boolean> execResultMap;

        LessonRunnable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.execResultMap = execResultMap;
        }

        @Override
        public void run() {
            try {
                // 開始資源初始化
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                log.info("講次建立成功!");
            } catch (Exception e) {
                log.error("講次建立異常!", e);
                // 将Lesson的處理狀态設定為false
                execResultMap.put(name, false);
            }

            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                log.warn("【{}】調用await異常!", name, e);
            }

            Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
            entries.forEach(entry -> {
                if(!entry.getValue()) {
                    log.warn("【{}】任務建立異常,【{}】進行復原!", entry.getKey(), name);
                    /********** 資料清理的操作模拟 ****************/
                }
            });
        }
    }

    /**
     * 卷子建立任務
     */
    static class LessonPaperRunnable implements Runnable {

        static Random random = new Random();

        private CyclicBarrier cyclicBarrier;
        private String name;
        private Map<String, Boolean> execResultMap;

        LessonPaperRunnable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.execResultMap = execResultMap;
        }

        @Override
        public void run() {
            try {
                // 開始資源初始化
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                log.info("卷子建立成功!");
            } catch (Exception e) {
                log.error("卷子建立異常!", e);
                // 将LessonPaper的處理狀态設定為false
                execResultMap.put(name, false);
            }

            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                log.warn("【{}】調用await異常!", name, e);
            }

            Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
            entries.forEach(entry -> {
                if(!entry.getValue()) {
                    log.warn("【{}】任務建立異常,【{}】進行復原!", entry.getKey(), name);
                    /********** 資料清理的操作模拟 ****************/
                }
            });
        }
    }
}
           

Copy

栗子2

以下栗子摘自《Thinking in Java》,作為經典的入門書籍,第一次認識到CyclicBarrier也是通過這本書,當時就覺得這個栗子很有意思。

下面的代碼主要是模拟了賽馬比賽,使用"==="表示栅欄,"***"表示目前馬兒跑過的距離,*最右邊是每批馬兒的編号,可以拷貝代碼導自己的工程後執行main方法看下運作效果。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 這裡模拟了賽馬,使用"==="表示栅欄,"***"表示目前馬兒跑過的距離,*最右邊是每批馬兒的編号
 * 如以下例子,則表示6号馬兒跑在最前面
 * 可以執行main方法看下運作效果
 * ==================================================
 * **********************************************1
 * *****************************2
 * *********************************************3
 * ****************************************4
 * *********************************************5
 * ************************************************6
 * *****************************************7
 * ==================================================
 **/

class Horse implements Runnable{
    private static int counter = 1;
    // 馬的編号
    private final int id = counter++;
    // 目前馬所走的步數
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;

    public Horse(CyclicBarrier b){
        barrier = b;
    }
    // 傳回目前馬所跑的步數
    public synchronized int getStrides(){
        return strides;
    }

    public void run(){
        try
        {
            while(!Thread.interrupted()){
                synchronized(this){
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        }catch(InterruptedException ex){
            System.out.println(this+ " 通過中斷異常退出");
        }catch(BrokenBarrierException e){
            throw new RuntimeException();
        }
    }

    public String toString(){
        return "Horse " + id + " ";
    }

    // 使用"*"表示目前馬的軌迹
    public String tracks(){
        StringBuilder s = new StringBuilder();
        for(int i = 0 ; i < getStrides(); i++){
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}

public class HorseRace {
      static final int FINISH_LINE = 50;
      private List<Horse> horses = new ArrayList<>();
      private ExecutorService exec = Executors.newCachedThreadPool();
      private CyclicBarrier barrier;

      public HorseRace(int nHorse,final int pause){

          // 初始化栅欄, nHorse即為模拟的馬兒數量,代表nHorse隻馬兒都到達await位置後,執行Runnable線程的方法
          barrier = new CyclicBarrier(nHorse,new Runnable(){
              public void run(){

                  StringBuilder s = new StringBuilder();
                  for(int i = 0; i < FINISH_LINE; i++){
                       s.append("=");
                  }
                  System.out.println(s);
                  for(Horse horse : horses){
                      System.out.println(horse.tracks());
                  }

                  for(Horse horse : horses){
                      if(horse.getStrides() >= FINISH_LINE){
                          System.out.println(horse + " won");
                          exec.shutdownNow();
                          return;
                      }
                  }

                  try{
                      // 睡眠一段時間,模拟馬奔跑的時間
                      TimeUnit.MILLISECONDS.sleep(pause);
                  }catch(InterruptedException ex){
                      ex.printStackTrace();
                  }
              }
          });

          for(int i=0; i < nHorse;i++){
              Horse horse = new Horse(barrier);
              horses.add(horse);
              exec.execute(horse);
          }
      }

      public static void main(String[] args){
          // 配置初始7匹馬進行競賽
          int nHorses = 7;
          // 每200毫秒更新一次馬兒的移動位置
          int pause = 200;
          new HorseRace(nHorses,pause);
      }
}
           

注意事項

從以上馬兒的例子可以看出,CyclicBarrier是可以重複使用的。