CountDownLatch介紹
CountDownLatch稱之為閉鎖,它可以使一個或一批線程在閉鎖上等待,等到其他線程執行完相應操作後,閉鎖打開,這些等待的線程才可以繼續執行。确切的說,閉鎖在内部維護了一個倒計數器。通過該計數器的值來決定閉鎖的狀态,進而決定是否允許等待的線程繼續執行。
常用方法:
public CountDownLatch(int count):構造方法,count表示計數器的值,不能小于0,否者會報異常。
**public void await() throws InterruptedException:**調用await()會讓目前線程等待,直到計數器為0的時候,方法才會傳回,此方法會響應線程中斷操作。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException:限時等待,在逾時之前,計數器變為了0,方法傳回true,否者直到逾時,傳回false,此方法會響應線程中斷操作。
**public void countDown():**讓計數器減1
CountDownLatch使用步驟:
1.建立CountDownLatch對象
2.調用其執行個體方法 await(),讓目前線程等待
3.調用 countDown()方法,讓計數器減1
4.當計數器變為0的時候, await()方法會傳回
舉例:有3個人參見跑步比賽,需要先等指令員發指令槍後才能開跑,所有人都跑完之後,指令員喊一聲,大家跑完了
package com.example.demo.demo.CountDownLatchDemo;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 代碼中,t1、t2、t3啟動之後,都阻塞在 commanderCd.await();,
* 主線程模拟發槍準備操作耗時5秒,然後調用 commanderCd.countDown();
* 模拟發槍操作,此方法被調用以後,阻塞在 commanderCd.await();的3個線程會向下執行。
* 主線程調用 countDownLatch.await();之後進行等待,每個人跑完之後,調用 countDown.countDown();
* 通知一下 countDownLatch讓計數器減1,最後3個人都跑完了,主線程從 countDownLatch.await();傳回繼續向下執行
*/
public class CountDownLatchTest {
public static class T extends Thread{
//每個線程跑步耗費的時間
int runCostTime;
CountDownLatch commanderCd;
CountDownLatch countDownLatch;
public T(String name, int runCostTime, CountDownLatch commanderCd,CountDownLatch countDownLatch) {
super(name);
this.runCostTime = runCostTime;
this.commanderCd = commanderCd;
this.countDownLatch=countDownLatch;
}
@Override
public void run() {
//準備跑步,等待發槍信号
try {
commanderCd.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 開始時間
long startTime = System.currentTimeMillis();
System.out.println(startTime+","+Thread.currentThread().getName()+"開始跑!");
try {
//跑步消耗的時間
TimeUnit.SECONDS.sleep(this.runCostTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
countDownLatch.countDown();
}
long endTime = System.currentTimeMillis();
System.out.println(endTime+","+Thread.currentThread().getName()+"跑步結束,耗時:"+(endTime-startTime));
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread()+"啟動!");
CountDownLatch commanderCd = new CountDownLatch(1);
CountDownLatch countDownLatch = new CountDownLatch(3);
long startTime = System.currentTimeMillis();//比賽開始時間
T t1= new T("小張",2,commanderCd,countDownLatch);
t1.start();
T t2= new T("小紅",5,commanderCd,countDownLatch);
t2.start();
T t3= new T("小黃",7,commanderCd,countDownLatch);
t3.start();
//線程在隊列中等待,讓主線程等待,準備發槍,其餘線程進入就緒
TimeUnit.SECONDS.sleep(5);
System.out.println(System.currentTimeMillis()+",槍響了,開始跑");
commanderCd.countDown();//賽道上比賽開始跑起來
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+"結束,耗時"+(endTime-startTime));
}
}
1591104244695,Thread[main,5,main]啟動!
1591104249699,槍響了,開始跑
1591104249699,小張開始跑!
1591104249699,小紅開始跑!
1591104249699,小黃開始跑!
1591104251700,小張跑步結束,耗時:2001
1591104254699,小紅跑步結束,耗時:5000
1591104256700,小黃跑步結束,耗時:7001
1591104256700,main結束,耗時12003
手寫一個并行處理任務的工具類
package com.example.demo.demo.CountDownLatchDemo;
import org.springframework.util.CollectionUtils;
import java.sql.Time;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 并行處理任務的工具類
*/
public class TaskDisposeUtils {
//并發線程數
public static final int Pool_size;
static {
Pool_size = Integer.max(Runtime.getRuntime().availableProcessors(),5);
}
/**
* 并行性處理,等待結束
*/
public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException{
dispose(true,Pool_size,taskList,consumer);
}
/**
* 并行處理,并等待結束
* @param moreThread 是否多線程執行
* @param pool_size 線程池大小
* @param taskList 任務清單
* @param consumer 消費者
* @param <T>
*/
public static <T> void dispose(boolean moreThread, int pool_size, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
if (CollectionUtils.isEmpty(taskList)){
return;
}
if(moreThread && pool_size>1){//多線程
pool_size = Math.min(pool_size,taskList.size());
ExecutorService executorService = null;
try{
executorService = Executors.newFixedThreadPool(pool_size);//建立線程池
CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
for (T item:taskList) {
executorService.execute(() ->{
try {
consumer.accept(item);
}
finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
finally {
if(executorService !=null){
executorService.shutdown();
}
}
}else {//單線程
for (T item:taskList) {
consumer.accept(item);
}
}
}
public static void main(String[] args) throws InterruptedException {
//生成1-10的數字,放在list中,相當于10個任務
List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
//啟動多線程處理list中的資料,每個任務休眠時間為list中的資料
TaskDisposeUtils.dispose(list,item->{
try{//實際開發,并行任務處理在try中
long startTime = System.currentTimeMillis();
TimeUnit.SECONDS.sleep(item);
long endTime = System.currentTimeMillis();
System.out.println(System.currentTimeMillis()+".任務"+item+"執行完畢,耗時"+(endTime-startTime));
}catch (InterruptedException e){
e.printStackTrace();
}
});
System.out.println(list+"中的任務處理完畢");
}
}
1591106690601.任務1執行完畢,耗時1000
1591106691602.任務2執行完畢,耗時2001
1591106692601.任務3執行完畢,耗時3000
1591106693601.任務4執行完畢,耗時4000
1591106694602.任務5執行完畢,耗時5000
1591106695603.任務6執行完畢,耗時6001
1591106696602.任務7執行完畢,耗時7000
1591106697602.任務8執行完畢,耗時8000
1591106698602.任務9執行完畢,耗時9000
1591106699602.任務10執行完畢,耗時10000
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]中的任務處理完畢