天天看點

美團動态線程池實踐思路,開源了

基于配置中心對線程池ThreadPoolExecutor做一些擴充,實作對運作中線程池參數的動态修改,實時生效;以及實時監控線程池的運作狀态,觸發設定的報警政策時報警,報警資訊會推送辦公平台(釘釘、企微等)。報警次元包括(隊列容量、線程池活性、拒絕觸發等);同時也會定時采集線程池名額資料供監控平台可視化使用。使我們能時刻感覺到線程池的負載,根據情況及時調整,避免出現問題影響線上業務。

大家好,今天我們來聊一個比較實用的話題,動态可監控的線程池實踐,全新開源項目(DynamicTp)位址在文章末尾,歡迎交流學習。

寫在前面

稍微有些Java程式設計經驗的小夥伴都知道,Java的精髓在juc包,這是大名鼎鼎的Doug Lea老爺

子的傑作,評價一個程式員Java水準怎麼樣,一定程度上看他對juc包下的一些技術掌握的怎麼樣,這也是面試中的基本上必問的一些技術點之一。

juc包主要包括:

1.原子類(AtomicXXX)

2.鎖類(XXXLock)

3.線程同步類(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)

4.任務執行器類(Executor體系類,包括今天的主角ThreadPoolExecutor)

5.并發集合類(ConcurrentXXX、CopyOnWriteXXX)相關集合類

6.阻塞隊列類(BlockingQueue繼承體系類)

7.Future相關類

8.其他一些輔助工具類

多線程程式設計場景下,這些類都是必備技能,會這些可以幫助我們寫出高品質、高性能、少bug的代碼,同時這些也是Java中比較難啃的一些技術,需要持之以恒,學以緻用,在使用中感受他們帶來的奧妙。

上邊簡單羅列了下juc包下功能分類,這篇文章我們主要來介紹動态可監控線程池的,是以具體内容也就不展開講了,以後有時間單獨來聊吧。看這篇文章前,希望讀者最好有一定的線程池ThreadPoolExecutor使用經驗,不然看起來會有點懵。

如果你對ThreadPoolExecutor不是很熟悉,推薦閱讀下面兩篇文章

javadoop: https://www.javadoop.com/post/java-thread-pool

美團技術部落格: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

背景

使用ThreadPoolExecutor過程中你是否有以下痛點呢?

1.代碼中建立了一個ThreadPoolExecutor,但是不知道那幾個核心參數設定多少比較合适

2.憑經驗設定參數值,上線後發現需要調整,改代碼重新開機服務,非常麻煩

3.線程池相對開發人員來說是個黑盒,運作情況不能感覺到,直到出現問題

如果你有以上痛點,這篇文章要介紹的動态可監控線程池(DynamicTp)或許能幫助到你。

如果看過ThreadPoolExecutor的源碼,大概可以知道其實它有提供一些set方法,可以在運作時動态去修改相應的值,這些方法有:

public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
           

現在大多數的網際網路項目其實都會微服務化部署,有一套自己的服務治理體系,微服務元件中的分布式配置中心扮演的就是動态修改配置,實時生效的角色。那麼我們是否可以結合配置中心來做運作時線程池參數的動态調整呢?答案是肯定的,而且配置中心相對都是高可用的,使用它也不用過于擔心配置推送出現問題這類事兒,而且也能減少研發動态線程池元件的難度和工作量。

綜上,我們總結出以下的背景

  • 廣泛性:在Java開發中,想要提高系統性能,線程池已經是一個90%以上的人都會選擇使用的基礎工具
  • 不确定性:項目中可能會建立很多線程池,既有IO密集型的,也有CPU密集型的,但線程池的參數并不好确定;需要有套機制在運作過程中動态去調整參數
  • 無感覺性,線程池運作過程中的各項名額一般感覺不到;需要有套監控報警機制在事前、事中就能讓開發人員感覺到線程池的運作狀況,及時處理
  • 高可用性,配置變更需要及時推送到用戶端;需要有高可用的配置管理推送服務,配置中心是現在大多數網際網路系統都會使用的元件,與之結合可以大幅度減少開發量及接入難度

簡介

我們基于配置中心對線程池ThreadPoolExecutor做一些擴充,實作對運作中線程池參數的動态修改,實時生效;以及實時監控線程池的運作狀态,觸發設定的報警政策時報警,報警資訊會推送辦公平台(釘釘、企微等)。報警次元包括(隊列容量、線程池活性、拒絕觸發等);同時也會定時采集線程池名額資料供監控平台可視化使用。使我們能時刻感覺到線程池的負載,根據情況及時調整,避免出現問題影響線上業務。

|  __ \                            (_) |__   __|
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 
           

特性

  • 參考美團線程池實踐 ,對線程池參數動态化管理,增加監控、報警功能
  • 基于Spring架構,現隻支援SpringBoot項目使用,輕量級,引入starter即可食用
  • 基于配置中心實作線程池參數動态調整,實時生效;內建主流配置中心,預設支援Nacos、Apollo,同時也提供SPI接口可自定義擴充實作
  • 内置通知報警功能,提供多種報警次元(配置變更通知、活性報警、容量門檻值報警、拒絕政策觸發報警),預設支援企業微信、釘釘報警,同時提供SPI接口可自定義擴充實作
  • 内置線程池名額采集功能,支援通過MicroMeter、JsonLog日志輸出、Endpoint三種方式,可通過SPI接口自定義擴充實作
  • 內建管理常用第三方元件的線程池,已內建SpringBoot内置WebServer(Tomcat、Undertow、Jetty)的線程池管理

架構設計

主要分四大子產品

  • 配置變更監聽子產品:

    1.監聽特定配置中心的指定配置檔案(預設實作Nacos、Apollo),可通過内部提供的SPI接口擴充其他實作

    2.解析配置檔案内容,内置實作yml、properties配置檔案的解析,可通過内部提供的SPI接口擴充其他實作

    3.通知線程池管理子產品實作重新整理

  • 線程池管理子產品:

    1.服務啟動時從配置中心拉取配置資訊,生成線程池執行個體注冊到内部線程池注冊中心中

    2.監聽子產品監聽到配置變更時,将變更資訊傳遞給管理子產品,實作線程池參數的重新整理

    3.代碼中通過getExecutor()方法根據線程池名稱來擷取線程池對象執行個體

  • 監控子產品:

    實作監控名額采集以及輸出,預設提供以下三種方式,也可通過内部提供的SPI接口擴充其他實作

    1.預設實作Json log輸出到磁盤

    2.MicroMeter采集,引入MicroMeter相關依賴

    3.暴雷Endpoint端點,可通過http方式通路

  • 通知告警子產品:

    對接辦公平台,實作通告告警功能,預設實作釘釘、企微,可通過内部提供的SPI接口擴充其他實作,通知告警類型如下

    1.線程池參數變更通知

    2.阻塞隊列容量達到設定門檻值告警

    3.線程池活性達到設定門檻值告警

    4.觸發拒絕政策告警

美團動态線程池實踐思路,開源了

使用

  • maven依賴
  1. apollo應用用接入用此依賴
    <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
            <version>1.0.1</version>
        </dependency>
               
  2. spring-cloud場景下的nacos應用接入用此依賴
    <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
            <version>1.0.1</version>
        </dependency>
               
  3. 非spring-cloud場景下的nacos應用接入用此依賴
    <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
            <version>1.0.1</version>
        </dependency>
               
  • 線程池配置
    spring:
      dynamic:
        tp:
          enabled: true
          enabledBanner: true        # 是否開啟banner列印,預設true
          enabledCollect: false      # 是否開啟監控名額采集,預設false
          collectorType: logging     # 監控資料采集器類型(JsonLog | MicroMeter),預設logging
          logPath: /home/logs        # 監控日志資料路徑,預設${user.home}/logs
          monitorInterval: 5         # 監控時間間隔(報警判斷、名額采集),預設5s
          nacos:                     # nacos配置,不配置有預設值(規則name-dev.yml這樣)
            dataId: dynamic-tp-demo-dev.yml
            group: DEFAULT_GROUP
          apollo:                    # apollo配置,不配置預設拿apollo配置第一個namespace
            namespace: dynamic-tp-demo-dev.yml
          configType: yml            # 配置檔案類型
          platforms:                 # 通知報警平台配置
            - platform: wechat
              urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替換
              receivers: test1,test2                   # 接受人企微名稱
            - platform: ding
              urlKey: f80dad441fcd655438f4a08dcd6a     # 替換
              secret: SECb5441fa6f375d5b9d21           # 替換,非sign模式可以沒有此值
              receivers: 15810119805                   # 釘釘賬号手機号    
          tomcatTp:                                    # tomcat web server線程池配置
              minSpare: 100
              max: 400      
          jettyTp:                                     # jetty web server線程池配置
              min: 100
              max: 400     
          undertowTp:                                  # undertow web server線程池配置
              ioThreads: 100
              workerThreads: 400      
          executors:                                   # 動态線程池配置
            - threadPoolName: dynamic-tp-test-1
              corePoolSize: 6
              maximumPoolSize: 8
              queueCapacity: 200
              queueType: VariableLinkedBlockingQueue   # 任務隊列,檢視源碼QueueTypeEnum枚舉類
              rejectedHandlerType: CallerRunsPolicy    # 拒絕政策,檢視RejectedTypeEnum枚舉類
              keepAliveTime: 50
              allowCoreThreadTimeOut: false
              threadNamePrefix: test           # 線程名字首
              notifyItems:                     # 報警項,不配置自動會配置(變更通知、容量報警、活性報警、拒絕報警)
                - type: capacity               # 報警項類型,檢視源碼 NotifyTypeEnum枚舉類
                  enabled: true
                  threshold: 80                # 報警門檻值
                  platforms: [ding,wechat]     # 可選配置,不配置預設拿上層platforms配置的是以平台
                  interval: 120                # 報警間隔(機關:s)
                - type: change
                  enabled: true
                - type: liveness
                  enabled: true
                  threshold: 80
                - type: reject
                  enabled: true
                  threshold: 1
               
  • 代碼方式生成,服務啟動會自動注冊
    @Configuration
    public class DtpConfig {
    
       @Bean
       public DtpExecutor demo1Executor() {
           return DtpCreator.createDynamicFast("demo1-executor");
      }
    
       @Bean
       public ThreadPoolExecutor demo2Executor() {
           return ThreadPoolBuilder.newBuilder()
                  .threadPoolName("demo2-executor")
                  .corePoolSize(8)
                  .maximumPoolSize(16)
                  .keepAliveTime(50)
                  .allowCoreThreadTimeOut(true)
                  .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                  .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                  .buildDynamic();
      }
    }
               
  • 代碼調用,根據線程池名稱擷取
    public static void main(String[] args) {
           DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
           dtpExecutor.execute(() -> System.out.println("test"));
    }
               

注意事項

  1. 配置檔案配置的參數會覆寫通過代碼生成方式配置的參數
  2. 阻塞隊列隻有VariableLinkedBlockingQueue類型可以修改capacity,該類型功能和LinkedBlockingQueue相似,隻是capacity不是final類型,可以修改,

    VariableLinkedBlockingQueue參考RabbitMq的實作

  3. 啟動看到如下日志輸出證明接入成功
    |  __ \                            (_) |__   __|   
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 
    
    DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
               
  4. 配置變更會推送通知消息,且會高亮變更的字段
    DynamicTp [dynamic-tp-test-2] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
               

通知報警

觸發報警門檻值會推送相應報警消息,且會高亮顯示相關字段,活性告警、容量告警、拒絕告警

美團動态線程池實踐思路,開源了

配置變更會推送通知消息,且會高亮變更的字段

美團動态線程池實踐思路,開源了

監控日志

通過主配置檔案collectType屬性配置名額采集類型,預設值:logging

  • micrometer方式:通過引入micrometer相關依賴采集到相應的平台

    (如Prometheus,InfluxDb...)

  • logging:名額資料以json格式輸出日志到磁盤,位址${logPath}/ dynamictp/${appName}.monitor.log
    2022-01-16 15:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":100,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":10,"taskCount":120,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1078,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":120,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":20,"taskCount":140,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1459,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":140,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":89,"taskCount":180,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":1890,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":160,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":99,"taskCount":230,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":2780,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-16 15:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":2,"queueSize":230,"largestPoolSize":4,"poolSize":2,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":300,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":4030,"dtpName":"remoting-call","maximumPoolSize":8}
               
  • 暴露EndPoint端點(dynamic-tp),可以通過http方式請求
    [
        {
            "dtp_name": "remoting-call",
            "core_pool_size": 8,
            "maximum_pool_size": 16,
            "queue_type": "SynchronousQueue",
            "queue_capacity": 0,
            "queue_size": 0,
            "fair": false,
            "queue_remaining_capacity": 0,
            "active_count": 2,
            "task_count": 2760,
            "completed_task_count": 2760,
            "largest_pool_size": 16,
            "pool_size": 8,
            "wait_task_count": 0,
            "reject_count": 12462,
            "reject_handler_name": "CallerRunsPolicy"
        },
        {
            "max_memory": "220 MB",
            "total_memory": "140 MB",
            "free_memory": "44 MB",
            "usable_memory": "125 MB"
        }
    ]
               

項目位址

gitee位址:https://gitee.com/yanhom/dynamic-tp

github位址:https://github.com/lyh200/dynamic-tp

聯系我

對項目有什麼想法或者建議,可以加我微信交流,或者建立issues,一起完善項目

公衆号:CodeFox

微信:yanhom1314