天天看點

可動态調節參數的線程池實作背景線程池任務排程機制動态調節線程池參數實作

背景

線程池是一種基于池化思想管理線程的工具,使用線程池可以減少建立銷毀線程的開銷,避免線程過多導緻系統資源耗盡。在高并發的任務處理場景,線程池的使用是必不可少的。在雙11主圖價格表達項目中為了提升處理性能,很多地方使用到了線程池。随着線程池的使用,逐漸發現一個問題,線程池的參數如何設定?

線程池參數中有三個比較關鍵的參數,分别是corePoolSize(核心線程數)、maximumPoolSize(最大線程數)、workQueueSzie(工作隊列大小)。根據任務的類型可以區分為IO密集型和CPU密集型,對于CPU密集型,一般經驗是設定corePoolSize=CPU核數+1,對于IO密集型需要根據具體的RT和流量來設定,沒有普适的經驗值。然而,我們一般遇到的情況多數是處理IO密集型任務,如果線程池參數不可動态調節,就沒辦法根據實際情況實時調整處理速度,隻能通過釋出代碼調整參數。

如果線程池參數不合理會導緻什麼問題呢?下面列舉幾種可能出現的場景:

  1. 最大線程數設定偏小,工作隊列大小設定偏小,導緻服務接口大量抛出RejectedExecutionException。
  2. 最大線程數設定偏小,工作隊列大小設定過大,任務堆積過度,接口響應時長變長。
  3. 最大線程數設定過大,線程排程開銷增大,處理速度反而下降。
  4. 核心線程數設定過小,流量突增時需要先建立線程,導緻響應時長過大。
  5. 核心線程數設定過大,空閑線程太多,占用系統資源。

線程池任務排程機制

要明白線程池參數對運作時的影響,就必須了解其中的原理,是以下面先簡單總結了線程池的核心原理。

Java中的線程池核心實作類是ThreadPoolExecutor,ThreadPoolExecutor一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結合進而執行并行任務。使用者無需關注如何建立線程,如何排程線程來執行任務,使用者隻需提供Runnable對象,将任務的運作邏輯送出到執行器(Executor)中,由Executor架構完成線程的調配和任務的執行部分。

ThreadPoolExecutor是如何運作,如何同時維護線程和執行任務的呢?其運作機制如下圖所示:

可動态調節參數的線程池實作背景線程池任務排程機制動态調節線程池參數實作

所有任務的排程都是由execute方法完成的,這部分完成的工作是:檢查現線上程池的運作狀态、運作線程數、運作政策,決定接下來執行的流程,是直接申請線程執行,或是緩沖到隊列中執行,亦或是直接拒絕該任務。其執行過程如下:

  1. 首先檢測線程池運作狀态,如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀态下執行任務。
  2. 如果workerCount < corePoolSize,則建立并啟動一個線程來執行新送出的任務。
  3. 如果workerCount >= corePoolSize,且線程池内的阻塞隊列未滿,則将任務添加到該阻塞隊列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池内的阻塞隊列已滿,則建立并啟動一個線程來執行新送出的任務。
  5. 如果workerCount >= maximumPoolSize,并且線程池内的阻塞隊列已滿, 則根據拒絕政策來處理該任務, 預設的處理方式是直接抛異常。

其執行流程如下圖所示:

可動态調節參數的線程池實作背景線程池任務排程機制動态調節線程池參數實作

動态調節線程池參數實作

線程池相關的重要參數有三個,分别是核心線程數、最大線程數和工作隊列大小,接下來将闡述如何實作動态調節線程池參數。

調節核心和最大線程數的原理

ThreadPoolExecutor已經提供了兩個方法在運作時設定核心線程數和最大線程數,分别是

ThreadPoolExecutor.setCorePoolSize()

ThreadPoolExecutor.setMaximumPoolSize()

setCorePoolSize方法的執行流程是:首先會覆寫之前構造函數設定的corePoolSize,然後,如果新的值比原始值要小,當多餘的工作線程下次變成空閑狀态的時候會被中斷并銷毀,如果新的值比原來的值要大且工作隊列不為空,則會建立新的工作線程。流程圖如下:

可動态調節參數的線程池實作背景線程池任務排程機制動态調節線程池參數實作

setMaximumPoolSize方法執行流程是:首先會覆寫之前構造函數設定的maximumPoolSize,然後,如果新的值比原來的值要小,當多餘的工作線程下次變成空閑狀态的時候會被中斷并銷毀。

調節工作隊列大小的原理

線程池中是以生産者消費者模式,通過一個阻塞隊列來緩存任務,工作線程從阻塞隊列中擷取任務。工作隊列的接口是阻塞隊列(BlockingQueue),在隊列為空時,擷取元素的線程會等待隊列變為非空,當隊列滿時,存儲元素的線程會等待隊列可用。

目前JDK提供了以下阻塞隊列的實作:

可動态調節參數的線程池實作背景線程池任務排程機制動态調節線程池參數實作

但是很不幸,這些阻塞隊列的實作都不支援動态調整大小,那麼為什麼不自己實作一個可動态調整大小的阻塞隊列呢。重複造輪子是不可取的,是以我選擇改造輪子。

LinkedBlockingQueue

是比較常用的一個阻塞隊列,它無法修改大小的原因是capacity字段設定成了final

private final int capacity;

。如果我把final去掉,并提供修改capacity的方法,是不是就滿足我們的需求呢?事實證明是可行的,文章末尾上傳了ResizeLinkedBlockingQueue的實作。

結合Diamond進行實作

Diamond可以管理我們的配置,如果可以通過Diamond實作線程池參數管理那就再好不過了。接下來就開始上代碼了,首先實作一個Diamond配置管理類

DispatchConfig

,然後,實作一個線程池管理的工廠方法

StreamExecutorFactory

DispatchConfig

類是一個靜态類,在初始化的時候擷取了對應Diamond的内容并設定了監聽,使用的時候隻需要

DispatchConfig.getConfig().getCorePoolSize()

/**
 * @author moda
 */
@Slf4j
@Data
public class DispatchConfig {
    public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig";
    public static final String GROUP_ID = "mkt-turbo";
    private static DispatchConfig config;

    static {
        try {
            String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
            config = JSON.parseObject(content, DispatchConfig.class);
            Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
                @Override
                public void receiveConfigInfo(String content) {
                    try {
                        config = JSON.parseObject(content, DispatchConfig.class);
                    } catch (Throwable t) {
                        log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
                    }
                }
            });
        } catch (Exception e) {
            log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
        }
    }

    public static DispatchConfig getConfig() {
        return config;
    }

    private int corePoolSize = 10;

    private int maximumPoolSize = 30;

    private int workQueueSize = 1024;

    /**
     * 商品分批處理每批大小
     */
    private int itemBatchProcessPageSize = 200;
}           

StreamExecutorFactory

是一個靜态類,維護了一個靜态屬性

executor

,并通過

initExecutor()

進行初始化。在初始化的時候,工作隊列使用了可調節大小的阻塞隊列

ResizeLinkedBlockingQueue

,并設定了監聽Diamond變更。Diamond發生變更的時候通過在callback中對比值是否發生改變,如果發生改變則調整workQueueSize、corePoolSize、maximumPoolSize。使用的時候隻需要

StreamExecutorFactory.getExecutor()

,修改Diamond配置就能動态修改線程池參數。

/**
 * @author moda
 */
@Slf4j
public class StreamExecutorFactory {
    private static final String THREAD_NAME = "mkt-turbo_stream_dispatch";

    private static ThreadPoolExecutor executor = initExecutor();

    private static ThreadPoolExecutor initExecutor() {
        ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
        ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
        //拒絕政策,調用者線程處理
        RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
            String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
                    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
                THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            log.warn(msg);
            if (!e.isShutdown()) {
                r.run();
            }
        };
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            DispatchConfig.getConfig().getCorePoolSize(),
            DispatchConfig.getConfig().getMaximumPoolSize(),
            10,
            TimeUnit.SECONDS,
            workQueue,
            nameThreadFactory,
            rejectedExecutionHandler
        );

        Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {
            @Override
            public void receiveConfigInfo(String content) {
                try {
                    DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);
                    if (workQueue.getCapacity() != config.getWorkQueueSize()) {
                        workQueue.setCapacity(config.getWorkQueueSize());
                    }
                    if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {
                        threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
                    }
                    if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {
                        threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());
                    }
                } catch (Throwable t) {
                    log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);
                }
            }
        });

        return threadPoolExecutor;
    }

    public static Executor getExecutor() {
        return executor;
    }
}