java.util.concurrent.ThreadPoolExecutor類時線程池中最核心的一個類,是以如果要透徹的了解java中線程池,必須先了解這個類。下面看ThreadPoolExecutor類的具體實作源碼:
在ThreadPoolExecutor類中提供了四個構造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<code>public</code> <code>class</code> <code>ThreadPoolExecutor </code><code>extends</code> <code>AbstractExecutorService {</code>
<code> </code><code>.....</code>
<code> </code><code>public</code> <code>ThreadPoolExecutor(</code><code>int</code> <code>corePoolSize,</code><code>int</code> <code>maximumPoolSize,</code><code>long</code> <code>keepAliveTime,TimeUnit unit,</code>
<code> </code><code>BlockingQueue<Runnable> workQueue);</code>
<code> </code>
<code> </code><code>BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);</code>
<code> </code><code>BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);</code>
<code> </code><code>BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);</code>
<code> </code><code>...</code>
<code>}</code>
從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實作,發現前三個構造器都是調用第四個構造器進行的初始化工作。
下面解釋下構造器中各個參數的含義:
maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示線上程池中最多能建立多少個線程;
keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。 預設情況下,隻有線程池中的線程數大于corePoolSize時,keepaliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
unit:參數keepAliveTime的時間機關,有7種取值,在TimeUnit類中有7種靜态屬性:
<code>TimeUnit.DAYS; </code><code>//天</code>
<code>TimeUnit.HOURS; </code><code>//小時</code>
<code>TimeUnit.MINUTES; </code><code>//分鐘</code>
<code>TimeUnit.SECONDS; </code><code>//秒</code>
<code>TimeUnit.MILLISECONDS; </code><code>//毫秒</code>
<code>TimeUnit.MICROSECONDS; </code><code>//微妙</code>
<code>TimeUnit.NANOSECONDS; </code><code>//納秒</code>
workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運作過程産生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇:
<code>ArrayBlocklingQueue</code>
<code>LinkedBlockingQueue</code>
<code>SynchronousQueue</code>
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous.線程池的排隊政策與blockingQueue有關。
threadFactory:線程工廠,主要用來建立線程;
handler:便是當拒絕處理任務時的政策,有以下四種取值:
<code>ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。</code>
<code>ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。</code>
<code>ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)</code>
<code>ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務</code>
具體參數的配置與線程池的關系将在下一節講述。
從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor內建了AbstractExecutorService,我們來看一下AbstractExecutorService的實作:
16
17
18
19
20
21
22
23
24
25
26
27
<code>public</code> <code>abstract</code> <code>class</code> <code>AbstractExecutorService </code><code>implements</code> <code>ExecutorService {</code>
<code> </code>
<code> </code><code>protected</code> <code><T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };</code>
<code> </code><code>protected</code> <code><T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };</code>
<code> </code><code>public</code> <code>Future<?> submit(Runnable task) {};</code>
<code> </code><code>public</code> <code><T> Future<T> submit(Runnable task, T result) { };</code>
<code> </code><code>public</code> <code><T> Future<T> submit(Callable<T> task) { };</code>
<code> </code><code>private</code> <code><T> T doInvokeAny(Collection<? </code><code>extends</code> <code>Callable<T>> tasks,</code>
<code> </code><code>boolean</code> <code>timed, </code><code>long</code> <code>nanos)</code>
<code> </code><code>throws</code> <code>InterruptedException, ExecutionException, TimeoutException {</code>
<code> </code><code>};</code>
<code> </code><code>public</code> <code><T> T invokeAny(Collection<? </code><code>extends</code> <code>Callable<T>> tasks)</code>
<code> </code><code>throws</code> <code>InterruptedException, ExecutionException {</code>
<code> </code><code>public</code> <code><T> T invokeAny(Collection<? </code><code>extends</code> <code>Callable<T>> tasks,</code>
<code> </code><code>long</code> <code>timeout, TimeUnit unit)</code>
<code> </code><code>public</code> <code><T> List<Future<T>> invokeAll(Collection<? </code><code>extends</code> <code>Callable<T>> tasks)</code>
<code> </code><code>throws</code> <code>InterruptedException {</code>
<code> </code><code>public</code> <code><T> List<Future<T>> invokeAll(Collection<? </code><code>extends</code> <code>Callable<T>> tasks,</code>
<code> </code><code>long</code> <code>timeout, TimeUnit unit)</code>
AbstractExecutorService是一個抽象類,它實作了ExecutorService接口。
我們接着看ExecutorService接口的實作:
<code>public</code> <code>interface</code> <code>ExecutorService </code><code>extends</code> <code>Executor {</code>
<code> </code><code>void</code> <code>shutdown();</code>
<code> </code><code>boolean</code> <code>isShutdown();</code>
<code> </code><code>boolean</code> <code>isTerminated();</code>
<code> </code><code>boolean</code> <code>awaitTermination(</code><code>long</code> <code>timeout, TimeUnit unit)</code>
<code> </code><code>throws</code> <code>InterruptedException;</code>
<code> </code><code><T> Future<T> submit(Callable<T> task);</code>
<code> </code><code><T> Future<T> submit(Runnable task, T result);</code>
<code> </code><code>Future<?> submit(Runnable task);</code>
<code> </code><code><T> List<Future<T>> invokeAll(Collection<? </code><code>extends</code> <code>Callable<T>> tasks)</code>
<code> </code><code><T> List<Future<T>> invokeAll(Collection<? </code><code>extends</code> <code>Callable<T>> tasks,</code>
<code> </code><code>long</code> <code>timeout, TimeUnit unit)</code>
<code> </code><code><T> T invokeAny(Collection<? </code><code>extends</code> <code>Callable<T>> tasks)</code>
<code> </code><code>throws</code> <code>InterruptedException, ExecutionException;</code>
<code> </code><code><T> T invokeAny(Collection<? </code><code>extends</code> <code>Callable<T>> tasks,</code>
<code> </code><code>long</code> <code>timeout, TimeUnit unit)</code>
<code> </code><code>throws</code> <code>InterruptedException, ExecutionException, TimeoutException;</code>
而ExecutorService又繼承了Executor接口,我們看一下Executor接口的實作:
<code>public</code> <code>interface</code> <code>Executor {</code>
<code> </code><code>void</code> <code>execute(Runnable command);</code>
到這裡,大家應該明白ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。
Executor是一個頂層接口,在它裡面隻聲明了一個方法execute(Runnable),傳回值為void,參數為Runnable,從字面意思可以了解,就是用來執行傳進去的任務的。
然後ExecutorService接口:submit,invokeAll,invokeAny以及shutDown等;
抽象類AbstractExecutorService,基本實作了ExecutorService中聲明的所有方法;
然後ThreadPoolExecutor幾成了類AbstractExecutorService,有幾個重要的方法:
<code>execute()</code>
<code>submit()</code>
<code>shutdown()</code>
<code>shutdownNow()</code>
execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實作,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向縣城吃送出一個任務,交又線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實作,在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池送出任務的,但是它和execute()方法不同,它能傳回任務執行的結果,去看submit()方法的實作,會發現它實際上還是調用的execute()方法,隻不過它利用了Future來擷取任務執行結果。
shutdown()和shutdownNow()是用來關閉線程池的。
還有比如;getQueue(),getPoolSize(),getActiveCount(),getCompletedTaskCount()等擷取與線程池相關屬性的方法。
在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實作原理,将從下面幾個方面講解:
1.線程池狀态
2.任務的執行
3.線程池中的線程初始化
4.任務緩存隊列及排隊政策
5.任務拒絕政策
6.線程池的關閉
7.線程池容量的動态調整
<a></a>
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀态:
<code>volatile</code> <code>int</code> <code>runState;</code>
<code>static</code> <code>final</code> <code>int</code> <code>RUNNING = </code><code>0</code><code>;</code>
<code>static</code> <code>final</code> <code>int</code> <code>SHUTDOWN = </code><code>1</code><code>;</code>
<code>static</code> <code>final</code> <code>int</code> <code>STOP = </code><code>2</code><code>;</code>
<code>static</code> <code>final</code> <code>int</code> <code>TERMINATED = </code><code>3</code><code>;</code>
runState表示目前線程池的狀态,它是一個volatile變量用來保證線程之間的可見性;
下面的幾個static final變量表示runState可能的幾個取值。
當建立線程池後,初始時,線程池處于RUNNING狀态;
如果調用了shutdown()方法,則線程池處于SHUDOWN狀态,此時線程池不能接受顯得任務,他會等待所有任務執行完畢;
如果調用了shutdownNoe()方法,則線程池處于STOP狀态,此時線程池不能夠接受新的任務,并且會嘗試終止正在執行的任務。
當線程池處于SHUTDOWN或STOP狀态,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設定為TERMINATED狀态。
在了解任務送出給線程池到任務執行完畢整個過程之前,我們來看一下ThreadPoolExecutor類中其他的一些比較重要的成員變量:
<code>private</code> <code>final</code> <code>BlockingQueue<Runnable> workQueue; </code><code>//任務緩存隊列,用來存放等待執行的任務</code>
<code>private</code> <code>final</code> <code>ReentrantLock mainLock = </code><code>new</code> <code>ReentrantLock(); </code><code>//線程池的主要狀态鎖,對線程池狀态(比如線程池大小</code>
<code> </code><code>//、runState等)的改變都要使用這個鎖</code>
<code>private</code> <code>final</code> <code>HashSet<Worker> workers = </code><code>new</code> <code>HashSet<Worker>(); </code><code>//用來存放工作集</code>
<code>private</code> <code>volatile</code> <code>long</code> <code>keepAliveTime; </code><code>//線程存貨時間 </code>
<code>private</code> <code>volatile</code> <code>boolean</code> <code>allowCoreThreadTimeOut; </code><code>//是否允許為核心線程設定存活時間</code>
<code>private</code> <code>volatile</code> <code>int</code> <code>corePoolSize; </code><code>//核心池的大小(即線程池中的線程數目大于這個參數時,送出的任務會被放進任務緩存隊列)</code>
<code>private</code> <code>volatile</code> <code>int</code> <code>maximumPoolSize; </code><code>//線程池最大能容忍的線程數</code>
<code>private</code> <code>volatile</code> <code>int</code> <code>poolSize; </code><code>//線程池中目前的線程數</code>
<code>private</code> <code>volatile</code> <code>RejectedExecutionHandler handler; </code><code>//任務拒絕政策</code>
<code>private</code> <code>volatile</code> <code>ThreadFactory threadFactory; </code><code>//線程工廠,用來建立線程</code>
<code>private</code> <code>int</code> <code>largestPoolSize; </code><code>//用來記錄線程池中曾經出現過的最大線程數</code>
<code>private</code> <code>long</code> <code>completedTaskCount; </code><code>//用來記錄已經執行完畢的任務個數</code>
每個變量的作用都已經标明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。
corePoolSize在很多地方被翻譯成核心池大小,其實我的了解這個就是線程池的大小。舉個簡單的例子:加入有一個工廠,工廠裡面有10個勞工,每個勞工同時隻能做一件任務。是以隻要當10個勞工中有勞工是空閑的,來了任務就配置設定給空閑的勞工做;當10個勞工都有任務在做時,如果還來了任務,就把任務進行排隊等待;如果說新任務數目增長的速度遠遠大于勞工做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時勞工進來;然後就将任務也配置設定給這4個臨時勞工做;如果說這14個勞工做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者抛棄前面的一些任務了。當這14個勞工當中有人空閑時,而新任務增長速度又比較緩慢,工廠主管就可能考慮辭掉4個臨時工了,隻保持原來的10個勞工,畢竟額外的勞工是要花錢的。
這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
不過為了友善了解,在本文後面還是講corePoolSize翻譯成核心池大小。
largestPoolSize隻是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。
下面我們進入正題,看一下任務從送出到最終執行完畢經曆了哪些過程。
在ThreadPoolExecutor類中,最核心的任務送出方法是execute(),雖然通過submit也可以送出任務,但是實際上submit方法裡最終調用的還是execute()方法,是以我們隻需要研究execute()方法的實作原理即可:
我的jdk是1.7.80,源碼已經和作者的不一樣了,不過整體思路還是一樣。但還是不複制了,自己也還欠缺分析源碼的能力。
首先,要清楚corePoolSize和maximumPoolSize的含義;
其次,要知道Worker是用來幹嘛的;
要知道任務送出給線程池後的處理政策,這裡總結主要 有4點:
如果目前線程池中的線程數目小于corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
如果目前線程池中的線程數目>=corePoolSize,則每來個任務,會嘗試将其添加到任務緩存隊列中,若添加成功,則該任務會等待空閑線程将其取出去執行;若添加失敗(一般來說是任務隊列已滿),則會嘗試建立新的線程去執行這個任務;
如果目前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕政策進行處理;
如果線程池中的線程數量大于corePoolSize,如果某線程空閑時間超過keepAlivetime,線程将被終止,直到線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設定存活時間,那麼核心池中的線程空閑時間超過keepAliveTime就會終止。
預設情況下,建立線程池之後,線程池是沒有線程的,需要送出任務之後才能建立線程。
在實際中如果需要線程池建立之後立即建立線程,可以通過以下兩個辦法:
prestartCoreThread():初始化一個核心線程;
prestartAllCoreThread():初始化所有核心線程;
前面提到任務緩存隊列,即workQueue,它是用來存放等待執行的任務。
workQueue的類型為BlockingQueue<Runnable>,通常可以去下面三種類型:
ArrayBlockingQueue:基于數組的先進先出隊列,此隊列建立時必須制定大小;
LinkedBlockingQueue:基于連結清單的先進先出隊列,如果建立時沒有指定隊列大小,則預設為Integer.MAX_VALUE;
synchronousQueue:這個隊列比較特殊,它不會儲存送出的任務,而是将直接建立一個線程執行新來的任務。
當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕政策,通常有以下四種政策:
ThreadPoolExecutor提供了兩個方法用于線程池的關閉,分别是shutdown(),shutdownNow(),其中
shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完才終止,但再也不會接收新的任務;
shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空緩存隊列,傳回尚未執行的任務
ThreadPoolExecutor提供了動态調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolsize():
setCorePoolSize:設定核心池大小
setMaximumPoolSize:設定線程池最大能建立的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程指派,還可能立即建立新的線程來執行任務。
28
29
30
31
32
33
34
35
<code>/**</code>
<code> </code><code>* 學習線程池</code>
<code> </code><code>* Created by mrf on 2016/3/7.</code>
<code> </code><code>*/</code>
<code>public</code> <code>class</code> <code>TestThreadPool {</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>
<code> </code><code>ThreadPoolExecutor executor = </code><code>new</code> <code>ThreadPoolExecutor(</code><code>5</code><code>,</code><code>10</code><code>,</code><code>200</code><code>, TimeUnit.MILLISECONDS,</code><code>new</code> <code>ArrayBlockingQueue<Runnable>(</code><code>5</code><code>));</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>0</code><code>; i < </code><code>15</code><code>; i++) {</code>
<code> </code><code>MyTask myTask = </code><code>new</code> <code>MyTask(i);</code>
<code> </code><code>executor.execute(myTask);</code>
<code> </code><code>System.out.println(</code><code>"線程池中線程數目:"</code><code>+executor.getPoolSize()+</code><code>",隊列中等待執行的任務數目:"</code>
<code> </code><code>+executor.getQueue().size()+</code><code>",已經執行完的任務數目:"</code><code>+executor.getCompletedTaskCount());</code>
<code> </code><code>}</code>
<code> </code><code>executor.shutdown();</code>
<code> </code><code>}</code>
<code>class</code> <code>MyTask </code><code>implements</code> <code>Runnable{</code>
<code> </code><code>private</code> <code>int</code> <code>taskNum;</code>
<code> </code><code>public</code> <code>MyTask(</code><code>int</code> <code>taskNum) {</code>
<code> </code><code>this</code><code>.taskNum = taskNum;</code>
<code> </code><code>@Override</code>
<code> </code><code>public</code> <code>void</code> <code>run() {</code>
<code> </code><code>System.out.println(</code><code>"正在執行task:"</code><code>+taskNum);</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>Thread.sleep(</code><code>4000</code><code>);</code>
<code> </code><code>} </code><code>catch</code> <code>(InterruptedException e) {</code>
<code> </code><code>e.printStackTrace();</code>
<code> </code><code>System.out.println(</code><code>"task"</code><code>+taskNum+</code><code>"執行完畢"</code><code>);</code>
結果:
<a href="http://www.cnblogs.com/woshimrf/p/5249753.html#" target="_blank">+ View Code</a>
從執行結果可以看出,當線程池中線程的數目大于5時,便将任務放入任務緩存隊列裡面,當任務緩存隊列滿了之後,便建立新的線程。如果上面程式中,将for循環設定為20個任務就會抛出拒絕異常了。
不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜态方法來建立線程池:
<code>Executors.newCachedThreadPool(); </code><code>//建立一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE</code>
<code>Executors.newSingleThreadExecutor(); </code><code>//建立容量為1的緩沖池</code>
<code>Executors.newFixedThreadPool(</code><code>int</code><code>); </code><code>//建立固定容量大小的緩沖池</code>
這三種靜态方法的具體實作:
從它們的具體實作來看,它們實際上也是調用了ThreadPoolExecutor,隻不過參數都已配置好了。
newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor将corePoolSize和maximumPoolSize都設定為1,也使用的LinkedBlockingQueue;
newCachedThreadPool将corePoolSize設定為0,将maximumPoolSize設定為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運作,當線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜态方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
本節來讨論一個比較重要的話題:如何合理配置線程池大小,僅供參考。
一般需要根據任務的類型來配置線程池大小:
如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1
如果是IO密集型任務,參考值可以設定為2*NCPU
當然,這隻是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先将線程池大小設定為參考值,再觀察任務運作情況和系統負載、資源使用率來進行适當調整。
本文轉自Ryan.Miao部落格園部落格,原文連結:http://www.cnblogs.com/woshimrf/p/5249753.html,如需轉載請自行聯系原作者