背景
線程池是一種基于池化思想管理線程的工具,使用線程池可以減少建立銷毀線程的開銷,避免線程過多導緻系統資源耗盡。在高并發的任務處理場景,線程池的使用是必不可少的。在雙11主圖價格表達項目中為了提升處理性能,很多地方使用到了線程池。随着線程池的使用,逐漸發現一個問題,線程池的參數如何設定?
線程池參數中有三個比較關鍵的參數,分别是corePoolSize(核心線程數)、maximumPoolSize(最大線程數)、workQueueSzie(工作隊列大小)。根據任務的類型可以區分為IO密集型和CPU密集型,對于CPU密集型,一般經驗是設定corePoolSize=CPU核數+1,對于IO密集型需要根據具體的RT和流量來設定,沒有普适的經驗值。然而,我們一般遇到的情況多數是處理IO密集型任務,如果線程池參數不可動态調節,就沒辦法根據實際情況實時調整處理速度,隻能通過釋出代碼調整參數。
如果線程池參數不合理會導緻什麼問題呢?下面列舉幾種可能出現的場景:
- 最大線程數設定偏小,工作隊列大小設定偏小,導緻服務接口大量抛出RejectedExecutionException。
- 最大線程數設定偏小,工作隊列大小設定過大,任務堆積過度,接口響應時長變長。
- 最大線程數設定過大,線程排程開銷增大,處理速度反而下降。
- 核心線程數設定過小,流量突增時需要先建立線程,導緻響應時長過大。
- 核心線程數設定過大,空閑線程太多,占用系統資源。
線程池任務排程機制
要明白線程池參數對運作時的影響,就必須了解其中的原理,是以下面先簡單總結了線程池的核心原理。
Java中的線程池核心實作類是ThreadPoolExecutor,ThreadPoolExecutor一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結合進而執行并行任務。使用者無需關注如何建立線程,如何排程線程來執行任務,使用者隻需提供Runnable對象,将任務的運作邏輯送出到執行器(Executor)中,由Executor架構完成線程的調配和任務的執行部分。
ThreadPoolExecutor是如何運作,如何同時維護線程和執行任務的呢?其運作機制如下圖所示:

所有任務的排程都是由execute方法完成的,這部分完成的工作是:檢查現線上程池的運作狀态、運作線程數、運作政策,決定接下來執行的流程,是直接申請線程執行,或是緩沖到隊列中執行,亦或是直接拒絕該任務。其執行過程如下:
- 首先檢測線程池運作狀态,如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀态下執行任務。
- 如果workerCount < corePoolSize,則建立并啟動一個線程來執行新送出的任務。
- 如果workerCount >= corePoolSize,且線程池内的阻塞隊列未滿,則将任務添加到該阻塞隊列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池内的阻塞隊列已滿,則建立并啟動一個線程來執行新送出的任務。
- 如果workerCount >= maximumPoolSize,并且線程池内的阻塞隊列已滿, 則根據拒絕政策來處理該任務, 預設的處理方式是直接抛異常。
其執行流程如下圖所示:
動态調節線程池參數實作
線程池相關的重要參數有三個,分别是核心線程數、最大線程數和工作隊列大小,接下來将闡述如何實作動态調節線程池參數。
調節核心和最大線程數的原理
ThreadPoolExecutor已經提供了兩個方法在運作時設定核心線程數和最大線程數,分别是
ThreadPoolExecutor.setCorePoolSize()
和
ThreadPoolExecutor.setMaximumPoolSize()
。
setCorePoolSize方法的執行流程是:首先會覆寫之前構造函數設定的corePoolSize,然後,如果新的值比原始值要小,當多餘的工作線程下次變成空閑狀态的時候會被中斷并銷毀,如果新的值比原來的值要大且工作隊列不為空,則會建立新的工作線程。流程圖如下:
setMaximumPoolSize方法執行流程是:首先會覆寫之前構造函數設定的maximumPoolSize,然後,如果新的值比原來的值要小,當多餘的工作線程下次變成空閑狀态的時候會被中斷并銷毀。
調節工作隊列大小的原理
線程池中是以生産者消費者模式,通過一個阻塞隊列來緩存任務,工作線程從阻塞隊列中擷取任務。工作隊列的接口是阻塞隊列(BlockingQueue),在隊列為空時,擷取元素的線程會等待隊列變為非空,當隊列滿時,存儲元素的線程會等待隊列可用。
目前JDK提供了以下阻塞隊列的實作:
但是很不幸,這些阻塞隊列的實作都不支援動态調整大小,那麼為什麼不自己實作一個可動态調整大小的阻塞隊列呢。重複造輪子是不可取的,是以我選擇改造輪子。
LinkedBlockingQueue
是比較常用的一個阻塞隊列,它無法修改大小的原因是capacity字段設定成了final
private final int capacity;
。如果我把final去掉,并提供修改capacity的方法,是不是就滿足我們的需求呢?事實證明是可行的,文章末尾上傳了ResizeLinkedBlockingQueue的實作。
結合Diamond進行實作
Diamond可以管理我們的配置,如果可以通過Diamond實作線程池參數管理那就再好不過了。接下來就開始上代碼了,首先實作一個Diamond配置管理類
DispatchConfig
,然後,實作一個線程池管理的工廠方法
StreamExecutorFactory
DispatchConfig
類是一個靜态類,在初始化的時候擷取了對應Diamond的内容并設定了監聽,使用的時候隻需要
DispatchConfig.getConfig().getCorePoolSize()
/**
* @author moda
*/
@Slf4j
@Data
public class DispatchConfig {
public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig";
public static final String GROUP_ID = "mkt-turbo";
private static DispatchConfig config;
static {
try {
String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
config = JSON.parseObject(content, DispatchConfig.class);
Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
@Override
public void receiveConfigInfo(String content) {
try {
config = JSON.parseObject(content, DispatchConfig.class);
} catch (Throwable t) {
log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
}
}
});
} catch (Exception e) {
log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
}
}
public static DispatchConfig getConfig() {
return config;
}
private int corePoolSize = 10;
private int maximumPoolSize = 30;
private int workQueueSize = 1024;
/**
* 商品分批處理每批大小
*/
private int itemBatchProcessPageSize = 200;
}
StreamExecutorFactory
是一個靜态類,維護了一個靜态屬性
executor
,并通過
initExecutor()
進行初始化。在初始化的時候,工作隊列使用了可調節大小的阻塞隊列
ResizeLinkedBlockingQueue
,并設定了監聽Diamond變更。Diamond發生變更的時候通過在callback中對比值是否發生改變,如果發生改變則調整workQueueSize、corePoolSize、maximumPoolSize。使用的時候隻需要
StreamExecutorFactory.getExecutor()
,修改Diamond配置就能動态修改線程池參數。
/**
* @author moda
*/
@Slf4j
public class StreamExecutorFactory {
private static final String THREAD_NAME = "mkt-turbo_stream_dispatch";
private static ThreadPoolExecutor executor = initExecutor();
private static ThreadPoolExecutor initExecutor() {
ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
//拒絕政策,調用者線程處理
RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
log.warn(msg);
if (!e.isShutdown()) {
r.run();
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
DispatchConfig.getConfig().getCorePoolSize(),
DispatchConfig.getConfig().getMaximumPoolSize(),
10,
TimeUnit.SECONDS,
workQueue,
nameThreadFactory,
rejectedExecutionHandler
);
Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {
@Override
public void receiveConfigInfo(String content) {
try {
DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);
if (workQueue.getCapacity() != config.getWorkQueueSize()) {
workQueue.setCapacity(config.getWorkQueueSize());
}
if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {
threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
}
if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {
threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());
}
} catch (Throwable t) {
log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);
}
}
});
return threadPoolExecutor;
}
public static Executor getExecutor() {
return executor;
}
}