天天看點

由淺入深了解Java線程池及線程池的如何使用

多線程的異步執行方式,雖然能夠最大限度發揮多核計算機的計算能力,但是如果不加控制,反而會對系統造成負擔。線程本身也要占用記憶體空間,大量的線程會占用記憶體資源并且可能會導緻Out of Memory。即便沒有這樣的情況,大量的線程回收也會給GC帶來很大的壓力。

為了避免重複的建立線程,線程池的出現可以讓線程進行複用。通俗點講,當有工作來,就會向線程池拿一個線程,當工作完成後,并不是直接關閉線程,而是将這個線程歸還給線程池供其他任務使用。

接下來從總體到細緻的方式,來共同探讨線程池。

來看Executor的架構圖:

接口:Executor,CompletionService,ExecutorService,ScheduledExecutorService

抽象類:AbstractExecutorService

實作類:ExecutorCompletionService,ThreadPoolExecutor,ScheduledThreadPoolExecutor

從圖中就可以看到主要的方法,本文主要讨論的是ThreadPoolExecutor

看一下該類的構造器:

corePoolSize :線程池的核心池大小,在建立線程池之後,線程池預設沒有任何線程。

當有任務過來的時候才會去建立建立線程執行任務。換個說法,線程池建立之後,線程池中的線程數為0,當任務過來就會建立一個線程去執行,直到線程數達到corePoolSize 之後,就會被到達的任務放在隊列中。(注意是到達的任務)。換句更精煉的話:corePoolSize 表示允許線程池中允許同時運作的最大線程數。

如果執行了線程池的prestartAllCoreThreads()方法,線程池會提前建立并啟動所有核心線程。

maximumPoolSize :線程池允許的最大線程數,他表示最大能建立多少個線程。maximumPoolSize肯定是大于等于corePoolSize。

keepAliveTime :表示線程沒有任務時最多保持多久然後停止。預設情況下,隻有線程池中線程數大于corePoolSize 時,keepAliveTime 才會起作用。換句話說,當線程池中的線程數大于corePoolSize,并且一個線程空閑時間達到了keepAliveTime,那麼就是shutdown。

Unit:keepAliveTime 的機關。

workQueue :一個阻塞隊列,用來存儲等待執行的任務,當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。通過workQueue,線程池實作了阻塞功能

threadFactory :線程工廠,用來建立線程。

handler :表示當拒絕處理任務時的政策。

任務緩存隊列

在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:

1)有界任務隊列ArrayBlockingQueue:基于數組的先進先出隊列,此隊列建立時必須指定大小;

2)無界任務隊列LinkedBlockingQueue:基于連結清單的先進先出隊列,如果建立時沒有指定此隊列大小,則預設為Integer.MAX_VALUE;

3)直接送出隊列synchronousQueue:這個隊列比較特殊,它不會儲存送出的任務,而是将直接建立一個線程來執行新來的任務。

AbortPolicy:丢棄任務并抛出RejectedExecutionException

CallerRunsPolicy:隻要線程池未關閉,該政策直接在調用者線程中,運作目前被丢棄的任務。顯然這樣做不會真的丢棄任務,但是,任務送出線程的性能極有可能會急劇下降。

DiscardOldestPolicy:丢棄隊列中最老的一個請求,也就是即将被執行的一個任務,并嘗試再次送出目前任務。

DiscardPolicy:丢棄任務,不做任何處理。

如果目前線程池中的線程數目小于corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;

如果目前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試将其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程将其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;如果目前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕政策進行處理;

如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程将被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設定存活時間,那麼核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

線程池的關閉

ThreadPoolExecutor提供了兩個方法,用于線程池的關閉,分别是shutdown()和shutdownNow(),其中:

shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務

shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,傳回尚未執行的任務

首先來看最核心的execute方法,這個方法在AbstractExecutorService中并沒有實作,從Executor接口,直到ThreadPoolExecutor才實作了改方法,

ExecutorService中的submit(),invokeAll(),invokeAny()都是調用的execute方法,是以execute是核心中的核心,源碼分析将圍繞它逐漸展開。

 相信看了代碼也是一臉懵,接下來用一個流程圖來講一講,他究竟幹了什麼事:

由淺入深了解Java線程池及線程池的如何使用

結合上面的流程圖來逐行解析,首先前面進行空指針檢查,

wonrkerCountOf()方法能夠取得目前線程池中的線程的總數,取得目前線程數與核心池大小比較,

如果小于,将通過addWorker()方法排程執行。

如果大于核心池大小,那麼就送出到等待隊列。

如果進入等待隊列失敗,則會将任務直接送出給線程池。

如果線程數達到最大線程數,那麼就送出失敗,執行拒絕政策。

excute()方法中添加任務的方式是使用addWorker()方法,看一下源碼,一起學習一下。

addWorker(Runnable firstTask, boolean core)的主要任務是建立并啟動線程。

他會根據目前線程的狀态和給定的值(core or maximum)來判斷是否可以建立一個線程。

addWorker共有四種傳參方式。execute使用了其中三種,分别為:

1.addWorker(paramRunnable, true)

線程數小于corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就傳回false.

2.addWorker(null, false)

放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務隊列裡拿任務,這樣就相當于建立了一個新的線程,隻是沒有馬上配置設定任務。

3.addWorker(paramRunnable, false)

當隊列被放滿時,就嘗試将這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果線程池也滿了的話就傳回false.

還有一種情況是execute()方法沒有使用的

addWorker(null, true)

這個方法就是放一個null的task進Workers Set,而且是在小于corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就傳回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為線程池預先啟動corePoolSize個worker等待從workQueue中擷取任務執行。

執行流程:

1、判斷線程池目前是否為可以添加worker線程的狀态,可以則繼續下一步,不可以return false:

    A、線程池狀态>shutdown,可能為stop、tidying、terminated,不能添加worker線程

    B、線程池狀态==shutdown,firstTask不為空,不能添加worker線程,因為shutdown狀态的線程池不接收新任務

C、線程池狀态==shutdown,firstTask==null,workQueue為空,不能添加worker線程,因為firstTask為空是為了添加一個沒有任務的線程再從workQueue擷取task,而workQueue為      空,說明添加無任務線程已經沒有意義

2、線程池目前線程數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步

3、線上程池的ReentrantLock保證下,向Workers

Set中添加新建立的worker執行個體,添加完成後解鎖,并啟動worker線程,如果這一切都成功了,return

true,如果添加worker入Set失敗或啟動失敗,調用addWorkerFailed()邏輯

newFixedThreadPool

固定大小的線程池,可以指定線程池的大小,該線程池corePoolSize和maximumPoolSize相等,阻塞隊列使用的是LinkedBlockingQueue,大小為整數最大值。

該線程池中的線程數量始終不變,當有新任務送出時,線程池中有空閑線程則會立即執行,如果沒有,則會暫存到阻塞隊列。對于固定大小的線程池,不存線上程數量的變化。同時使用無界的LinkedBlockingQueue來存放執行的任務。當任務送出十分頻繁的時候,LinkedBlockingQueue

迅速增大,存在着耗盡系統資源的問題。而且線上程池空閑時,即線程池中沒有可運作任務時,它也不會釋放工作線程,還會占用一定的系統資源,需要shutdown。

newSingleThreadExecutor

單個線程線程池,隻有一個線程的線程池,阻塞隊列使用的是LinkedBlockingQueue,若有多餘的任務送出到線程池中,則會被暫存到阻塞隊列,待空閑時再去執行。按照先入先出的順序執行任務。

newCachedThreadPool

緩存線程池,緩存的線程預設存活60秒。線程的核心池corePoolSize大小為0,核心池最大為Integer.MAX_VALUE,阻塞隊列使用的是SynchronousQueue。是一個直接送出的阻塞隊列,    他總會迫使線程池增加新的線程去執行新的任務。在沒有任務執行時,當線程的空閑時間超過keepAliveTime(60秒),則工作線程将會終止被回收,當送出新任務時,如果沒有空閑線程,則建立新線程執行任務,會導緻一定的系統開銷。如果同時又大量任務被送出,而且任務執行的時間不是特别快,那麼線程池便會新增出等量的線程池處理任務,這很可能會很快耗盡系統的資源。

newScheduledThreadPool

定時線程池,該線程池可用于周期性地去執行任務,通常用于周期性的同步資料。

scheduleAtFixedRate:是以固定的頻率去執行任務,周期是指每次執行任務成功執行之間的間隔。

schedultWithFixedDelay:是以固定的延時去執行任務,延時是指上一次執行成功之後和下一次開始執行的之前的時間。

newFixedThreadPool執行個體:

由淺入深了解Java線程池及線程池的如何使用
由淺入深了解Java線程池及線程池的如何使用

View Code

newCachedThreadPool執行個體:

由淺入深了解Java線程池及線程池的如何使用
由淺入深了解Java線程池及線程池的如何使用

這裡沒用調用shutDown方法,這裡可以發現過60秒之後,會自動釋放資源。

由淺入深了解Java線程池及線程池的如何使用
由淺入深了解Java線程池及線程池的如何使用

這裡需要注意一點,newSingleThreadExecutor和newFixedThreadPool一樣,線上程池中沒有任務時可執行,也不會釋放系統資源的,是以需要shudown。

由淺入深了解Java線程池及線程池的如何使用
由淺入深了解Java線程池及線程池的如何使用

線程池的大小決定着系統的性能,過大或者過小的線程池數量都無法發揮最優的系統性能。

當然線程池的大小也不需要做的太過于精确,隻需要避免過大和過小的情況。一般來說,确定線程池的大小需要考慮CPU的數量,記憶體大小,任務是計算密集型還是IO密集型等因素

NCPU = CPU的數量

UCPU = 期望對CPU的使用率 0 ≤ UCPU ≤ 1

W/C = 等待時間與計算時間的比率

如果希望處理器達到理想的使用率,那麼線程池的最優大小為:

線程池大小=NCPU *UCPU(1+W/C)

在Java中使用

擷取CPU的數量。

Executors的線程池如果不指定線程工廠會使用Executors中的DefaultThreadFactory,預設線程池工廠建立的線程都是非守護線程。

使用自定義的線程工廠可以做很多事情,比如可以跟蹤線程池在何時建立了多少線程,也可以自定義線程名稱和優先級。如果将

建立的線程都設定成守護線程,當主線程退出後,将會強制銷毀線程池。

下面這個例子,記錄了線程的建立,并将所有的線程設定成守護線程。

由淺入深了解Java線程池及線程池的如何使用
由淺入深了解Java線程池及線程池的如何使用

ThreadPoolExecutor是可以拓展的,它提供了幾個可以在子類中改寫的方法:beforeExecute,afterExecute和terimated。

在執行任務的線程中将調用beforeExecute和afterExecute,這些方法中還可以添加日志,計時,監視或統計收集的功能,

還可以用來輸出有用的調試資訊,幫助系統診斷故障。下面是一個擴充線程池的例子:

由淺入深了解Java線程池及線程池的如何使用
由淺入深了解Java線程池及線程池的如何使用

線程池的正确使用

以下阿裡編碼規範裡面說的一段話:

線程池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明确線程池的運作規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:

1)newFixedThreadPool和newSingleThreadExecutor:

  主要問題是堆積的請求處理隊列可能會耗費非常大的記憶體,甚至OOM。

2)newCachedThreadPool和newScheduledThreadPool:

  主要問題是線程數最大數是Integer.MAX_VALUE,可能會建立數量非常多的線程,甚至OOM。

1.任務獨立。如何任務依賴于其他任務,那麼可能産生死鎖。例如某個任務等待另一個任務的傳回值或執行結果,那麼除非線程池足夠大,否則将發生線程饑餓死鎖。

2.合理配置阻塞時間過長的任務。如果任務阻塞時間過長,那麼即使不出現死鎖,線程池的性能也會變得很糟糕。在Java并發包裡可阻塞方法都同時定義了限時方式和不限時方式。例如

Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任務逾時,則辨別任務失敗,然後中止任務或者将任務放回隊列以便随後執行,這樣,無論任務的最終結果是否成功,這種辦法都能夠保證任務總能繼續執行下去。

3.設定合理的線程池大小。隻需要避免過大或者過小的情況即可,上文的公式線程池大小=NCPU *UCPU(1+W/C)。

4.選擇合适的阻塞隊列。newFixedThreadPool和newSingleThreadExecutor都使用了無界的阻塞隊列,無界阻塞隊列會有消耗很大的記憶體,如果使用了有界阻塞隊列,它會規避記憶體占用過大的問題,但是當任務填滿有界阻塞隊列,新的任務該怎麼辦?在使用有界隊列是,需要選擇合适的拒絕政策,隊列的大小和線程池的大小必須一起調節。對于非常大的或者無界的線程池,可以使用SynchronousQueue來避免任務排隊,以直接将任務從生産者送出到工作者線程。

下面是Thrift架構處理socket任務所使用的一個線程池,可以看一下FaceBook的工程師是如何自定義線程池的。

總結:

本文是作者在平時的工作學習中總結出來的,如果不足之處歡迎批評斧正。

參考資料

《實戰Java》高并發程式設計

《Java Concurrency in Practice》

Java線程池ThreadPoolExecutor使用和分析(二)

我的部落格即将入駐“雲栖社群”,誠邀技術同仁一同入駐。

個人部落格網站 http://www.janti.cn

繼續閱讀