前言
使用線程池 ThreadPoolExecutor 過程中你是否有以下痛點呢?
- 代碼中建立了一個 ThreadPoolExecutor,但是不知道那幾個核心參數設定多少比較合适
- 憑經驗設定參數值,上線後發現需要調整,改代碼重新釋出服務,非常麻煩
- 線程池相對開發人員來說是個黑盒,運作情況不能及時感覺到,直到出現問題
如果你有以上痛點,這塊開源的動态可監控線程池架構(DynamicTp)或許能幫助到你
此項目由Dromara社群開源,基于配置中心的輕量級動态可監控線程池,項目位址:https://dynamictp.cn/
功能特性
- 代碼零侵入:所有配置都放在配置中心,對業務代碼零侵入
- 輕量簡單:基于 springboot 實作,引入 starter,接入隻需簡單4步就可完成,順利3分鐘搞定
- 高可擴充:架構核心功能都提供 SPI 接口供使用者自定義個性化實作(配置中心、配置檔案解析、通知告警、監控資料采集、任務包裝等等)
- 線上大規模應用:參考美團線程池實踐open in new window,美團内部已經有該理論成熟的應用經驗
- 多平台通知報警:提供多種報警次元(配置變更通知、活性報警、容量門檻值報警、拒絕觸發報警、任務執行或等待逾時報警),已支援企業微信、釘釘、飛書報警,同時提供 SPI 接口可自定義擴充實作
- 監控:定時采集線程池名額資料,支援通過 MicroMeter、JsonLog 日志輸出、Endpoint 三種方式,可通過 SPI 接口自定義擴充實作
- 任務增強:提供任務包裝功能,實作TaskWrapper接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支援線程池上下文資訊傳遞
- 相容性:JUC 普通線程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被架構監控,@Bean 定義時加 @DynamicTp 注解即可
- 可靠性:架構提供的線程池實作 Spring 生命周期方法,可以在 Spring 容器關閉前盡可能多的處理隊列中的任務
- 多模式:參考Tomcat線程池提供了 IO 密集型場景使用的 EagerDtpExecutor 線程池
- 支援多配置中心:基于主流配置中心實作線程池參數動态調整,實時生效,已支援 Nacos、Apollo、Zookeeper、Consul、Etcd,同時也提供 SPI 接口可自定義擴充實作
- 中間件線程池管理:內建管理常用第三方元件的線程池,已內建Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等元件的線程池管理(調參、監控報警)
系統架構圖
接入指南
Maven 依賴
下面隻介紹幾個常用場景接入pom,zk、consul這些請檢視官網
- apollo 應用用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
<version>1.0.8</version>
</dependency>
- spring-cloud 場景下的 nacos 應用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
<version>1.0.8</version>
</dependency>
- 非 spring-cloud 場景下的 nacos 應用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
<version>1.0.8</version>
</dependency>
配置檔案
線程池定義可以配置在檔案中,然後在Spring應用中可以通過@Resource、@Autowire、或通過工具類來擷取執行個體
以下配置檔案,除了維護公共屬性還定義了名稱為austin.im.notice、execute-xxl-thread-pool線程池,其中格式支援yml、properties 類型、json 類型、zk檔案
spring:
dynamic:
tp:
enabled: true
enabledBanner: true # 是否開啟banner列印,預設true
enabledCollect: true # 是否開啟監控名額采集,預設false
collectorType: micrometer # 監控資料采集器類型(JsonLog | MicroMeter),預設logging
monitorInterval: 5 # 監控時間間隔(報警判斷、名額采集),預設5s
apollo: # apollo配置,不配置預設拿apollo配置第一個namespace
namespace: dynamic-tp-apollo-dtp.yml
configType: yml
platforms:
- platform: wechat
urlKey: 38aa7eff500-1287
receivers: apollo
- platform: ding
urlKey: f80dad441fcd65bac48473d4a88dcd6a
secret: SECb544445a6a34f0315d08b17de41
receivers: 18888888888
executors:
- threadPoolName: austin.im.notice
corePoolSize: 6
maximumPoolSize: 8
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任務隊列,檢視源碼QueueTypeEnum枚舉類
rejectedHandlerType: CallerRunsPolicy # 拒絕政策,檢視RejectedTypeEnum枚舉類
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: austin- # 線程名字首
- threadPoolName: execute-xxl-thread-pool
corePoolSize: 3
maximumPoolSize: 3
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任務隊列,檢視源碼QueueTypeEnum枚舉類
rejectedHandlerType: CallerRunsPolicy # 拒絕政策,檢視RejectedTypeEnum枚舉類
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: austin- # 線程名字首
notifyItems: # 報警項,不配置自動會配置(變更通知、容量報警、活性報警、拒絕報警)
- type: capacity # 報警項類型,檢視源碼 NotifyTypeEnum枚舉類
enabled: true
threshold: 80 # 報警門檻值
platforms: [ding,wechat] # 可選配置,不配置預設拿上層platforms配置的是以平台
interval: 120 # 報警間隔(機關:s)
- type: change
enabled: true
- type: liveness
enabled: true
threshold: 80
- type: reject
enabled: true
threshold: 1
代碼使用
線程池執行個體定義
如果不再配置檔案中進行定義,也可以在代碼中直接定義
@Configuration
public class DtpConfig {
/**
* 通過{@link DynamicTp} 注解定義普通juc線程池,會享受到該架構監控功能,注解名稱優先級高于方法名
*
* @return 線程池執行個體
*/
@DynamicTp("commonExecutor")
@Bean
public ThreadPoolExecutor commonExecutor() {
return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
}
/**
* 通過{@link ThreadPoolCreator} 快速建立一些簡單配置的動态線程池
* tips: 建議直接在配置中心配置就行,不用@Bean聲明
*
* @return 線程池執行個體
*/
@Bean
public DtpExecutor dtpExecutor1() {
return ThreadPoolCreator.createDynamicFast("dtpExecutor1");
}
/**
* 通過{@link ThreadPoolBuilder} 設定詳細參數建立動态線程池(推薦方式),
* ioIntensive,參考tomcat線程池設計,實作了處理io密集型任務的線程池,具體參數可以看代碼注釋
*
* tips: 建議直接在配置中心配置就行,不用@Bean聲明
* @return 線程池執行個體
*/
@Bean
public DtpExecutor ioIntensiveExecutor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("ioIntensiveExecutor")
.corePoolSize(20)
.maximumPoolSize(50)
.queueCapacity(2048)
.ioIntensive(true)
.buildDynamic();
}
/**
* tips: 建議直接在配置中心配置就行,不用@Bean聲明
* @return 線程池執行個體
*/
@Bean
public ThreadPoolExecutor dtpExecutor2() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("dtpExecutor2")
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(50)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
.waitForTasksToCompleteOnShutdown(true)
.awaitTerminationSeconds(5)
.buildDynamic();
}
}
代碼調用
從DtpRegistry中根據線程池名稱擷取,或者通過依賴注入方式(推薦,更優雅)
- 依賴注入方式使用,優先推薦依賴注入方式,不能使用依賴注入的場景可以使用方式2
@Resource
private ThreadPoolExecutor dtpExecutor1;
public void exec() {
dtpExecutor1.execute(() -> System.out.println("test"));
}
- 通過DtpRegistry注冊器擷取
public static void main(String[] args) {
DtpExecutor dtpExecutor = DtpRegistry.getDtpExecutor("dtpExecutor1");
dtpExecutor.execute(() -> System.out.println("test"));
}
通知報警
調參通知
運作報警
線程池活躍度告警
活躍度 = activeCount / maximumPoolSize
隊列容量告警
容量使用率 = queueSize / queueCapacity
拒絕政策告警
線程池線程數達到配置的最大線程數,且任務隊列已滿,再送出任務會觸發拒絕政策
任務隊列逾時告警
重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了執行逾時或排隊逾時值,則會進行報警
任務執行逾時告警
重寫ThreadPoolExecutor的afterExecute()方法,根據目前時間和beforeExecute()中設定的startTime的內插補點即可算出任務的實際執行時間,然後判斷如果內插補點大于配置的runTimeout則累加排隊逾時任務數量,則會進行告警
監控
支援接入prometheus+grafana做監控,效果如下
總結
DynamicTcp是一個功能實用,上手簡單動态線程池元件,很輕量對業務無侵入,目前我們業務系統已經開始介入,讀者朋友們趕緊使用起來吧