天天看點

使用 Java 執行器實作線程池

在做一個 JSR 315 - servlet 規範 3.0 的報告時,我意識到了解異步 servlet 的一個關鍵點在于首先要了解 Java 中的異步處理機制。有因有果,很快我陷入了執行器(Executor)和執行器服務(ExecutorService)之中 - 因為它們是 Java 的異步處理的關鍵構件。在本部落格中我将就這一主題我對掌握到的東西做一個總結。

幾個概念

任務:定義為一個小的獨立的活動,它表示在某個時間點啟動的一系列工作,進行一些活動或者計算,之後結束。在一個 web 伺服器中,每個傳入的獨立請求都滿足這一定義。在 Java 中,任務的展現為 Runnable 或者 Callable 的執行個體。

線程:可以認為它是一個任務的執行執行個體。如果說任務表示一系列需要完成的工作的話,那麼線程就表示該任務實際的執行。在 Java 中,線程展現為 Thread 的執行個體。

同步處理:發生在當一個任務必須在主線程執行中完成時。換句話講,主程式必須等待目前任務執行結束之後,才可以繼續處理它自己的流程。

異步處理:當主線程将一個任務的處理委托給一個分離的獨立線程時。這個線程将會負責該任務的相關處理,而住線程則傳回處理主程式接下來要做的事情。

線程池:表示一個或多個等待被配置設定工作的線程。線程池會給我們帶來諸多好處。首先,它減少了線程的建立和銷毀所帶來的系統開銷,因為池中的線程得到了複用,而不是每次從無到重新建立。其次,它能夠對系統中活動線程的數量進行控制,這減小了伺服器的記憶體和計算負擔。最後,它允許你将線程管理這種棘手的問題委托給線程池,簡化了你的程式。

這裡,有必要指出起作用的三個重要機制 - 有一些要處理任務的到來(一些請求一系列工作的完成),有任務送出給接受器,然後是每個任務的實際執行。Java 中的 Executor 架構将後兩個機制進行了分離 - 送出和處理。

請求的到來通常不在程式的控制範圍之内 - 這個取決于來自客戶的請求。一個請求的送出通常是這樣完成:被要求的任務被添加到進入任務的隊列,而處理則實作為配置設定一個進入的任務給線程池中空閑等待的一個線程去處理。

Java 5.0 和線程池

Java 5.0 引入了自己的線程池實作 - Executor 和 ExecutorService 接口。這讓你在自己的程式中使用線程池變得更加容易。

Executor 給應用程式對于任務的考慮提供了一個便利的抽象。不需要從線程的方面進行考慮,應用現在隻需簡單地處理 Runnable 的執行個體,然後将其傳給一個 Executor 去處理。

ExecutorService 接口繼承了非常簡化的 Executor 接口,它添加了一些生命周期方法來管理線程池中的線程。比如,你可以關掉池中所有的線程。

另外,Executor 允許你送出一個簡單的任務給池中的某個線程執行,ExecutorService 還能允許你送出一個任務的集合,或者獲得一個 Futrure 對象以跟蹤該任務的執行情況。

Runnable 和 Callable

Executor 架構代表了使用 Runnable 或者 Callable 執行個體的一系列任務。Runnable 的 run() 方法限制是它既沒有傳回值,也不會抛 checked 異常。Callable 是一個加強版,定義了一個 call() 方法以允許一些計算值的傳回,甚至能夠抛出一個異常,如果需要的話。

控制你的任務

你可以通過使用 FutureTask 類擷取任務的詳細資訊,它能夠對 Callable 或 Runnable 的執行個體進行包裝。你可以通過調用一個 ExecutorService 的 submit() 方法的傳回值獲得一個 FutureTask 執行個體,或者你也可以在調用 execute() 方法之前手工将你的任務包裝到一個 FutureTask。

FutureTask 的執行個體,因為其實作了 Future 接口,通過它你能夠監控一個執行中的任務、取消該任務、擷取其執行結果(就像 Callable 的 call() 方法有傳回值那樣)。

ThreadPoolExecutor

最常見的 ExecutorService 實作是 ThreadPoolExecutor。

任務作為一個 Runnable 的執行個體送出給 ThreadPoolExecutor,後者負責實際處理,而你的應用則無須關心在這個抽象的背後到底發生了什麼事情。

ThreadPoolExecutor 的定義如下:

  1. 一個線程池(定義了最多線程和最少線程的數量);
  2. 一個工作隊列:這個隊列持有送出的任務,這些任務将被依次配置設定給線程池中的某個線程。主要有兩種類型的隊列 - 有界和無界的。給一個有界隊列添加任務永遠是成功添加的,但是有界隊列(比如一個固定容量的 LinkedBlockingQueue)在挂起的任務達到其最大容量的時候會拒絕新任務添加。
  3. 一個定義了如何處理被拒絕任務的處理器(飽和政策):當一個任務無法被添加到隊列的時候,線程池将會調用其注冊的拒絕處理器來決定将會發生什麼事情。預設的拒絕政策是簡單地抛出一個 RejectedExecutionException 運作時異常,并由程式捕捉該異常并進行處理。還有其他的一些個政策,比如 DiscardPolicy,它會默默地丢棄任務而沒有任何通知。
  4. 一個線程工廠:預設情況下,ThreadPoolExecutor 執行器構造的新線程将會具有特定屬性 - 比如線程優先級,以及一個根據線程池數量、線程池中線程數來決定的線程名。你可以使用一個自定義工廠來重寫這些預設值。

使用執行器的算法

1. 建立一個執行器

你首先要在一個全局環境下建立一個 Executor 或 ExecutorService 的執行個體(比如一個 servlet 容器下的應用的上下文)。

Executors 類提供了很多建立一個 ExecutorService 的靜态工廠方法。比如,newFixedThreadPool() 傳回一個具有無界隊列和固定線程數的 ThreadPoolExecutor 執行個體;newCachedThreadPool() 方法傳回一個具有無界隊列和無界線程數的 ThreadPoolExecutor 執行個體。對于後者,如果有空閑線程的話,該線程會被複用;如果沒有空閑線程,會建立一個線程并将其添加到線程池。超過固定時間始終閑置的線程将會被移出線程池。

private static final Executor executor = Executors.newFixedThreadPool(10);      

如果不使用這些便利的方法,你也許會發現使用自己定義的 ThreadPoolExecutor 更合适 - 使用它的衆多構造子。

private static final Executor executor = new ThreadPoolExecutor(10, 10, 50000L,   TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100));      

這個構造子将建立一個大小 100 的有界隊列、線程池固定大小為 10 的 ThreadPoolExecutor。

2. 建立一個或多個任務

你需要建立一個或多個由 Runnable 或者 Callable 執行個體執行的任務。

3. 送出任務到執行器

一旦你有了一個 ExecutorService,你就可以通過使用 submit() 或者 execute() 方法将任務送出給它了,之後來自線程池中的一個空閑線程将該任務出列并執行之。

4. 執行任務

執行器除了管理線程池和隊列之外還會負責管理任務的執行。這裡具體會發生什麼取決于線程池的大小限制、空閑線程的數量以及隊列的邊界。

通常情況下,如果線程池具備線程的數量小于其定義的最小線程數,新的線程會被建立以處理隊列中的任務,直到達到該數目限制。

如果池中的線程數超過了配置的最小線程數,線程池将會不再建立更多線程,該任務将會被放入任務隊列,直到一個線程空閑出來去處理它。如果隊列滿了的話,就必須得啟動一個新的線程去處理這個新任務了。

如果池中的線程數到達了配置的最大線程數,線程池将不再啟動新的線程,于是新送出的任務要麼被添加到任務隊列,要麼因為該隊列已滿而被拒絕。

線程池中的線程會持續監控該隊列以擷取任務去執行。等待了超過配置的最大空閑時間的線程将會被終止。

5. 執行器的關閉

程式關閉期間,我們通過調用執行器的 shutdown() 方法将其關閉。你可以選擇是暴利關閉還是優雅關閉。

參考資料

  • Goetz 等著《​​Java并發程式設計實踐​​》
  • Oaks 和 Wong 著《​​Java 線程​​》