天天看點

開源輕量級動态可監控線程池DynamicTp介紹

作者:架構成長指南

前言

使用線程池 ThreadPoolExecutor 過程中你是否有以下痛點呢?

  1. 代碼中建立了一個 ThreadPoolExecutor,但是不知道那幾個核心參數設定多少比較合适
  2. 憑經驗設定參數值,上線後發現需要調整,改代碼重新釋出服務,非常麻煩
  3. 線程池相對開發人員來說是個黑盒,運作情況不能及時感覺到,直到出現問題

如果你有以上痛點,這塊開源的動态可監控線程池架構(DynamicTp)或許能幫助到你

此項目由Dromara社群開源,基于配置中心的輕量級動态可監控線程池,項目位址:https://dynamictp.cn/

開源輕量級動态可監控線程池DynamicTp介紹

功能特性

  • 代碼零侵入:所有配置都放在配置中心,對業務代碼零侵入
  • 輕量簡單:基于 springboot 實作,引入 starter,接入隻需簡單4步就可完成,順利3分鐘搞定
  • 高可擴充:架構核心功能都提供 SPI 接口供使用者自定義個性化實作(配置中心、配置檔案解析、通知告警、監控資料采集、任務包裝等等)
  • 線上大規模應用:參考美團線程池實踐open in new window,美團内部已經有該理論成熟的應用經驗
  • 多平台通知報警:提供多種報警次元(配置變更通知、活性報警、容量門檻值報警、拒絕觸發報警、任務執行或等待逾時報警),已支援企業微信、釘釘、飛書報警,同時提供 SPI 接口可自定義擴充實作
  • 監控:定時采集線程池名額資料,支援通過 MicroMeter、JsonLog 日志輸出、Endpoint 三種方式,可通過 SPI 接口自定義擴充實作
  • 任務增強:提供任務包裝功能,實作TaskWrapper接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支援線程池上下文資訊傳遞
  • 相容性:JUC 普通線程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被架構監控,@Bean 定義時加 @DynamicTp 注解即可
  • 可靠性:架構提供的線程池實作 Spring 生命周期方法,可以在 Spring 容器關閉前盡可能多的處理隊列中的任務
  • 多模式:參考Tomcat線程池提供了 IO 密集型場景使用的 EagerDtpExecutor 線程池
  • 支援多配置中心:基于主流配置中心實作線程池參數動态調整,實時生效,已支援 Nacos、Apollo、Zookeeper、Consul、Etcd,同時也提供 SPI 接口可自定義擴充實作
  • 中間件線程池管理:內建管理常用第三方元件的線程池,已內建Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等元件的線程池管理(調參、監控報警)

系統架構圖

開源輕量級動态可監控線程池DynamicTp介紹

接入指南

Maven 依賴

下面隻介紹幾個常用場景接入pom,zk、consul這些請檢視官網

  • apollo 應用用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
<version>1.0.8</version>
</dependency>           
  • spring-cloud 場景下的 nacos 應用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
<version>1.0.8</version>
</dependency>           
  • 非 spring-cloud 場景下的 nacos 應用接入用此依賴
<dependency>
<groupId>cn.dynamictp</groupId>
<artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
<version>1.0.8</version>
</dependency>           

配置檔案

線程池定義可以配置在檔案中,然後在Spring應用中可以通過@Resource、@Autowire、或通過工具類來擷取執行個體

以下配置檔案,除了維護公共屬性還定義了名稱為austin.im.notice、execute-xxl-thread-pool線程池,其中格式支援yml、properties 類型、json 類型、zk檔案

spring:
  dynamic:
    tp:
      enabled: true
      enabledBanner: true           # 是否開啟banner列印,預設true
      enabledCollect: true          # 是否開啟監控名額采集,預設false
      collectorType: micrometer     # 監控資料采集器類型(JsonLog | MicroMeter),預設logging
      monitorInterval: 5            # 監控時間間隔(報警判斷、名額采集),預設5s
      apollo:                       # apollo配置,不配置預設拿apollo配置第一個namespace
        namespace: dynamic-tp-apollo-dtp.yml
      configType: yml
      platforms:
        - platform: wechat
          urlKey: 38aa7eff500-1287
          receivers: apollo
        - platform: ding
          urlKey: f80dad441fcd65bac48473d4a88dcd6a
          secret: SECb544445a6a34f0315d08b17de41
          receivers: 18888888888
      executors:
        - threadPoolName: austin.im.notice
          corePoolSize: 6
          maximumPoolSize: 8
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # 任務隊列,檢視源碼QueueTypeEnum枚舉類
          rejectedHandlerType: CallerRunsPolicy    # 拒絕政策,檢視RejectedTypeEnum枚舉類
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: austin-           # 線程名字首
        - threadPoolName: execute-xxl-thread-pool
          corePoolSize: 3
          maximumPoolSize: 3
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue   # 任務隊列,檢視源碼QueueTypeEnum枚舉類
          rejectedHandlerType: CallerRunsPolicy    # 拒絕政策,檢視RejectedTypeEnum枚舉類
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: austin-           # 線程名字首
          notifyItems: # 報警項,不配置自動會配置(變更通知、容量報警、活性報警、拒絕報警)
            - type: capacity               # 報警項類型,檢視源碼 NotifyTypeEnum枚舉類
              enabled: true
              threshold: 80                # 報警門檻值
              platforms: [ding,wechat]     # 可選配置,不配置預設拿上層platforms配置的是以平台
              interval: 120                # 報警間隔(機關:s)
            - type: change
              enabled: true
            - type: liveness
              enabled: true
              threshold: 80
            - type: reject
              enabled: true
              threshold: 1           

代碼使用

線程池執行個體定義

如果不再配置檔案中進行定義,也可以在代碼中直接定義

@Configuration
public class DtpConfig {  
  
  /**
   * 通過{@link DynamicTp} 注解定義普通juc線程池,會享受到該架構監控功能,注解名稱優先級高于方法名
   *
   * @return 線程池執行個體
   */
  @DynamicTp("commonExecutor")
  @Bean
  public ThreadPoolExecutor commonExecutor() {
      return (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
  }

  /**
   * 通過{@link ThreadPoolCreator} 快速建立一些簡單配置的動态線程池
   * tips: 建議直接在配置中心配置就行,不用@Bean聲明
   *
   * @return 線程池執行個體
   */
  @Bean
  public DtpExecutor dtpExecutor1() {
      return ThreadPoolCreator.createDynamicFast("dtpExecutor1");
  }

  /**
   * 通過{@link ThreadPoolBuilder} 設定詳細參數建立動态線程池(推薦方式),
   * ioIntensive,參考tomcat線程池設計,實作了處理io密集型任務的線程池,具體參數可以看代碼注釋
   *
   * tips: 建議直接在配置中心配置就行,不用@Bean聲明
   * @return 線程池執行個體
   */
  @Bean
  public DtpExecutor ioIntensiveExecutor() {
      return ThreadPoolBuilder.newBuilder()
              .threadPoolName("ioIntensiveExecutor")
              .corePoolSize(20)
              .maximumPoolSize(50)
              .queueCapacity(2048)
              .ioIntensive(true)
              .buildDynamic();
  }

  /**
   * tips: 建議直接在配置中心配置就行,不用@Bean聲明
   * @return 線程池執行個體
   */
  @Bean
  public ThreadPoolExecutor dtpExecutor2() {
      return ThreadPoolBuilder.newBuilder()
              .threadPoolName("dtpExecutor2")
              .corePoolSize(10)
              .maximumPoolSize(15)
              .keepAliveTime(50)
              .timeUnit(TimeUnit.MILLISECONDS)
              .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
              .waitForTasksToCompleteOnShutdown(true)
              .awaitTerminationSeconds(5)
              .buildDynamic();
  }
}
           

代碼調用

從DtpRegistry中根據線程池名稱擷取,或者通過依賴注入方式(推薦,更優雅)

  1. 依賴注入方式使用,優先推薦依賴注入方式,不能使用依賴注入的場景可以使用方式2
@Resource
private ThreadPoolExecutor dtpExecutor1;

public void exec() {
   dtpExecutor1.execute(() -> System.out.println("test"));
}
           
  1. 通過DtpRegistry注冊器擷取
public static void main(String[] args) {
   DtpExecutor dtpExecutor = DtpRegistry.getDtpExecutor("dtpExecutor1");
   dtpExecutor.execute(() -> System.out.println("test"));
}
           

通知報警

調參通知

開源輕量級動态可監控線程池DynamicTp介紹

運作報警

線程池活躍度告警

活躍度 = activeCount / maximumPoolSize

開源輕量級動态可監控線程池DynamicTp介紹

隊列容量告警

容量使用率 = queueSize / queueCapacity

開源輕量級動态可監控線程池DynamicTp介紹

拒絕政策告警

線程池線程數達到配置的最大線程數,且任務隊列已滿,再送出任務會觸發拒絕政策

開源輕量級動态可監控線程池DynamicTp介紹

任務隊列逾時告警

重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了執行逾時或排隊逾時值,則會進行報警

任務執行逾時告警

重寫ThreadPoolExecutor的afterExecute()方法,根據目前時間和beforeExecute()中設定的startTime的內插補點即可算出任務的實際執行時間,然後判斷如果內插補點大于配置的runTimeout則累加排隊逾時任務數量,則會進行告警

監控

支援接入prometheus+grafana做監控,效果如下

開源輕量級動态可監控線程池DynamicTp介紹

總結

DynamicTcp是一個功能實用,上手簡單動态線程池元件,很輕量對業務無侵入,目前我們業務系統已經開始介入,讀者朋友們趕緊使用起來吧

繼續閱讀