天天看點

ThreadPoolExecutor單例線程池在業務中的應用

業務中的場景:接口對外開放,對方調接口,推資料過來,前期可能對方接入的業務方比較少,開啟的線程也少,是以對我們這邊的時間不做要求,是以我們接口是處理了大批資料入庫,存雲,處理下來整個代碼走完要好幾分鐘,再給對方響應,但是前幾天對方突然在微信群裡限制了接口的逾時時間,逾時了之後對方就預設該批資料處理失敗,會重新推送這一批資料,導緻我們生産庫中出現了重複推過來的資料.是以就需要開啟另外一條線程執行業務代碼,接口的主線程校驗完資料之後直接傳回響應,以達到逾時要求.

可優化的點:這裡由于之前同僚的代碼的線程池都是随用随建立:

ExecutorService executor = Executors.newFixedThreadPool(20);

之後可能會建立多個線程池,是以自己加了個單例的線程池,程式啟動則加載,節省了重複建立線程池所消耗的資源,需要線程直接從建立好的線程池取就可以了.

單例的線程池:

/**
 * @author zsc
 * @ClassName GlobalThreadPool
 * @date 2019/11/7 16:07
 */
@Log4j2
public final class GlobalThreadPool {
    /**
     * 核心線程數
     */
    private static final int CORE_POOL_SIZE = 30;
    /**
     * 最大線程數
     */
    private static final int MAXIMUM_POOL_SIZE = 60;
    /**
     * 空閑線程的存活時間
     */
    private static final int KEEP_ALIVE_TIME = 1000;
    /**
     * 任務隊列的大小
     */
    private static final int  BLOCKING_QUEUE_SIZE = 1000;

    private GlobalThreadPool() {
    }
    /**
     * 餓漢初始化
     */
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE),
            new ThreadFactoryBuilder().setNameFormat("global-thread-pool-%d").build(),
            new  ThreadPoolExecutor.CallerRunsPolicy());
    public static ThreadPoolExecutor getExecutor() {
        return EXECUTOR;
    }
}
           

這裡采用的任務拒絕政策為由調用該線程處執行該任務.

調用處的代碼:

param.setStatus("1");
                bwDataExchangeDAO.insertBwData(param);
                //周遊資料,拿到輸入流,解析輸入流
                log.info("目前線程為:" + Thread.currentThread().getName());
                //開啟另一條線程去執行資料解析,以防逾時
                GlobalThreadPool.getExecutor().submit(() ->
                        resolveData(param));
            } else {
                bwDataExchangeDAO.insertBwData(param);
            }
           

主線程調用線程池開啟完線程之後就直接把任務扔給這個線程去處理,直接rtn就完事了.

下面看測試代碼:

@Test
    public void test11(){
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            GlobalThreadPool.getExecutor().submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("目前線程為:" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        //計數減一
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("5個線程已執行完畢");
    }
           

控制台的輸出:

目前線程為:global-thread-pool-0
目前線程為:global-thread-pool-1
目前線程為:global-thread-pool-2
目前線程為:global-thread-pool-3
目前線程為:global-thread-pool-4

5個線程已執行完畢
           

繼續閱讀