
本文你将獲得以下資訊:
線程池源碼解讀
線程池執行流程分析
帶傳回值的線程池實作
延遲線程池實作
為了友善讀者了解,本文會由淺入深,先從線程池的使用開始再延伸到源碼解讀和源碼分析等進階内容,讀者可根據自己的情況自主選擇閱讀順序和需要了解的章節。
線程池能夠更加充分的利用CPU、記憶體、網絡、IO等系統資源,線程池的主要作用如下:
利用線程池可以複用線程,控制最大并發數;
實作任務緩存政策和拒絕機制;
實作延遲執行
阿裡巴巴Java開發手冊強制規定:線程資源必須通過線程池提供,如下圖:
本節會介紹7種線程池的建立與使用,線程池的狀态介紹,ThreadPoolExecutor參數介紹等。
線程池可以使用Executors和ThreadPoolExecutor,其中使用Executors有六種建立線程池的方法,如下圖:
newSingleThreadExecutor(),它的特點在于工作線程數目被限制為 1,操作一個無界的工作隊列,是以它保證了所有任務的都是被順序執行,最多會有一個任務處于活動狀态,并且不允許使用者改動線程池執行個體,是以可以避免其改變線程數目。
newCachedThreadPool(),它是一種用來處理大量短時間工作任務的線程池,具有幾個鮮明特點:它會試圖緩存線程并重用,當無緩存線程可用時,就會建立新的工作線程;如果線程閑置的時間超過 60 秒,則被終止并移出緩存;長時間閑置時,這種線程池,不會消耗什麼資源。其内部使用 SynchronousQueue 作為工作隊列。
newFixedThreadPool(int nThreads),重用指定數目(nThreads)的線程,其背後使用的是無界的工作隊列,任何時候最多有 nThreads 個工作線程是活動的。這意味着,如果任務數量超過了活動隊列數目,将在工作隊列中等待空閑線程出現;如果有工作線程退出,将會有新的工作線程被建立,以補足指定的數目 nThreads。
newSingleThreadScheduledExecutor() 建立單線程池,傳回 ScheduledExecutorService,可以進行定時或周期性的工作排程。
newScheduledThreadPool(int corePoolSize)和newSingleThreadScheduledExecutor()類似,建立的是個 ScheduledExecutorService,可以進行定時或周期性的工作排程,差別在于單一工作線程還是多個工作線程。
newWorkStealingPool(int parallelism),這是一個經常被人忽略的線程池,Java 8 才加入這個建立方法,其内部會建構ForkJoinPool,利用Work-Stealing算法,并行地處理任務,不保證處理順序。
ThreadPoolExecutor是最原始的線程池建立,上面1-3建立方式都是對ThreadPoolExecutor的封裝。
總結: 其中newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是對ThreadPoolExecutor的封裝實作,newSingleThreadScheduledExecutor、newScheduledThreadPool則為ThreadPoolExecutor子類ScheduledThreadPoolExecutor的封裝,用于執行延遲任務,newWorkStealingPool則為Java 8新加的方法。
從以上代碼可以看出newSingleThreadExecutor和newSingleThreadScheduledExecutor建立的都是單線程池,那麼單線程池的意義是什麼呢?
雖然是單線程池,但提供了工作隊列,生命周期管理,工作線程維護等功能。
ThreadPoolExecutor作為線程池的核心方法,我們來看一下ThreadPoolExecutor内部實作,以及封裝類是怎麼調用ThreadPoolExecutor的。
先從構造函數說起,構造函數源碼如下:
參數說明:
corePoolSize:所謂的核心線程數,可以大緻了解為長期駐留的線程數目(除非設定了 allowCoreThreadTimeOut)。對于不同的線程池,這個值可能會有很大差別,比如 newFixedThreadPool 會将其設定為 nThreads,而對于 newCachedThreadPool 則是為 0。
maximumPoolSize:顧名思義,就是線程不夠時能夠建立的最大線程數。同樣進行對比,對于 newFixedThreadPool,當然就是 nThreads,因為其要求是固定大小,而 newCachedThreadPool 則是 Integer.MAX_VALUE。
keepAliveTime:空閑線程的保活時間,如果線程的空閑時間超過這個值,那麼将會被關閉。注意此值生效條件必須滿足:空閑時間超過這個值,并且線程池中的線程數少于等于核心線程數corePoolSize。當然核心線程預設是不會關閉的,除非設定了allowCoreThreadTimeOut(true)那麼核心線程也可以被回收。
TimeUnit:時間機關。
BlockingQueue:任務丢列,用于存儲線程池的待執行任務的。
threadFactory:用于生成線程,一般我們可以用預設的就可以了。
handler:當線程池已經滿了,但是又有新的任務送出的時候,該采取什麼政策由這個來指定。有幾種方式可供選擇,像抛出異常、直接拒絕然後傳回等,也可以自己實作相應的接口實作自己的邏輯。
來看一下線程池封裝類對于ThreadPoolExecutor的調用:
newSingleThreadExecutor對ThreadPoolExecutor的封裝源碼如下:
newCachedThreadPool對ThreadPoolExecutor的封裝源碼如下:
newFixedThreadPool對ThreadPoolExecutor的封裝源碼如下:
ScheduledExecutorService對ThreadPoolExecutor的封裝源碼如下:
newSingleThreadScheduledExecutor使用的是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor,如下圖所示:

newScheduledThreadPool對ThreadPoolExecutor的封裝源碼如下:
newScheduledThreadPool使用的也是ThreadPoolExecutor的子類ScheduledThreadPoolExecutor。
檢視ThreadPoolExecutor源碼可知線程的狀态如下:
線程狀态解讀(以下内容來源于:https://javadoop.com/post/java-thread-pool):
RUNNING:這個沒什麼好說的,這是最正常的狀态:接受新的任務,處理等待隊列中的任務;
SHUTDOWN:不接受新的任務送出,但是會繼續處理等待隊列中的任務;
STOP:不接受新的任務送出,不再處理等待隊列中的任務,中斷正在執行任務的線程;
TIDYING:所有的任務都銷毀了,workCount 為 0。線程池的狀态在轉換為 TIDYING 狀态時,會執行鈎子方法 terminated();
TERMINATED:terminated() 方法結束後,線程池的狀态就會變成這個;
RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,是以等于 0 的時候不能送出任務,大于 0 的話,連正在執行的任務也需要中斷。
看了這幾種狀态的介紹,讀者大體也可以猜到十之八九的狀态轉換了,各個狀态的轉換過程有以下幾種:
RUNNING -> SHUTDOWN:當調用了 shutdown() 後,會發生這個狀态轉換,這也是最重要的;
(RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 後,會發生這個狀态轉換,這下要清楚 shutDown() 和 shutDownNow() 的差別了;
SHUTDOWN -> TIDYING:當任務隊列和線程池都清空後,會由 SHUTDOWN 轉換為 TIDYING;
STOP -> TIDYING:當任務隊列清空後,發生這個轉換;
TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束後;
說了那麼多下來一起來看線程池的是怎麼執行任務的,線程池任務送出有兩個方法:
execute
submit
其中execute隻能接受Runnable類型的任務,使用如下:
submit可以接受Runnable或Callable類型的任務,使用如下:
使用submit傳遞Callable類可以擷取執行任務的傳回值,Callable是JDK 1.5 添加的特性用于補充Runnable無傳回的情況。
線上程池中newSingleThreadScheduledExecutor和newScheduledThreadPool傳回的是ScheduledExecutorService,用于執行延遲線程池的,代碼如下:
完整示例下載下傳位址: https://github.com/vipstone/java-core-example
閱讀線程池的源碼有一個小技巧,可以按照線程池執行的順序進行串連關聯閱讀,這樣更容易了解線程池的實作。
源碼閱讀流程解讀
我們先從線程池的任務送出方法execute()開始閱讀,從execute()我們會發現線程池執行的核心方法是addWorker(),在addWorker()中我們發現啟動線程調用了start()方法,調用start()方法之後會執行Worker類的run()方法,run裡面調用runWorker(),運作程式的關鍵在于getTask()方法,getTask()方法之後就是此線程的關閉,整個線程池的工作流程也就完成了,下來一起來看吧(如果本段文章沒看懂的話也可以看完源碼之後,回過頭來再看一遍)。
在這段代碼可以看出,調用了t.start();
根據上面代碼可知,調用了Worker的t.start()之後,緊接着會調用Worker的run()方法,run()源碼如下:
runWorker()源碼如下:
runWorker裡面的有getTask(),來看下具體的實作:
線程池的執行流程如下圖:
本文總結以問答的形式展示,引自《深度解讀 java 線程池設計思想及源碼實作》,最下方附參考位址。
corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設定而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任務,添加任務的時候,如果目前線程數超過了 corePoolSize,那麼往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
keepAliveTime 用于設定空閑時間,如果線程數超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作
rejectedExecutionHandler 用于處理當線程池不能執行此任務時的情況,預設有抛出 RejectedExecutionException 異常、忽略任務、使用送出任務的線程來執行此任務和将隊列中等待最久的任務删除,然後送出此任務這四種政策,預設為抛出異常。
如果目前線程數少于 corePoolSize,那麼送出任務的時候建立一個新的線程,并由這個線程執行這個任務;
如果目前線程數已經達到 corePoolSize,那麼将送出的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
如果隊列已滿,那麼建立新的線程來執行任務,需要保證池中的線程數不會超過 maximumPoolSize,如果此時線程數超過了 maximumPoolSize,那麼執行拒絕政策。
如果某個任務執行出現異常,那麼執行任務的線程會被關閉,而不是繼續接收其他任務。然後會啟動一個新的線程來代替它。
workers 的數量達到了 corePoolSize,任務入隊成功,以此同時線程池被關閉了,而且關閉線程池并沒有将這個任務出隊,那麼執行拒絕政策。這裡說的是非常邊界的問題,入隊和關閉線程池并發執行,讀者仔細看看 execute 方法是怎麼進到第一個 reject(command) 裡面的。
workers 的數量大于等于 corePoolSize,準備入隊,可是隊列滿了,任務入隊失敗,那麼準備開啟新的線程,可是線程數已經達到 maximumPoolSize,那麼執行拒絕政策。
書籍:《碼出高效:Java開發手冊》
Java核心技術36講:http://t.cn/EwUJvWA
深度解讀 java 線程池設計思想及源碼實作:https://javadoop.com/post/java-thread-pool
Java線程池-ThreadPoolExecutor源碼解析(基于Java8):https://www.imooc.com/article/42990
課程推薦:
關注下面二維碼,訂閱更多精彩内容。
關注公衆号(加好友):
作者:
王磊的部落格
出處:
http://vipstone.cnblogs.com/