天天看點

ThreadPoolExecutor線程池原理

ThreadPoolExecutor線程池原理

    • 線程池原理
      • 1. 線程池的簡單介紹
        • 1.1 線程池是什麼
        • 1.2 線程池解決的核心問題是什麼
      • 2. 線程池的實作原理
        • 2.1 線程池的執行流程
        • 2.2 源碼分析
      • 3. 線程池的使用
        • 3.1 線程池的建立
        • 3.2 向線程池送出任務
        • 3.3 生命周期管理
        • 3.4 關閉線程池
        • 3.5 合理地配置線程池
          • 如何判斷是 CPU 密集任務還是 IO 密集任務?
      • 4. 線程池的監控

線程池原理

随着計算機行業的飛速發展,摩爾定律逐漸失效,多核CPU成為主流。使用多線程并行計算逐漸成為開發人員提升伺服器性能的基本武器。J.U.C提供的線程池:ThreadPoolExecutor類,幫助開發人員管理線程并友善地執行并行任務。了解并合理使用線程池,是一個開發人員必修的基本功。

1. 線程池的簡單介紹

1.1 線程池是什麼

線程池(Thread Pool)是一種基于池化思想管理線程的工具,經常出現在多線程伺服器中,如MySQL。

線程過多會帶來額外的開銷,其中包括建立銷毀線程的開銷、排程線程的開銷等等,同時也降低了計算機的整體性能。線程池維護多個線程,等待監督管理者配置設定可并發執行的任務。這種做法,一方面避免了處理任務時建立銷毀線程開銷的代價,另一方面避免了線程數量膨脹導緻的過分排程問題,保證了對核心的充分利用。

本文描述線程池是JDK中提供的ThreadPoolExecutor類。

Java中的線程池是運用場景最多的并發架構,幾乎所有需要異步或并發執行任務的程式都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來一系列好處:

  • 降低資源消耗:通過池化技術重複利用已建立的線程,降低線程建立和銷毀造成的損耗
  • 提高響應速度:當任務到達時,任務可以不需要等到線程建立就能立即執行
  • 提高線程的可管理性:線程是稀缺資源,如果無限制地建立,不僅會消耗系統資源, 還會降低系統的穩定性,使用線程池可以進行統一配置設定、調優和監控。但是,要做到合理利用線程池,必須對其實作原理了如指掌
  • 提供更多更強大的功能:線程池具備可拓展性,允許開發人員向其中增加更多的功能。比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務延期執行或定期執行

1.2 線程池解決的核心問題是什麼

線程池解決的核心問題就是資源管理問題。在并發環境下,系統不能夠确定在任意時刻中,有多少任務需要執行,有多少資源需要投入。這種不确定性将帶來以下若幹問題:

  • 頻繁申請/銷毀資源和排程資源,将帶來額外的消耗,可能會非常巨大。
  • 對資源無限申請缺少抑制手段,易引發系統資源耗盡的風險。
  • 系統無法合理管理内部的資源分布,會降低系統的穩定性。

為解決資源配置設定這個問題,線程池采用了“池化”(Pooling)思想。池化,顧名思義,是為了最大化收益并最小化風險,而将資源統一在一起管理的一種思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

2. 線程池的實作原理

Java中的線程池核心實作類是ThreadPoolExecutor,首先來看一下ThreadPoolExecutor的UML類圖,了解下ThreadPoolExecutor的繼承關系。

ThreadPoolExecutor線程池原理

Executor架構的使用示意圖

ThreadPoolExecutor線程池原理
  • ThreadPoolExecutor實作的頂層接口是Executor,頂層接口Executor提供了一種思想:将任務送出和任務執行進行解耦。使用者無需關注如何建立線程,如何排程線程來執行任務,使用者隻需提供Runnable對象,将任務的運作邏輯送出到執行器(Executor)中,由Executor架構完成線程的調配和任務的執行部分。
  • ExecutorService接口增加了一些能力:(1)擴充執行任務的能力,補充可以為一個或一批異步任務生成Future的方法;(2)提供了管控線程池的方法,比如停止線程池的運作。
  • AbstractExecutorService則是上層的抽象類,将執行任務的流程串聯了起來,保證下層的實作隻需關注一個執行任務的方法即可。
  • 最下層的實作類ThreadPoolExecutor實作最複雜的運作部分,ThreadPoolExecutor将會一方面維護自身的生命周期,另一方面同時管理線程和任務,使兩者良好的結合進而執行并行任務。

2.1 線程池的執行流程

介紹完線程池的作用以及解決的核心問題後,下面我們來看看當向線程池送出一個任務後,線程池是如何處理這個任務的?

ThreadPoolExecutor執行execute()方法的示意圖,如下圖所示。

ThreadPoolExecutor線程池原理

如上圖所示,線程池的處理流程如下:

  • 線程池會首先判斷核心線程池裡的線程是否都在執行任務。
    • 如果不是,建立一個新的核心線程來執行任務
    • 如果核心線程池裡的線程都在執行任務,則進入下一個流程
  • 線程池判斷工作隊列是否已滿
    • 如果工作隊列未滿,則将送出的任務放在工作隊列裡,等待核心線程去擷取執行
    • 如果工作隊列滿,進入下一個流程
  • 判斷線程池的線程是否都處于工作狀态
    • 如果沒有,則建立一個新的工作線程來執行任務
    • 如果已經滿了,則交給飽和政策來處理這個任務

介紹完大緻的流程,再來看看ThreadPoolExecutor執行示意圖

線程池在内部實際上建構了一個生産者消費者模型,将線程和任務兩者解耦,并不直接關聯,進而良好的緩沖任務,複用線程。線程池的運作主要分成兩部分:任務管理、線程管理。

ThreadPoolExecutor線程池原理

如上圖所示,ThreadPoolExecutor執行execute方法分下面4種情況:

  1. 如果目前運作的線程少于corePoolSize,則建立新線程來執行任務(注意,執行這一步驟 需要擷取全局鎖)
  2. 如果運作的線程等于或多于corePoolSize,則将任務加入BlockingQueue
  3. 如果無法将任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務(注意,執行這一步驟需要擷取全局鎖)
  4. 如果建立新線程将使目前運作的線程超出maximumPoolSize,任務将被拒絕,并調RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免擷取全局鎖(那将會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後 (目前運作的線程數大于等于corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要擷取全局鎖。

2.2 源碼分析

上面的流程分析讓我們很直覺地了解了線程池的工作原理,讓我們再通過源代碼來看看是如何實作的,線程池執行任務的方法如下。

public void execute(Runnable command) {
    if (command == null)
    	throw new NullPointerException();
    // 如果線程數小于基本線程數,則建立線程并執行目前任務
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        // 如線程數大于等于基本線程數或線程建立失敗,則将目前任務放到工作隊列中。
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
            ensureQueuedTaskHandled(command);
        }
        // 如果線程池不處于運作中或任務無法放入隊列,并且目前線程數量小于最大允許的線程數量,
        // 則建立一個線程執行任務。
        else if (!addIfUnderMaximumPoolSize(command))
            // 抛出RejectedExecutionException異常
            reject(command); // is shutdown or saturated
    }
}

           

工作線程:線程池建立線程時,會将線程封裝成工作線程Worker,Worker在執行完任務後,還會循環擷取工作隊列裡的任務來執行。我們可以從Worker類的run()方法裡看到這點。

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
    	}
    } finally {
        workerDone(this);
    }
}
           

Worker執行任務的模型如下圖所示:

ThreadPoolExecutor線程池原理

線程不斷從阻塞隊列中擷取任務,實作線程管理子產品和任務管理子產品之間的通信。這部分政策由getTask方法實作,其執行流程如下圖所示:

ThreadPoolExecutor線程池原理

ThreadPoolExecutor中線程執行任務的示意圖如下所示:

ThreadPoolExecutor線程池原理

線程池中的線程執行任務分兩種情況,如下:

  • 直接由新建立的線程執行:在execute()方法中建立一個線程時,會讓這個線程執行目前任務。
  • 線程從任務隊列中擷取任務然後執行:這個線程執行完上圖中1的任務後,會反複從BlockingQueue擷取任務來執行
  • 第一種情況僅出現線上程初始建立的時候,第二種是線程擷取任務絕大多數的情況。

3. 線程池的使用

3.1 線程池的建立

通過ThreadPoolExecutor來建立一個線程池

建立一個線程池時需要輸入幾個參數,如下:

  • corePoolSize(線程池的基本大小):當送出一個任務到線程池時,線程池會建立一個線程來執行任務,即其他空閑的基本線程能夠執行新任務也會建立線程,等到需要執行的任務數大于線程池基本大小時就不再建立。如果調用了線程池的prestartAllCoreThreads()方法, 線程池會提前建立并啟動所有基本線程。
  • runnableTaskQueue(任務隊列):用于儲存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列:
    • ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用 移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQueue,靜态工 廠方法Executors.newCachedThreadPool使用了這個隊列。
    • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大數量):線程池允許建立的最大線程數。如果隊列滿了,并且已建立的線程數小于最大線程數,則線程池會再建立新的線程執行任務。值得注意的是,如果使用了無界的任務隊列這個參數就沒什麼效果。
  • ThreadFactory:用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字。使用開源架構guava提供的ThreadFactoryBuilder可以快速給線程池裡的線程設定有意義的名字,代碼如下:
  • RejectedExecutionHandler(飽和政策):當隊列和線程池都滿了,說明線程池處于飽和狀态,那麼必須采取一種政策處理送出的新任務。這個政策預設情況下是AbortPolicy,表示無法處理新任務時抛出異常。在JDK 1.5中Java線程池架構提供了以下4種政策:
    • AbortPolicy:直接抛出異常。
    • CallerRunsPolicy:隻用調用者所線上程來運作任務。
    • DiscardOldestPolicy:丢棄隊列裡最近的一個任務,并執行目前任務。
    • DiscardPolicy:不處理,丢棄掉

      當然,也可以根據應用場景需要來實作RejectedExecutionHandler接口自定義政策。如記錄日志或持久化存儲不能處理的任務。

  • keepAliveTime(線程活動保持時間):線程池的工作線程空閑後,保持存活的時間。是以, 如果任務很多,并且每個任務執行的時間比較短,可以調大時間,提高線程的使用率。
  • TimeUnit(線程活動保持時間的機關):可選的機關有天(DAYS)、小時(HOURS)、分鐘 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒 (NANOSECONDS,千分之一微秒)

3.2 向線程池送出任務

可以使用兩個方法向線程池送出任務,分别為execute()和submit()方法:

  • execute()方法用于送出不需要傳回值的任務,是以無法判斷任務是否被線程池執行成功
  • submit()方法用于送出需要傳回值的任務。線程池會傳回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功

以下代碼可知execute()方法輸入的任務是一個Runnable類的執行個體

threadsPool.execute(new Runnable() {
    @Override
    public void run() {
    // TODO Auto-generated method stub
    }
});
           

通過sumbit方法得到的傳回值future對象,可以調用future的get()方法來擷取傳回值,get()方法會阻塞目前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞目前線程一段時間後立即傳回,這時候有可能任務沒有執行完

Future<Object> future = executor.submit(harReturnValuetask);
try {
    Object s = future.get();
} catch (InterruptedException e) {
    // 進行中斷異常
} catch (ExecutionException e) {
    // 處理無法執行任務異常
} finally {
    // 關閉線程池
    executor.shutdown();
}

           

3.3 生命周期管理

線程池運作的狀态,并不是使用者顯式設定的,而是伴随着線程池的運作,由内部來維護。線程池内部使用一個變量維護兩個值:運作狀态(runState)和線程數量 (workerCount)。在具體實作中,線程池将運作狀态(runState)、線程數量 (workerCount)兩個關鍵參數的維護放在了一起,如下代碼所示:

ctl

這個AtomicInteger類型,是對線程池的運作狀态和線程池中有效線程的數量進行控制的一個字段, 它同時包含兩部分的資訊:線程池的運作狀态 (runState) 和線程池内有效線程的數量 (workerCount),高3位儲存runState,低29位儲存workerCount,兩個變量之間互不幹擾。用一個變量去存儲兩個值,可避免在做相關決策時,出現不一緻的情況,不必為了維護兩者的一緻,而占用鎖資源。通過閱讀線程池源代碼也可以發現,經常出現要同時判斷線程池運作狀态和線程數量的情況。線程池也提供了若幹方法去供使用者獲得線程池目前的運作狀态、線程個數。這裡都使用的是位運算的方式,相比于基本運算,速度也會快很多。

關于内部封裝的擷取生命周期狀态、擷取線程池線程數量的計算方法如以下代碼所示:

private static int runStateOf(int c)     { return c & ~CAPACITY; } //計算目前運作狀态
private static int workerCountOf(int c)  { return c & CAPACITY; }  //計算目前線程數量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通過狀态和線程數生成ctl
           

ThreadPoolExecutor的運作狀态有5種,分别為:

ThreadPoolExecutor線程池原理

其生命周期轉換如下入所示:

ThreadPoolExecutor線程池原理

3.4 關閉線程池

可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池

  • 原理:周遊線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,是以無法響應中斷的任務可能永遠無法終止
    • shutdownNow首先将線程池的狀态設定成 STOP,然後嘗試停止所有的正在執行或暫停任務的線程,并傳回等待執行任務的清單
    • shutdown隻是将線程池的狀态設定成SHUTDOWN狀态,然後中斷所有沒有正在執行任務的線程。

3.5 合理地配置線程池

要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析:

  • 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
  • 任務的優先級:高、中和低。
  • 任務的執行時間:長、中和短。
  • 任務的依賴性:是否依賴其他系統資源,如資料庫連接配接。

性質不同的任務可以用不同規模的線程池分開處理:

  • CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池
  • IO密集型任務線程由于并不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務

    如果可以拆分,可以将要執行的任務其拆分成一個CPU密集型任務和一個IO密集型任務,隻要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量将高于串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解

注意:我們初始化線程池的時候,建議使用有界隊列

對于一些阻塞線程,如果我們使用無界隊列會造成線程池的隊列就會越來越多,任務擠壓線上程池中, 有可能會撐滿記憶體,導緻整個系統不可用,而不隻是背景任務出現問題。

有界隊列能增加系統的穩定性和預警能力

如何判斷是 CPU 密集任務還是 IO 密集任務?

CPU 密集型簡單了解就是利用 CPU 計算能力的任務比如你在記憶體中對大量資料進行排序。但凡涉及到網絡讀取,檔案讀取這類都是 IO 密集型,這類任務的特點是 CPU 計算耗費時間相比于等待 IO 操作完成的時間來說很少,大部分時間都花在了等待 IO 操作完成上。

🌈 拓展一下(參見:issue#1737open in new window):

線程數更嚴謹的計算的方法應該是:

最佳線程數 = N(CPU 核心數)∗(1+WT(線程等待時間)/ST(線程計算時間))

,其中

WT(線程等待時間)=線程運作總時間 - ST(線程計算時間)

線程等待時間所占比例越高,需要越多線程。線程計算時間所占比例越高,需要越少線程。

我們可以通過 JDK 自帶的工具 VisualVM 來檢視

WT/ST

比例。

CPU 密集型任務的

WT/ST

接近或者等于 0,是以, 線程數可以設定為 N(CPU 核心數)∗(1+0)= N,和我們上面說的 N(CPU 核心數)+1 差不多。

IO 密集型任務下,幾乎全是線程等待時間,從理論上來說,你就可以将線程數設定為 2N(按道理來說,WT/ST 的結果應該比較大,這裡選擇 2N 的原因應該是為了避免建立過多線程吧)。

美團技術團隊在《Java 線程池實作原理及其在美團業務中的實踐》open in new window這篇文章中介紹到對線程池參數實作可自定義配置的思路和方法。

4. 線程池的監控

如果在系統中大量使用線程池,則有必要對線程池進行監控,友善在出現問題時,可以根據線程池的使用狀況快速定位問題。

可以通過線程池提供的參數進行監控,在監控線程池的時候可以使用以下屬性:

  • taskCount:線程池需要執行的任務數量。
  • completedTaskCount:線程池在運作過程中已完成的任務數量,小于或等于taskCount。
  • largestPoolSize:線程池裡曾經建立過的最大線程數量。通過這個資料可以知道線程池是否曾經滿過。如該數值等于線程池的最大大小,則表示線程池曾經滿過。
  • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,線程池裡的線程不會自動銷 毀,是以這個大小隻增不減。
  • getActiveCount:擷取活動的線程數。

通過擴充線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。 這幾個方法線上程池裡是空方法。

參考資料:

  • Java線程池實作原理及其在美團業務中的實踐 - 美團技術團隊 (meituan.com)
  • JavaGuide
  • 《java并發程式設計的藝術》