天天看點

JUC中等待多線程完成的工具類CountDownLatch

CountDownLatch介紹

CountDownLatch稱之為閉鎖,它可以使一個或一批線程在閉鎖上等待,等到其他線程執行完相應操作後,閉鎖打開,這些等待的線程才可以繼續執行。确切的說,閉鎖在内部維護了一個倒計數器。通過該計數器的值來決定閉鎖的狀态,進而決定是否允許等待的線程繼續執行。

常用方法:

public CountDownLatch(int count):構造方法,count表示計數器的值,不能小于0,否者會報異常。

**public void await() throws InterruptedException:**調用await()會讓目前線程等待,直到計數器為0的時候,方法才會傳回,此方法會響應線程中斷操作。

public boolean await(long timeout, TimeUnit unit) throws InterruptedException:限時等待,在逾時之前,計數器變為了0,方法傳回true,否者直到逾時,傳回false,此方法會響應線程中斷操作。

**public void countDown():**讓計數器減1

CountDownLatch使用步驟:

1.建立CountDownLatch對象

2.調用其執行個體方法 await(),讓目前線程等待

3.調用 countDown()方法,讓計數器減1

4.當計數器變為0的時候, await()方法會傳回

舉例:有3個人參見跑步比賽,需要先等指令員發指令槍後才能開跑,所有人都跑完之後,指令員喊一聲,大家跑完了

package com.example.demo.demo.CountDownLatchDemo;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 代碼中,t1、t2、t3啟動之後,都阻塞在 commanderCd.await();,
 * 主線程模拟發槍準備操作耗時5秒,然後調用 commanderCd.countDown();
 * 模拟發槍操作,此方法被調用以後,阻塞在 commanderCd.await();的3個線程會向下執行。
 * 主線程調用 countDownLatch.await();之後進行等待,每個人跑完之後,調用 countDown.countDown();
 * 通知一下 countDownLatch讓計數器減1,最後3個人都跑完了,主線程從 countDownLatch.await();傳回繼續向下執行
 */
public class CountDownLatchTest {
    public static class T extends  Thread{
        //每個線程跑步耗費的時間
        int runCostTime;
        CountDownLatch commanderCd;
        CountDownLatch countDownLatch;

        public T(String name, int runCostTime, CountDownLatch commanderCd,CountDownLatch countDownLatch) {
            super(name);
            this.runCostTime = runCostTime;
            this.commanderCd = commanderCd;
            this.countDownLatch=countDownLatch;
        }

        @Override
        public void run() {
            //準備跑步,等待發槍信号
            try {
                commanderCd.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
//            開始時間
            long startTime = System.currentTimeMillis();
            System.out.println(startTime+","+Thread.currentThread().getName()+"開始跑!");
            try {
                //跑步消耗的時間
                TimeUnit.SECONDS.sleep(this.runCostTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            finally {
                countDownLatch.countDown();
            }
            long endTime = System.currentTimeMillis();
            System.out.println(endTime+","+Thread.currentThread().getName()+"跑步結束,耗時:"+(endTime-startTime));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(System.currentTimeMillis()+","+Thread.currentThread()+"啟動!");
        CountDownLatch commanderCd = new CountDownLatch(1);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        long startTime = System.currentTimeMillis();//比賽開始時間
        T t1= new T("小張",2,commanderCd,countDownLatch);
        t1.start();
        T t2= new T("小紅",5,commanderCd,countDownLatch);
        t2.start();
        T t3= new T("小黃",7,commanderCd,countDownLatch);
        t3.start();
        //線程在隊列中等待,讓主線程等待,準備發槍,其餘線程進入就緒
        TimeUnit.SECONDS.sleep(5);
        System.out.println(System.currentTimeMillis()+",槍響了,開始跑");
        commanderCd.countDown();//賽道上比賽開始跑起來
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+"結束,耗時"+(endTime-startTime));
    }
}

           
1591104244695,Thread[main,5,main]啟動!
1591104249699,槍響了,開始跑
1591104249699,小張開始跑!
1591104249699,小紅開始跑!
1591104249699,小黃開始跑!
1591104251700,小張跑步結束,耗時:2001
1591104254699,小紅跑步結束,耗時:5000
1591104256700,小黃跑步結束,耗時:7001
1591104256700,main結束,耗時12003
           

手寫一個并行處理任務的工具類

package com.example.demo.demo.CountDownLatchDemo;

import org.springframework.util.CollectionUtils;

import java.sql.Time;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 并行處理任務的工具類
 */
public class TaskDisposeUtils {
    //并發線程數
    public static  final int Pool_size;
    static {
        Pool_size = Integer.max(Runtime.getRuntime().availableProcessors(),5);
    }
    /**
     * 并行性處理,等待結束
     */
    public static <T> void dispose(List<T> taskList, Consumer<T> consumer) throws InterruptedException{
        dispose(true,Pool_size,taskList,consumer);
    }

    /**
     * 并行處理,并等待結束
     * @param moreThread 是否多線程執行
     * @param pool_size 線程池大小
     * @param taskList 任務清單
     * @param consumer 消費者
     * @param <T>
     */
    public  static <T> void dispose(boolean moreThread, int pool_size, List<T> taskList, Consumer<T> consumer) throws InterruptedException {
        if (CollectionUtils.isEmpty(taskList)){
            return;
        }
        if(moreThread && pool_size>1){//多線程
            pool_size = Math.min(pool_size,taskList.size());
            ExecutorService executorService = null;

            try{
                executorService = Executors.newFixedThreadPool(pool_size);//建立線程池
                CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
                for (T item:taskList) {
                    executorService.execute(() ->{
                        try {
                            consumer.accept(item);
                        }
                        finally {
                            countDownLatch.countDown();
                        }
                    });
                }
                countDownLatch.await();
            }
            finally {
                if(executorService !=null){
                    executorService.shutdown();
                }
            }
        }else {//單線程
            for (T item:taskList) {
                consumer.accept(item);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //生成1-10的數字,放在list中,相當于10個任務
        List<Integer> list = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
        //啟動多線程處理list中的資料,每個任務休眠時間為list中的資料
        TaskDisposeUtils.dispose(list,item->{
            try{//實際開發,并行任務處理在try中
                long startTime = System.currentTimeMillis();
                TimeUnit.SECONDS.sleep(item);
                long endTime = System.currentTimeMillis();
                
                System.out.println(System.currentTimeMillis()+".任務"+item+"執行完畢,耗時"+(endTime-startTime));
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        });   
        System.out.println(list+"中的任務處理完畢");
    }

}

           
1591106690601.任務1執行完畢,耗時1000
1591106691602.任務2執行完畢,耗時2001
1591106692601.任務3執行完畢,耗時3000
1591106693601.任務4執行完畢,耗時4000
1591106694602.任務5執行完畢,耗時5000
1591106695603.任務6執行完畢,耗時6001
1591106696602.任務7執行完畢,耗時7000
1591106697602.任務8執行完畢,耗時8000
1591106698602.任務9執行完畢,耗時9000
1591106699602.任務10執行完畢,耗時10000
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]中的任務處理完畢
           

繼續閱讀