引子
随着時代的發展,Object當初的抽象模型部分不适用當下的技術潮流,比如finalize()
方法在JDK9 之後直接被标記為過時方法。而wait()和notify() 同步方式事實上已經
被同步信号、鎖、阻塞集合等取代。
—— 《碼出高效》
那麼,如何用好線程同步類将是關鍵,以下将詳細給大家介紹幾個線程同步類的詳細使用場景,友善大家在工作中使用。
CountDownLatch
功能講解
常用于監聽某些初始化操作,等待其他子線程初始化執行完畢,主線程再繼續執行。
大體步驟為:
1、向CountDownLatch對象設定一個初始計數值
2、主線程通過CountDownLatch#await()方法進行阻塞
3、其他子線程初始化執行完畢後,調用CountDownLatch#countDown()對計數值減1,直到計數值降為0,此時表示所有資源初始化完畢。
4、主線程的CountDownLatch#await()方法不再阻塞,主線程繼續往下走。
栗子1
這裡通過Future#get()進行子線程是否異常的判斷,如果有子線程異常,則直接往上抛出異常。Future#get()觸發異常可參考blog:https://blog.csdn.net/y124675160/article/details/104399114 中關于CompleteService源碼解析的部分。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author tingyu
* @date 2020/12/12
*/
@Slf4j
public class CountDownLatchDemo {
private static ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("sourceInitExecutor").build();
// 根據實際場景進行線程池大小等配置
// 如果任務多數為排隊執行,對隊列要求高的,建議使用MQ的隊列,将排隊的壓力交給雲端,線程池隻做執行使用
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory);
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(3);
List<Future<Integer>> sourceFutureList = new ArrayList<>(10);
sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceA")));
sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceB")));
sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceC")));
// 主線程在此處阻塞,直到所有子線程資源初始化完畢!
countDownLatch.await(10, TimeUnit.SECONDS);
sourceFutureList.forEach(future -> {
try {
future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.info("資源準備完畢,主線程繼續向下執行!");
}
static class SourceCallable implements Callable<Integer> {
static Random random = new Random();
private CountDownLatch countDownLatch;
private String name;
SourceCallable(CountDownLatch countDownLatch, String name) {
this.countDownLatch = countDownLatch;
this.name = name;
}
@Override
public Integer call() {
try {
// 開始資源初始化
/********** 模拟資源初始化 ***************/
TimeUnit.SECONDS.sleep(random.nextInt(3));
log.info("【{}】資源初始化完畢!", name);
} catch (Exception e) {
// 執行一些需要清理的工作
/********** 模拟資源清理 ***************/
log.error("【{}】資源初始化異常!", name, e);
throw new RuntimeException(e);
}finally {
countDownLatch.countDown();
}
return 1;
}
}
}
Copy
栗子2
上家公司的時候用CountDownLatch實作過一個功能,詳情可參考:https://www.atatech.org/articles/162136
注意事項
CountDownLatch被設計為隻觸發一次,計數器不能被重置。如果需要能被重置計數器的版本,則可以使用CyclicBarrier。
countDown調用時要預防前面的代碼抛異常,導緻countDown未能執行,進而導緻await一直阻塞。是以可以在異常捕獲内部再調用一次,或者統一在異常捕獲之後進行調用,或者在finally裡面執行。
CyclicBarrier
功能講解
适用于你希望建立一組任務,它們并行地執行工作,然後在進行下一個步驟之前等待,直到所有任務都完成。它使得所有的并行任務都将在栅欄處列隊,是以可以一緻地向前移動。這非常像CountDownLatch,隻是CountDownLatch是隻觸發一次的事件,而CyclicBarrier可以多次重用。
栗子1
在上家公司時,由于機構 DIY 課程定制需要從固定課程複制,而複制需要調用三個小組的微服務,三個微服務分别為 課程建立、講次建立、卷子建立,講次挂在課程上,卷子挂在講次上,關系如下:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5CNjljM4Q2MiVGMmFDN2AjMxATM0ETMhFmM3EWZldDN08CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
但是後面發現這樣耗時太長,是以改為使用id生成器提前生成好班型id,再通過傳入班型id,同時進行班型、講次、卷子的複制,這時候需要保證3個服務中任何一個出現異常的時候,所有已經建立完成的任務資料需要復原(此處的復原為調用對應微服務提供的資料清除接口進行)。
一開始使用CompleteService + CountDownLatch + 線程池實作,但後面發現實作複雜,并且線程池隻做通信,并沒有發揮真正的作用,是以之後使用CyclicBarrier實作了一個優化版本,以下為優化版本的示例。 (CountDownLatch版可參考blog:https://blog.csdn.net/y124675160/article/details/104399114 其中還有關于ExecutorCompletionService的源碼講解)
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.*;
/**
* @author tingyu
* @date 2020/12/12
*/
@Slf4j
public class ClassTypeCreateDemo {
private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("ClassTypeCreateExecutor").build();
// 初始化線程池,這裡的線程數等配置根據實際場景進行配置,由于此處多數為調用遠端微服務作業,為IO密集型,可以設定大一點。
// 如果任務多數為排隊執行,對隊列要求高的,建議使用MQ的隊列,将排隊的壓力交給雲端,線程池隻做執行使用
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 20,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory);
public static void main(String[] args) throws InterruptedException {
// 設定任務執行結果的初始值
Map<String, Boolean> execResultMap = new HashMap<>(16);
execResultMap.put("ClassType", true);
execResultMap.put("Lesson", true);
execResultMap.put("LessonPaper", true);
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
// 講次建立
threadPoolExecutor.submit(new LessonRunnable(cyclicBarrier, "Lesson", execResultMap));
// 卷子建立
threadPoolExecutor.submit(new LessonPaperRunnable(cyclicBarrier, "LessonPaper", execResultMap));
// 課程建立
Future<Integer> claaTypeFuture =
threadPoolExecutor.submit(new ClassTypeCallable(cyclicBarrier, "ClassType", execResultMap));
try {
Integer classTypeId = claaTypeFuture.get();
log.info("課程建立成功,課程ID為:{}", classTypeId);
} catch (ExecutionException e) {
// 課程建立失敗
log.error("課程建立失敗!原因為:{}", e.getMessage(), e);
throw new RuntimeException("課程建立失敗!", e);
}
}
/**
* 課程建立任務
*/
static class ClassTypeCallable implements Callable<Integer> {
static Random random = new Random();
private CyclicBarrier cyclicBarrier;
private String name;
private Map<String, Boolean> execResultMap;
ClassTypeCallable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
this.execResultMap = execResultMap;
}
@Override
public Integer call() {
Integer classTypeId = null;
try {
TimeUnit.SECONDS.sleep(random.nextInt(3));
// 模拟課程建立成功後的id指派
classTypeId = random.nextInt(10000);
log.info("課程建立成功!");
} catch (Exception e) {
log.error("課程建立異常!", e);
// 将ClassType的處理狀态設定為false
execResultMap.put(name, false);
}
try {
cyclicBarrier.await();
} catch (Exception e) {
log.warn("【{}】調用await異常!", name, e);
}
Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
entries.forEach(entry -> {
if(!entry.getValue()) {
log.warn("【{}】任務建立異常,【{}】進行復原!", entry.getKey(), name);
/********** 資料清理的操作模拟 ****************/
throw new RuntimeException("【" + entry.getKey() + "】任務建立異常!");
}
});
return classTypeId;
}
}
/**
* 講次建立任務
*/
static class LessonRunnable implements Runnable {
static Random random = new Random();
private CyclicBarrier cyclicBarrier;
private String name;
private Map<String, Boolean> execResultMap;
LessonRunnable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
this.execResultMap = execResultMap;
}
@Override
public void run() {
try {
// 開始資源初始化
TimeUnit.SECONDS.sleep(random.nextInt(3));
log.info("講次建立成功!");
} catch (Exception e) {
log.error("講次建立異常!", e);
// 将Lesson的處理狀态設定為false
execResultMap.put(name, false);
}
try {
cyclicBarrier.await();
} catch (Exception e) {
log.warn("【{}】調用await異常!", name, e);
}
Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
entries.forEach(entry -> {
if(!entry.getValue()) {
log.warn("【{}】任務建立異常,【{}】進行復原!", entry.getKey(), name);
/********** 資料清理的操作模拟 ****************/
}
});
}
}
/**
* 卷子建立任務
*/
static class LessonPaperRunnable implements Runnable {
static Random random = new Random();
private CyclicBarrier cyclicBarrier;
private String name;
private Map<String, Boolean> execResultMap;
LessonPaperRunnable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
this.execResultMap = execResultMap;
}
@Override
public void run() {
try {
// 開始資源初始化
TimeUnit.SECONDS.sleep(random.nextInt(3));
log.info("卷子建立成功!");
} catch (Exception e) {
log.error("卷子建立異常!", e);
// 将LessonPaper的處理狀态設定為false
execResultMap.put(name, false);
}
try {
cyclicBarrier.await();
} catch (Exception e) {
log.warn("【{}】調用await異常!", name, e);
}
Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
entries.forEach(entry -> {
if(!entry.getValue()) {
log.warn("【{}】任務建立異常,【{}】進行復原!", entry.getKey(), name);
/********** 資料清理的操作模拟 ****************/
}
});
}
}
}
Copy
栗子2
以下栗子摘自《Thinking in Java》,作為經典的入門書籍,第一次認識到CyclicBarrier也是通過這本書,當時就覺得這個栗子很有意思。
下面的代碼主要是模拟了賽馬比賽,使用"==="表示栅欄,"***"表示目前馬兒跑過的距離,*最右邊是每批馬兒的編号,可以拷貝代碼導自己的工程後執行main方法看下運作效果。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 這裡模拟了賽馬,使用"==="表示栅欄,"***"表示目前馬兒跑過的距離,*最右邊是每批馬兒的編号
* 如以下例子,則表示6号馬兒跑在最前面
* 可以執行main方法看下運作效果
* ==================================================
* **********************************************1
* *****************************2
* *********************************************3
* ****************************************4
* *********************************************5
* ************************************************6
* *****************************************7
* ==================================================
**/
class Horse implements Runnable{
private static int counter = 1;
// 馬的編号
private final int id = counter++;
// 目前馬所走的步數
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b){
barrier = b;
}
// 傳回目前馬所跑的步數
public synchronized int getStrides(){
return strides;
}
public void run(){
try
{
while(!Thread.interrupted()){
synchronized(this){
strides += rand.nextInt(3);
}
barrier.await();
}
}catch(InterruptedException ex){
System.out.println(this+ " 通過中斷異常退出");
}catch(BrokenBarrierException e){
throw new RuntimeException();
}
}
public String toString(){
return "Horse " + id + " ";
}
// 使用"*"表示目前馬的軌迹
public String tracks(){
StringBuilder s = new StringBuilder();
for(int i = 0 ; i < getStrides(); i++){
s.append("*");
}
s.append(id);
return s.toString();
}
}
public class HorseRace {
static final int FINISH_LINE = 50;
private List<Horse> horses = new ArrayList<>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorse,final int pause){
// 初始化栅欄, nHorse即為模拟的馬兒數量,代表nHorse隻馬兒都到達await位置後,執行Runnable線程的方法
barrier = new CyclicBarrier(nHorse,new Runnable(){
public void run(){
StringBuilder s = new StringBuilder();
for(int i = 0; i < FINISH_LINE; i++){
s.append("=");
}
System.out.println(s);
for(Horse horse : horses){
System.out.println(horse.tracks());
}
for(Horse horse : horses){
if(horse.getStrides() >= FINISH_LINE){
System.out.println(horse + " won");
exec.shutdownNow();
return;
}
}
try{
// 睡眠一段時間,模拟馬奔跑的時間
TimeUnit.MILLISECONDS.sleep(pause);
}catch(InterruptedException ex){
ex.printStackTrace();
}
}
});
for(int i=0; i < nHorse;i++){
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args){
// 配置初始7匹馬進行競賽
int nHorses = 7;
// 每200毫秒更新一次馬兒的移動位置
int pause = 200;
new HorseRace(nHorses,pause);
}
}
注意事項
從以上馬兒的例子可以看出,CyclicBarrier是可以重複使用的。