天天看點

線程池 threadPoolExecutor詳解

這種文章寫得很不錯,讓我對threadPoolExecutor有了更深入的了解!

ThreadPoolExecutor的完整構造方法的簽名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .

corePoolSize - 池中所儲存的線程數,包括空閑線程。

maximumPoolSize - 池中允許的最大線程數。

keepAliveTime - 當線程數大于核心時,此為終止前多餘的空閑線程等待新任務的最長時間。

unit - keepAliveTime 參數的時間機關。

workQueue - 執行前用于保持任務的隊列。此隊列僅保持由 execute 方法送出的 Runnable 任務。

threadFactory - 執行程式建立新線程時使用的工廠。

handler - 由于超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程式。

ThreadPoolExecutor是Executors類的底層實作。

在JDK幫助文檔中,有如此一段話:

“強烈建議程式員使用較為友善的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池) Executors.newSingleThreadExecutor()(單個背景線程)

它們均為大多數使用場景預定義了設定。”

下面介紹一下幾個類的源碼:

ExecutorService   newFixedThreadPool (int nThreads):固定大小線程池。

可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實際上,後面會介紹,如果使用無界queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表名什麼?-就是該實作不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。

1.   public static ExecutorService newFixedThreadPool(int nThreads) {  

2.           return new ThreadPoolExecutor(nThreads, nThreads,  

3.                                         0L, TimeUnit.MILLISECONDS,  

4.                                         new LinkedBlockingQueue());  

5.       }

ExecutorService   newSingleThreadExecutor():單線程

1.   public static ExecutorService newSingleThreadExecutor() {  

2.           return new FinalizableDelegatedExecutorService  

3.               (new ThreadPoolExecutor(1, 1,  

4.                                       0L, TimeUnit.MILLISECONDS,  

5.                                       new LinkedBlockingQueue()));  

6.       }

ExecutorService newCachedThreadPool():無界線程池,可以進行自動線程回收

這個實作就有意思了。首先是無界的線程池,是以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對于該BlockingQueue有些陌生,簡單說:該QUEUE中,每個插入操作必須等待另一個線程的對應移除操作。

1.   public static ExecutorService newCachedThreadPool() {  

2.           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

3.                                         60L, TimeUnit.SECONDS,  

4.                                         new SynchronousQueue());  

    }

先從BlockingQueue workQueue這個入參開始說起。在JDK中,其實已經說得很清楚了,一共有三種類型的queue。

所有 BlockingQueue 都可用于傳輸和保持送出的任務。可以使用此隊列與池大小進行互動:

如果運作的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(如果目前運作的線程小于corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄家夥(thread)開始運作)

如果運作的線程等于或多于 corePoolSize,則 Executor 始終首選将請求加入隊列,而不添加新的線程。

如果無法将請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種情況下,任務将被拒絕。

queue上的三種類型。

排隊有三種通用政策:

直接送出。工作隊列的預設選項是 SynchronousQueue,它将任務直接送出給線程而不保持它們。在此,如果不存在可用于立即運作任務的線程,則試圖把任務加入隊列将失敗,是以會構造一個新的線程。此政策可以避免在處理可能具有内部依賴性的請求集時出現鎖。直接送出通常要求無界 maximumPoolSizes 以避免拒絕新送出的任務。當指令以超過隊列所能處理的平均數連續到達時,此政策允許無界線程具有增長的可能性。

無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)将導緻在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。(是以,maximumPoolSize 的值也就無效了。)當每個任務完全獨立于其他任務,即任務執行互不影響時,适合于使用無界隊列;例如,在 Web 頁伺服器中。這種排隊可用于處理瞬态突發請求,當指令以超過隊列所能處理的平均數連續到達時,此政策允許無界線程具有增長的可能性。

有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要互相折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導緻人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。 

BlockingQueue的選擇。

例子一:使用直接送出政策,也即SynchronousQueue。

首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由于該Queue本身的特性,在某次添加元素後必須等待其他線程取走後才能繼續添加。在這裡不是核心線程便是新建立的線程,但是我們試想一樣下,下面的場景。

我們使用一下參數構造ThreadPoolExecutor: 

new ThreadPoolExecutor(

   2, 3, 30, TimeUnit.SECONDS,

   new SynchronousQueue(),

   new RecorderThreadFactory("CookieRecorderPool"),

   new ThreadPoolExecutor.CallerRunsPolicy());

 當核心線程已經有2個正在運作.

此時繼續來了一個任務(A),根據前面介紹的“如果運作的線程等于或多于 corePoolSize,則 Executor 始終首選将請求加入隊列,而不添加新的線程。”,是以A被添加到queue中。

又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由于使用的SynchronousQueue,是以一定無法加入進去。

此時便滿足了上面提到的“如果無法将請求加入隊列,則建立新的線程,除非建立此線程超出maximumPoolSize,在這種情況下,任務将被拒絕。”,是以必然會建立一個線程來運作這個任務。

暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,後一個呢?queue中無法插入,而線程數達到了maximumPoolSize,是以隻好執行異常政策了。

是以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使用有界隊列)。對于使用SynchronousQueue的作用jdk中寫的很清楚:此政策可以避免在處理可能具有内部依賴性的請求集時出現鎖。

什麼意思?如果你的任務A1,A2有内部關聯,A1需要先運作,那麼先送出A1,再送出A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1麼有被執行前,A2不可能添加入queue中。

例子二:使用無界隊列政策,即LinkedBlockingQueue

這個就拿newFixedThreadPool來說,根據前文提到的規則:

如果運作的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。那麼當任務繼續增加,會發生什麼呢?

如果運作的線程等于或多于 corePoolSize,則 Executor 始終首選将請求加入隊列,而不添加新的線程。OK,此時任務變加入隊列之中了,那什麼時候才會添加新線程呢?

如果無法将請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種情況下,任務将被拒絕。這裡就很有意思了,可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對于無界隊列來說,總是可以加入的(資源耗盡,當然另當别論)。換句說,永遠也不會觸發産生新的線程!corePoolSize大小的線程數會一直運作,忙完目前的,就從隊列中拿任務開始運作。是以要防止任務瘋長,比如任務運作的實行比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,不一會兒就爆了。

例子三:有界隊列,使用ArrayBlockingQueue。

這個是最為複雜的使用,是以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。

舉例來說,請看如下構造方法:

new ThreadPoolExecutor(

     2, 4, 30, TimeUnit.SECONDS,

     new ArrayBlockingQueue(2),

     new RecorderThreadFactory("CookieRecorderPool"),

     new ThreadPoolExecutor.CallerRunsPolicy());

假設,所有的任務都永遠無法執行完。

對于首先來的A,B來說直接運作,接下來,如果來了C,D,他們會被放到queue中,如果接下來再來E,F,則增加線程運作E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限制了,是以就會使用拒絕政策來處理。

keepAliveTime

jdk中的解釋是:當線程數大于核心時,此為終止前多餘的空閑線程等待新任務的最長時間。

有點拗口,其實這個不難了解,在使用了“池”的應用中,大多都有類似的參數需要配置。比如資料庫連接配接池,DBCP中的maxIdle,minIdle參數。

什麼意思?接着上面的解釋,後來向老闆派來的勞工始終是“借來的”,俗話說“有借就有還”,但這裡的問題就是什麼時候還了,如果借來的勞工剛完成一個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往,老闆肯定頭也大死了。

合理的政策:既然借了,那就多借一會兒。直到“某一段”時間後,發現再也用不到這些勞工時,便可以還回去了。這裡的某一段時間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。

RejectedExecutionHandler

另一種情況便是,即使向老闆借了勞工,但是任務還是繼續過來,還是忙不過來,這時整個隊伍隻好拒絕接受了。

RejectedExecutionHandler接口提供了對于拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經預設包含了4中政策,因為源碼非常簡單,這裡直接貼出來。

CallerRunsPolicy:線程調用運作該任務的 execute 本身。此政策提供簡單的回報控制機制,能夠減緩新任務的送出速度。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                r.run();

            }

        }

這個政策顯然不想放棄執行任務。但是由于池中已經沒有任何資源了,那麼就直接使用調用該execute的線程本身來執行。

AbortPolicy:處理程式遭到拒絕将抛出運作時 RejectedExecutionException

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            throw new RejectedExecutionException();

        }

 這種政策直接抛出異常,丢棄任務。

DiscardPolicy:不能執行的任務将被删除

1.   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  

2.           } 

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        }

 這種政策和AbortPolicy幾乎一樣,也是丢棄任務,隻不過他不抛出異常。

DiscardOldestPolicy:如果執行程式尚未關閉,則位于工作隊列頭部的任務将被删除,然後重試執行程式(如果再次失敗,則重複此過程)

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                e.getQueue().poll();

                e.execute(r);

            }

        }

該政策就稍微複雜一些,在pool沒有關閉的前提下首先丢掉緩存在隊列中的最早的任務,然後重新嘗試運作該任務。這個政策需要适當小心。

設想:如果其他線程都還在運作,那麼新來任務踢掉舊任務,緩存在queue中,再來一個任務又會踢掉queue中最老任務。

總結:

keepAliveTime和maximumPoolSize及BlockingQueue的類型均有關系。如果BlockingQueue是無界的,那麼永遠不會觸發maximumPoolSize,自然keepAliveTime也就沒有了意義。

反之,如果核心數較小,有界BlockingQueue數值又較小,同時keepAliveTime又設的很小,如果任務頻繁,那麼系統就會頻繁的申請回收線程。

public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue());

本文來源:http://blog.sina.com.cn/s/blog_70bcd7c10101a1w7.html

此外這篇文章也很好,詳解了這幾個參數:http://www.th7.cn/Program/java/201409/279333.shtml