線程池基本原理是:系統先啟動若幹數量的線程。并讓這些線程處于睡眠狀态。當有新任務時,就會喚醒線程池中的某一個睡眠線程,讓它來處理這個任務。當處理完這個任務後,線程又處于睡眠狀态。
Java 中,線程池的主要組成部分是工作者線程。這樣的類型的線程獨立于它運作的Runnable和Callable任務存在,而且經經常使用于運作多個任務。
工作者線程和普通線程不同之處在于run方法的不同。
普通線程在完畢線程應該運作的代碼後,自然退出,線程結束,Java虛拟機回收配置設定給線程的資源。線程對象被垃圾回收期收回。而構成線程池的工作者線程是可重用線程,它的run方法運作完某一任務的特定代碼後,使自己進入睡眠狀态而不是結束線程。
線程池作用就是限制系統中運作線程的數量。
依據系統的環境情況,能夠自己主動或手動設定線程數量,達到執行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用線程池控制線程數量,其它線程排隊等候。一個任務執行完成,再從隊列的中取最前面的任務開始執行。若隊列中沒有等待程序,線程池的這一資源處于等待。當一個新任務須要執行時,假設線程池中有等待的工作線程,就能夠開始執行了。否則進入等待隊列。
①降低了建立和銷毀線程的次數,每一個工作線程都能夠被反複利用。可運作多個任務。
②能夠依據系統的承受能力,調整線程池中工作線線程的數目。防止由于消耗過多的記憶體,而把server累趴下(每一個線程須要大約1MB記憶體,線程開的越多。消耗的記憶體也就越大,最後當機)。
Java裡面線程池的頂級接口是Executor,可是嚴格意義上講Executor并非一個線程池。而僅僅是一個運作線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個類:
<col>
ExecutorService
真正的線程池接口
ScheduledExecutorService
和Timer/TimerTask類似,解決那些須要任務反複運作的問題
ThreadPoolExecutor
ExecutorService的預設實作
ScheduledThreadPoolExecutor
繼承ThreadPoolExecutor的ScheduledExecutorService接口實作,周期性任務排程的類實作
類之間的架構圖
ThreadPoolExecutor具體解釋
hreadPoolExecutor的完整構造方法的簽名是:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .
參數解釋:
corePoolSize - 池中所儲存的線程數。包含空暇線程。
maximumPoolSize-池中同意的最大線程數。
keepAliveTime - 當線程數大于核心時。此為終止前多餘的空暇線程等待新任務的最長時間。
unit - keepAliveTime 參數的時間機關。
workQueue - 運作前用于保持任務的隊列。
此隊列僅保持由 execute方法送出的 Runnable任務。
threadFactory - 運作程式建立新線程時使用的工廠。
handler - 因為超出線程範圍和隊列容量而使運作被堵塞時所使用的處理程式。
ThreadPoolExecutor是Executors類的底層實作。
在JDK幫助文檔中,有如此一段話:
“強烈建議程式猿使用較為友善的Executors工廠方法Executors.newCachedThreadPool()(無界線程池,能夠進行自己主動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個背景線程),它們均為大多數使用場景提前定義了設定。”
以下介紹幾個類的源代碼:
ExecutorService newFixedThreadPool (int nThreads):固定大小線程池
能夠看到。corePoolSize和maximumPoolSize的大小是一樣的(實際上,後面會介紹。假設使用無界queue的話maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值表名什麼?——就是該實作不想keep alive。最後的BlockingQueue選擇了LinkedBlockingQueue,該LinkedBlockingQueue有一個特點,它是無界的。
ExecutorService newSingleThreadExecutor():單線程
ExecutorService newCachedThreadPool():無界線程池,能夠進行自己主動線程回收
首先是無界的線程池,是以我們能夠發現maximumPoolSize為Integer.MAX_VALUE。
其次BlockingQueue的選擇上使用SynchronousQueue。可能對于該BlockingQueue有些陌生,簡單說:該BlockingQueue中。每一個插入操作必須等待還有一個線程的相應移除操作。
先從BlockingQueue<Runnable> workQueue這個入參開始說起。
在JDK中,事實上已經說得非常清楚了,一共同擁有三種類型的queue。
全部BlockingQueue 都可用于傳輸和保持送出的任務。
能夠使用此隊列與池大小進行互動:
假設執行的線程少于 corePoolSize,則 Executor始終首選加入新的線程,而不進行排隊。(假設目前執行的線程小于corePoolSize。則任務根本不會存放,加入到queue中,而是直接抄家夥(thread)開始執行)
假設執行的線程等于或多于 corePoolSize,則 Executor始終首選将請求加入隊列,而不加入新的線程。
假設無法将請求增加隊列,則建立新的線程。除非建立此線程超出 maximumPoolSize,在這樣的情況下。任務将被拒絕。
排隊有三種通用政策:
直接送出
工作隊列的預設選項是 SynchronousQueue,它将任務直接送出給線程而不保持它們。
在此,假設不存在可用于馬上執行任務的線程,則試圖把任務增加隊列将失敗。是以會構造一個新的線程。
此政策能夠避免在處理可能具有内部依賴性的請求集時出現鎖。
直接送出通常要求無界maximumPoolSizes 以避免拒絕新送出的任務。
當指令以超過隊列所能處理的平均數連續到達時。此政策同意無界線程具有增長的可能性。
無界隊列
使用無界隊列(比如,不具有提前定義容量的 LinkedBlockingQueue)将導緻在全部 corePoolSize 線程都忙時新任務在隊列中等待。這樣,建立的線程就不會超過 corePoolSize。
(是以。maximumPoolSize的值也就無效了。
)當每一個任務全然獨立于其它任務。即任務運作互不影響時。适合于使用無界隊列。比如。在 Web頁server中。這樣的排隊可用于處理瞬态突發請求,當指令以超過隊列所能處理的平均數連續到達時,此政策同意無界線程具有增長的可能性。
有界隊列
當使用有限的 maximumPoolSizes時。有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要互相折衷:使用大型隊列和小型池能夠最大限度地減少 CPU 使用率、作業系統資源和上下文切換開銷。可是可能導緻人工減少吞吐量。假設任務頻繁堵塞(比如。假設它們是 I/O邊界)。則系統可能為超過您許可的很多其它線程安排時間。
使用小型隊列通常要求較大的池大小,CPU使用率較高,可是可能遇到不可接受的排程開銷,這樣也會減少吞吐量。
①使用直接送出政策,也即SynchronousQueue
首先SynchronousQueue是無界的。也就是說他存數任務的能力是沒有限制的。可是因為該Queue本身的特性,在某次加入元素後必須等待其它線程取走後才幹繼續加入。在這裡不是核心線程便是新建立的線程,可是我們試想一樣下。以下的場景。
我們使用一下參數構造ThreadPoolExecutor:
當核心線程已經有2個正在執行.
1、此時繼續來了一個任務(A),依據前面介紹的“假設執行的線程等于或多于 corePoolSize,則 Executor始終首選将請求加入隊列。而不加入新的線程。”,是以A被加入到queue中。
2、又來了一個任務(B),且核心2個線程還沒有忙完。OK,接下來首先嘗試1中描寫叙述,可是因為使用的SynchronousQueue,是以一定無法增加進去。
3、此時便滿足了上面提到的“假設無法将請求增加隊列,則建立新的線程。除非建立此線程超出maximumPoolSize,在這樣的情況下,任務将被拒絕。”。是以必定會建立一個線程來執行這個任務。
4、臨時還能夠,可是假設這三個任務都還沒完畢。連續來了兩個任務,第一個加入入queue中,後一個呢?queue中無法插入,而線程數達到了maximumPoolSize。是以僅僅好運作異常政策了。
是以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就能夠避免上述情況發生(假設希望限制就直接使用有界隊列)。
對于使用SynchronousQueue的作用jdk中寫的非常清楚:此政策能夠避免在處理可能具有内部依賴性的請求集時出現鎖。
什麼意思?假設你的任務A1。A2有内部關聯。A1須要先運作,那麼先送出A1。再送出A2,當使用SynchronousQueue我們能夠保證,A1必然先被運作,在A1麼有被運作前,A2不可能加入入queue中。
②使用無界隊列政策。即LinkedBlockingQueue
這個是最為複雜的使用。是以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是能夠防止資源耗盡的情況發生。
舉例來說。請看例如以下構造方法:
如果。全部的任務都永遠無法運作完。
對于首先來的A,B來說直接執行。接下來,假設來了C,D。他們會被放到queue中,假設接下來再來E,F,則添加線程執行E,F。可是假設再來任務,隊列無法再接受了,線程數也到達最大的限制了。是以就會使用拒絕政策來處理。
jdk中的解釋是:當線程數大于核心時。此為終止前多餘的空暇線程等待新任務的最長時間。
有點拗口,事實上這個不難了解,在使用了“池”的應用中,大多都有類似的參數須要配置。比方資料庫連接配接池,DBCP中的maxIdle。minIdle參數。
什麼意思?接着上面的解釋。後來向老闆派來的勞工始終是“借來的”。俗話說“有借就有還”,但這裡的問題就是什麼時候還了。假設借來的勞工剛完畢一個任務就還回去,後來發現任務還有,那豈不是又要去借?這一來一往。老闆肯定頭也大死了。
合理的政策:既然借了。那就多借一會兒。直到“某一段”時間後,發現再也用不到這些勞工時。便能夠還回去了。
這裡的某一段時間便是keepAliveTime的含義,TimeUnit為keepAliveTime值的度量。
還有一種情況便是。即使向老闆借了勞工,可是任務還是繼續過來。還是忙隻是來,這時整個隊伍僅僅好拒絕接受了。
RejectedExecutionHandler接口提供了對于拒絕任務的處理的自定方法的機會。在ThreadPoolExecutor中已經預設包括了4中政策,由于源代碼很easy,這裡直接貼出來。
① CallerRunsPolicy:線程調用執行該任務的 execute 本身。此政策提供簡單的回報控制機制,可以減緩新任務的送出速度。
這個政策顯然不想放棄運作任務。可是因為池中已經沒有不論什麼資源了,那麼就直接使用調用該execute的線程本身來運作。
② AbortPolicy:處理程式遭到拒絕将抛出執行時RejectedExecutionException
這樣的政策直接抛出異常,丢棄任務。
③ DiscardPolicy:不能運作的任務将被删除
這樣的政策和AbortPolicy差點兒一樣,也是丢棄任務,僅僅隻是他不抛出異常。
④ DiscardOldestPolicy:假設運作程式尚未關閉,則位于工作隊列頭部的任務将被删除,然後重試運作程式(假設再次失敗,則反複此過程)
該政策就略微複雜一些,在pool沒有關閉的前提下首先丢掉緩存在隊列中的最早的任務。然後又一次嘗試執行該任務。這個政策須要适當小心。
設想:假設其它線程都還在執行,那麼新來任務踢掉舊任務,緩存在queue中,再來一個任務又會踢掉queue中最老任務。
小結
keepAliveTime和maximumPoolSize及BlockingQueue的類型均有關系。假設BlockingQueue是無界的。那麼永遠不會觸發maximumPoolSize。自然keepAliveTime也就沒有了意義。
反之,假設核心數較小,有界BlockingQueue數值又較小。同一時候keepAliveTime又設的非常小。假設任務頻繁。那麼系統就會頻繁的申請回收線程。
①newSingleThreadExecutor
MyThread.java
TestSingleThreadExecutor.java
輸出結果:
②newFixedThreadPool
TestFixedThreadPool.java
③newCachedThreadPool
TestCachedThreadPool.java
④newScheduledThreadPool
TestScheduledThreadPoolExecutor.java