天天看點

深入了解Java線程池原理

轉載:http://www.cnblogs.com/dolphin0520/p/3932921.html

在前面的文章中,我們使用線程的時候就去建立一個線程,這樣實作起來非常簡便,但是就會有一個問題:

  如果并發的線程數量很多,并且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大降低系統的效率,因為頻繁建立線程和銷毀線程需要時間。

  那麼有沒有一種辦法使得線程可以複用,就是執行完一個任務,并不被銷毀,而是可以繼續執行其他的任務?

  在Java中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實作原理,接着給出了它的使用示例,最後讨論了一下如何合理配置線程池的大小。

  以下是本文的目錄大綱:

  一.Java中的ThreadPoolExecutor類

  二.深入剖析線程池實作原理

  三.使用示例

  四.如何合理配置線程池的大小 

  若有不正之處請多多諒解,并歡迎批評指正。

 

一.Java中的ThreadPoolExecutor類

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,是以如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實作源碼。

  在ThreadPoolExecutor類中提供了四個構造方法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

public

class

ThreadPoolExecutor 

extends

AbstractExecutorService {

.....

public

ThreadPoolExecutor(

int

corePoolSize,

int

maximumPoolSize,

long

keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue);

public

ThreadPoolExecutor(

int

corePoolSize,

int

maximumPoolSize,

long

keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

public

ThreadPoolExecutor(

int

corePoolSize,

int

maximumPoolSize,

long

keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public

ThreadPoolExecutor(

int

corePoolSize,

int

maximumPoolSize,

long

keepAliveTime,TimeUnit unit,

BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

...

}

   從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,并提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實作,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

   下面解釋下一下構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實作原理有非常大的關系。在建立了線程池後,預設情況下,線程池中并沒有任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立線程的意思,即在沒有任務到來之前就建立corePoolSize個線程或者一個線程。預設情況下,在建立了線程池後,線程池中的線程數為0,當有任務來之後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示線上程池中最多能建立多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。預設情況下,隻有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • unit:參數keepAliveTime的時間機關,有7種取值,在TimeUnit類中有7種靜态屬性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒      
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運作過程産生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
      

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊政策與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來建立線程;
  • handler:表示當拒絕處理任務時的政策,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務       

   具體參數的配置與線程池的關系将在下一節講述。

  從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實作:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

public

abstract

class

AbstractExecutorService 

implements

ExecutorService {

protected

<T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };

protected

<T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };

public

Future<?> submit(Runnable task) {};

public

<T> Future<T> submit(Runnable task, T result) { };

public

<T> Future<T> submit(Callable<T> task) { };

private

<T> T doInvokeAny(Collection<? 

extends

Callable<T>> tasks,

boolean

timed, 

long

nanos)

throws

InterruptedException, ExecutionException, TimeoutException {

};

public

<T> T invokeAny(Collection<? 

extends

Callable<T>> tasks)

throws

InterruptedException, ExecutionException {

};

public

<T> T invokeAny(Collection<? 

extends

Callable<T>> tasks,

long

timeout, TimeUnit unit)

throws

InterruptedException, ExecutionException, TimeoutException {

};

public

<T> List<Future<T>> invokeAll(Collection<? 

extends

Callable<T>> tasks)

throws

InterruptedException {

};

public

<T> List<Future<T>> invokeAll(Collection<? 

extends

Callable<T>> tasks,

long

timeout, TimeUnit unit)

throws

InterruptedException {

};

}

   AbstractExecutorService是一個抽象類,它實作了ExecutorService接口。

  我們接着看ExecutorService接口的實作:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

public

interface

ExecutorService 

extends

Executor {

void

shutdown();

boolean

isShutdown();

boolean

isTerminated();

boolean

awaitTermination(

long

timeout, TimeUnit unit)

throws

InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? 

extends

Callable<T>> tasks)

throws

InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? 

extends

Callable<T>> tasks,

long

timeout, TimeUnit unit)

throws

InterruptedException;

<T> T invokeAny(Collection<? 

extends

Callable<T>> tasks)

throws

InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? 

extends

Callable<T>> tasks,

long

timeout, TimeUnit unit)

throws

InterruptedException, ExecutionException, TimeoutException;

}

   而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實作:

1 2 3

public

interface

Executor {

void

execute(Runnable command);

}

   到這裡,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。

  Executor是一個頂層接口,在它裡面隻聲明了一個方法execute(Runnable),傳回值為void,參數為Runnable類型,從字面意思可以了解,就是用來執行傳進去的任務的;

  然後ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實作了ExecutorService接口,基本實作了ExecutorService中聲明的所有方法;

  然後ThreadPoolExecutor繼承了類AbstractExecutorService。

  在ThreadPoolExecutor類中有幾個非常重要的方法:

1 2 3 4

execute()

submit()

shutdown()

shutdownNow()

   execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實作,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池送出一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實作,在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池送出任務的,但是它和execute()方法不同,它能夠傳回任務執行的結果,去看submit()方法的實作,會發現它實際上還是調用的execute()方法,隻不過它利用了Future來擷取任務執行結果(Future相關内容将在下一篇講述)。

  shutdown()和shutdownNow()是用來關閉線程池的。

  還有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等擷取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。

二.深入剖析線程池實作原理

  在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實作原理,将從下面幾個方面講解:

  1.線程池狀态

  2.任務的執行

  3.線程池中的線程初始化

  4.任務緩存隊列及排隊政策

  5.任務拒絕政策

  6.線程池的關閉

  7.線程池容量的動态調整

1.線程池狀态

  在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀态:

1 2 3 4 5

volatile

int

runState;

static

final

int

RUNNING    = 

;

static

final

int

SHUTDOWN   = 

1

;

static

final

int

STOP       = 

2

;

static

final

int

TERMINATED = 

3

;

   runState表示目前線程池的狀态,它是一個volatile變量用來保證線程之間的可見性;

  下面的幾個static final變量表示runState可能的幾個取值。

  當建立線程池後,初始時,線程池處于RUNNING狀态;

  如果調用了shutdown()方法,則線程池處于SHUTDOWN狀态,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

  如果調用了shutdownNow()方法,則線程池處于STOP狀态,此時線程池不能接受新的任務,并且會去嘗試終止正在執行的任務;

  當線程池處于SHUTDOWN或STOP狀态,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設定為TERMINATED狀态。

2.任務的執行

  在了解将任務送出給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

private

final

BlockingQueue<Runnable> workQueue;              

//任務緩存隊列,用來存放等待執行的任務

private

final

ReentrantLock mainLock = 

new

ReentrantLock();   

//線程池的主要狀态鎖,對線程池狀态(比如線程池大小

//、runState等)的改變都要使用這個鎖

private

final

HashSet<Worker> workers = 

new

HashSet<Worker>();  

//用來存放工作集

private

volatile

long

keepAliveTime;    

//線程存貨時間   

private

volatile

boolean

allowCoreThreadTimeOut;   

//是否允許為核心線程設定存活時間

private

volatile

int

corePoolSize;     

//核心池的大小(即線程池中的線程數目大于這個參數時,送出的任務會被放進任務緩存隊列)

private

volatile

int

maximumPoolSize;   

//線程池最大能容忍的線程數

private

volatile

int

poolSize;       

//線程池中目前的線程數

private

volatile

RejectedExecutionHandler handler; 

//任務拒絕政策

private

volatile

ThreadFactory threadFactory;   

//線程工廠,用來建立線程

private

int

largestPoolSize;   

//用來記錄線程池中曾經出現過的最大線程數

private

long

completedTaskCount;   

//用來記錄已經執行完畢的任務個數

   每個變量的作用都已經标明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

  corePoolSize在很多地方被翻譯成核心池大小,其實我的了解這個就是線程池的大小。舉個簡單的例子:

  假如有一個工廠,工廠裡面有10個勞工,每個勞工同時隻能做一件任務。

  是以隻要當10個勞工中有勞工是空閑的,來了任務就配置設定給空閑的勞工做;

  當10個勞工都有任務在做時,如果還來了任務,就把任務進行排隊等待;

  如果說新任務數目增長的速度遠遠大于勞工做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時勞工進來;

  然後就将任務也配置設定給這4個臨時勞工做;

  如果說着14個勞工做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者抛棄前面的一些任務了。

  當這14個勞工當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,隻保持原來的10個勞工,畢竟請額外的勞工是要花錢的。

  這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

  不過為了友善了解,在本文後面還是将corePoolSize翻譯成核心池大小。

  largestPoolSize隻是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。

  下面我們進入正題,看一下任務從送出到最終執行完畢經曆了哪些過程。

  在ThreadPoolExecutor類中,最核心的任務送出方法是execute()方法,雖然通過submit也可以送出任務,但是實際上submit方法裡面最終調用的還是execute()方法,是以我們隻需要研究execute()方法的實作原理即可:

1 2 3 4 5 6 7 8 9 10 11 12

public

void

execute(Runnable command) {

if

(command == 

null

)

throw

new

NullPointerException();

if

(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if

(runState == RUNNING && workQueue.offer(command)) {

if

(runState != RUNNING || poolSize == 

)

ensureQueuedTaskHandled(command);

}

else

if

(!addIfUnderMaximumPoolSize(command))

reject(command); 

// is shutdown or saturated

}

}

   上面的代碼可能看起來不是那麼容易了解,下面我們一句一句解釋:

  首先,判斷送出的任務command是否為null,若是null,則抛出空指針異常;

  接着是這句,這句要好好了解一下:

1

if

(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   由于是或條件運算符,是以先計算前半部分的值,如果線程池中目前線程數不小于核心池大小,那麼就會直接進入下面的if語句塊了。

  如果線程池中目前線程數小于核心池大小,則接着執行後半部分,也就是執行

1

addIfUnderCorePoolSize(command)

  如果執行完addIfUnderCorePoolSize這個方法傳回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。

  如果執行完addIfUnderCorePoolSize這個方法傳回false,然後接着判斷:

1

if

(runState == RUNNING && workQueue.offer(command))

   如果目前線程池處于RUNNING狀态,則将任務放入任務緩存隊列;如果目前線程池不處于RUNNING狀态或者任務放入緩存隊列失敗,則執行:

1

addIfUnderMaximumPoolSize(command)

  如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

  回到前面:

1

if

(runState == RUNNING && workQueue.offer(command))

   這句的執行,如果說目前線程池處于RUNNING狀态且将任務放入任務緩存隊列成功,則繼續進行判斷:

1

if

(runState != RUNNING || poolSize == 

)

   這句判斷是為了防止在将此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:

1

ensureQueuedTaskHandled(command)

   進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。

  我們接着看2個關鍵方法的實作:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

private

boolean

addIfUnderCorePoolSize(Runnable firstTask) {

Thread t = 

null

;

final

ReentrantLock mainLock = 

this

.mainLock;

mainLock.lock();

try

{

if

(poolSize < corePoolSize && runState == RUNNING)

t = addThread(firstTask);        

//建立線程去執行firstTask任務   

finally

{

mainLock.unlock();

}

if

(t == 

null

)

return

false

;

t.start();

return

true

;

}

   這個是addIfUnderCorePoolSize方法的具體實作,從名字可以看出它的意圖就是當低于核心吃大小時執行的方法。下面看其具體實作,首先擷取到鎖,因為這地方涉及到線程池狀态的變化,先通過if語句判斷目前線程池中的線程數目是否小于核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,隻有線程池目前線程數目小于核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中并沒有加鎖,是以可能在execute方法判斷的時候poolSize小于corePoolSize,而判斷完之後,在其他線程中又向線程池送出了任務,就可能導緻poolSize不小于corePoolSize了,是以需要在這個地方繼續判斷。然後接着判斷線程池的狀态是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然後就是執行

1

t = addThread(firstTask);

   這個方法也非常關鍵,傳進去的參數為送出的任務,傳回值為Thread類型。然後接着在下面判斷t是否為空,為空則表明建立線程失敗(即poolSize>=corePoolSize或者runState不等于RUNNING),否則調用t.start()方法啟動線程。

  我們來看一下addThread方法的實作:

1 2 3 4 5 6 7 8 9 10 11 12

private

Thread addThread(Runnable firstTask) {

Worker w = 

new

Worker(firstTask);

Thread t = threadFactory.newThread(w);  

//建立一個線程,執行任務   

if

(t != 

null

) {

w.thread = t;            

//将建立的線程的引用指派為w的成員變量       

workers.add(w);

int

nt = ++poolSize;     

//目前線程數加1       

if

(nt > largestPoolSize)

largestPoolSize = nt;

}

return

t;

}

   在addThread方法中,首先用送出的任務建立了一個Worker對象,然後調用線程工廠threadFactory建立了一個新的線程t,然後将線程t的引用指派給了Worker對象的成員變量thread,接着通過workers.add(w)将Worker對象添加到工作集當中。

  下面我們看一下Worker類的實作:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64

private

final

class

Worker 

implements

Runnable {

private

final

ReentrantLock runLock = 

new

ReentrantLock();

private

Runnable firstTask;

volatile

long

completedTasks;

Thread thread;

Worker(Runnable firstTask) {

this

.firstTask = firstTask;

}

boolean

isActive() {

return

runLock.isLocked();

}

void

interruptIfIdle() {

final

ReentrantLock runLock = 

this

.runLock;

if

(runLock.tryLock()) {

try

{

if

(thread != Thread.currentThread())

thread.interrupt();

finally

{

runLock.unlock();

}

}

}

void

interruptNow() {

thread.interrupt();

}

private

void

runTask(Runnable task) {

final

ReentrantLock runLock = 

this

.runLock;

runLock.lock();

try

{

if

(runState < STOP &&

Thread.interrupted() &&

runState >= STOP)

boolean

ran = 

false

;

beforeExecute(thread, task);   

//beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實作,使用者可以根據

//自己需要重載這個方法和後面的afterExecute方法來進行一些統計資訊,比如某個任務的執行時間等           

try

{

task.run();

ran = 

true

;

afterExecute(task, 

null

);

++completedTasks;

catch

(RuntimeException ex) {

if

(!ran)

afterExecute(task, ex);

throw

ex;

}

finally

{

runLock.unlock();

}

}

public

void

run() {

try

{

Runnable task = firstTask;

firstTask = 

null

;

while

(task != 

null

|| (task = getTask()) != 

null

) {

runTask(task);

task = 

null

;

}

finally

{

workerDone(

this

);   

//當任務隊列中沒有任務時,進行清理工作       

}

}

}

   它實際上實作了Runnable接口,是以上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:

1

Thread t = 

new

Thread(w);

   相當于傳進去了一個Runnable任務,線上程t中執行這個Runnable。

  既然Worker實作了Runnable接口,那麼自然最核心的方法便是run()方法了:

1 2 3 4 5 6 7 8 9 10 11 12

public

void

run() {

try

{

Runnable task = firstTask;

firstTask = 

null

;

while

(task != 

null

|| (task = getTask()) != 

null

) {

runTask(task);

task = 

null

;

}

finally

{

workerDone(

this

);

}

}

   從run方法的實作可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之後,在while循環裡面不斷通過getTask()去取新的任務來執行,那麼去哪裡取呢?自然是從任務緩存隊列裡面去取,getTask是ThreadPoolExecutor類中的方法,并不是Worker類中的方法,下面是getTask方法的實作:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

Runnable getTask() {

for

(;;) {

try

{

int

state = runState;

if

(state > SHUTDOWN)

return

null

;

Runnable r;

if

(state == SHUTDOWN)  

// Help drain queue

r = workQueue.poll();

else

if

(poolSize > corePoolSize || allowCoreThreadTimeOut) 

//如果線程數大于核心池大小或者允許為核心池線程設定空閑時間,

//則通過poll取任務,若等待一定的時間取不到任務,則傳回null

r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

else

r = workQueue.take();

if

(r != 

null

)

return

r;

if

(workerCanExit()) {    

//如果沒取到任務,即r為null,則判斷目前的worker是否可以退出

if

(runState >= SHUTDOWN) 

// Wake up others

interruptIdleWorkers();   

//中斷處于空閑狀态的worker

return

null

;

}

// Else retry

catch

(InterruptedException ie) {

// On interruption, re-check runState

}

}

}

   在getTask中,先判斷目前線程池狀态,如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接傳回null。

  如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。

  如果目前線程池的線程數大于核心池大小corePoolSize或者允許為核心池中的線程設定空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就傳回null。

  然後判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷目前worker是否可以退出,我們看一下workerCanExit()的實作:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16

private

boolean

workerCanExit() {

final

ReentrantLock mainLock = 

this

.mainLock;

mainLock.lock();

boolean

canExit;

//如果runState大于等于STOP,或者任務緩存隊列為空了

//或者  允許為核心池線程設定空閑存活時間并且線程池中的線程數目大于1

try

{

canExit = runState >= STOP ||

workQueue.isEmpty() ||

(allowCoreThreadTimeOut &&

poolSize > Math.max(

1

, corePoolSize));

finally

{

mainLock.unlock();

}

return

canExit;

}

   也就是說如果線程池處于STOP狀态、或者任務隊列已為空或者允許為核心池線程設定空閑存活時間并且線程數大于1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處于空閑狀态的worker,我們看一下interruptIdleWorkers()的實作:

1 2 3 4 5 6 7 8 9 10

void

interruptIdleWorkers() {

final

ReentrantLock mainLock = 

this

.mainLock;

mainLock.lock();

try

{

for

(Worker w : workers)  

//實際上調用的是worker的interruptIfIdle()方法

w.interruptIfIdle();

finally

{

mainLock.unlock();

}

}

   從實作可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

1 2 3 4 5 6 7 8 9 10 11 12

void

interruptIfIdle() {

final

ReentrantLock runLock = 

this

.runLock;

if

(runLock.tryLock()) {    

//注意這裡,是調用tryLock()來擷取鎖的,因為如果目前worker正在執行任務,鎖已經被擷取了,是無法擷取到鎖的

//如果成功擷取了鎖,說明目前worker處于空閑狀态

try

{

if

(thread != Thread.currentThread())  

thread.interrupt();

finally

{

runLock.unlock();

}

}

}

    這裡有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這裡,并沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和複雜度,這裡直接讓執行完任務的線程去任務緩存隊列裡面取任務來執行。

   我們再看addIfUnderMaximumPoolSize方法的實作,這個方法的實作思想和addIfUnderCorePoolSize方法的實作思想非常相似,唯一的差別在于addIfUnderMaximumPoolSize方法是線上程池中的線程數達到了核心池大小并且往任務隊列中添加任務失敗的情況下執行的:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

private

boolean

addIfUnderMaximumPoolSize(Runnable firstTask) {

Thread t = 

null

;

final

ReentrantLock mainLock = 

this

.mainLock;

mainLock.lock();

try

{

if

(poolSize < maximumPoolSize && runState == RUNNING)

t = addThread(firstTask);

finally

{

mainLock.unlock();

}

if

(t == 

null

)

return

false

;

t.start();

return

true

;

}

   看到沒有,其實它和addIfUnderCorePoolSize方法的實作基本一模一樣,隻是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

  到這裡,大部分朋友應該對任務送出給線程池之後到被執行的整個過程有了一個基本的了解,下面總結一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含義;

  2)其次,要知道Worker是用來起到什麼作用的;

  3)要知道任務送出給線程池之後的處理政策,這裡總結一下主要有4點:

  • 如果目前線程池中的線程數目小于corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 如果目前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試将其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程将其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 如果目前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕政策進行處理;
  • 如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程将被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設定存活時間,那麼核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

3.線程池中的線程初始化

  預設情況下,建立線程池之後,線程池中是沒有線程的,需要送出任務之後才會建立線程。

  在實際中如果需要線程池建立之後立即建立線程,可以通過以下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化所有核心線程

  下面是這2個方法的實作:

1 2 3 4 5 6 7 8 9 10

public

boolean

prestartCoreThread() {

return

addIfUnderCorePoolSize(

null

); 

//注意傳進去的參數是null

}

public

int

prestartAllCoreThreads() {

int

n = 

;

while

(addIfUnderCorePoolSize(

null

))

//注意傳進去的參數是null

++n;

return

n;

}

   注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最後執行線程會阻塞在getTask方法中的

1

r = workQueue.take();

   即等待任務隊列中有任務。

4.任務緩存隊列及排隊政策

  在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

  workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:

  1)ArrayBlockingQueue:基于數組的先進先出隊列,此隊列建立時必須指定大小;

  2)LinkedBlockingQueue:基于連結清單的先進先出隊列,如果建立時沒有指定此隊列大小,則預設為Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會儲存送出的任務,而是将直接建立一個線程來執行新來的任務。

5.任務拒絕政策

  當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕政策,通常有以下四種政策:

1 2 3 4

ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

  ThreadPoolExecutor提供了兩個方法,用于線程池的關閉,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務
  • shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,傳回尚未執行的任務

7.線程池容量的動态調整

  ThreadPoolExecutor提供了動态調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設定核心池大小
  • setMaximumPoolSize:設定線程池最大能建立的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程指派,還可能立即建立新的線程來執行任務。

三.使用示例

  前面我們讨論了關于線程池的實作原理,這一節我們來看一下它的具體使用:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

public

class

Test {

public

static

void

main(String[] args) {   

ThreadPoolExecutor executor = 

new

ThreadPoolExecutor(

5

10

200

, TimeUnit.MILLISECONDS,

new

ArrayBlockingQueue<Runnable>(

5

));

for

(

int

i=

;i<

15

;i++){

MyTask myTask = 

new

MyTask(i);

executor.execute(myTask);

System.out.println(

"線程池中線程數目:"

+executor.getPoolSize()+

",隊列中等待執行的任務數目:"

+

executor.getQueue().size()+

",已執行玩别的任務數目:"

+executor.getCompletedTaskCount());

}

executor.shutdown();

}

}

class

MyTask 

implements

Runnable {

private

int

taskNum;

public

MyTask(

int

num) {

this

.taskNum = num;

}

@Override

public

void

run() {

System.out.println(

"正在執行task "

+taskNum);

try

{

Thread.currentThread().sleep(

4000

);

catch

(InterruptedException e) {

e.printStackTrace();

}

System.out.println(

"task "

+taskNum+

"執行完畢"

);

}

}

   執行結果:

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩别的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩别的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩别的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩别的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩别的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩别的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩别的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩别的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩别的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩别的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩别的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩别的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩别的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩别的任務數目:0
正在執行task 13
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩别的任務數目:0
正在執行task 14
task 3執行完畢
task 0執行完畢
task 2執行完畢
task 1執行完畢
正在執行task 8
正在執行task 7
正在執行task 6
正在執行task 5
task 4執行完畢
task 10執行完畢
task 11執行完畢
task 13執行完畢
task 12執行完畢
正在執行task 9
task 14執行完畢
task 8執行完畢
task 5執行完畢
task 7執行完畢
task 6執行完畢
task 9執行完畢      

  從執行結果可以看出,當線程池中線程的數目大于5時,便将任務放入任務緩存隊列裡面,當任務緩存隊列滿了之後,便建立新的線程。如果上面程式中,将for循環中改成執行20個任務,就會抛出任務拒絕異常了。

  不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜态方法來建立線程池:

1 2 3

Executors.newCachedThreadPool();        

//建立一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   

//建立容量為1的緩沖池

Executors.newFixedThreadPool(

int

);    

//建立固定容量大小的緩沖池

   下面是這三個靜态方法的具體實作;

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16

public

static

ExecutorService newFixedThreadPool(

int

nThreads) {

return

new

ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new

LinkedBlockingQueue<Runnable>());

}

public

static

ExecutorService newSingleThreadExecutor() {

return

new

FinalizableDelegatedExecutorService

(

new

ThreadPoolExecutor(

1

1

,

0L, TimeUnit.MILLISECONDS,

new

LinkedBlockingQueue<Runnable>()));

}

public

static

ExecutorService newCachedThreadPool() {

return

new

ThreadPoolExecutor(

, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new

SynchronousQueue<Runnable>());

}

  從它們的具體實作來看,它們實際上也是調用了ThreadPoolExecutor,隻不過參數都已配置好了。

  newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor将corePoolSize和maximumPoolSize都設定為1,也使用的LinkedBlockingQueue;

  newCachedThreadPool将corePoolSize設定為0,将maximumPoolSize設定為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運作,當線程空閑超過60秒,就銷毀線程。

  實際中,如果Executors提供的三個靜态方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

四.如何合理配置線程池的大小

  本節來讨論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

  一般需要根據任務的類型來配置線程池大小:

  如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1

  如果是IO密集型任務,參考值可以設定為2*NCPU

  當然,這隻是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先将線程池大小設定為參考值,再觀察任務運作情況和系統負載、資源使用率來進行适當調整。

  參考資料:

  http://ifeve.com/java-threadpool/

  http://blog.163.com/among_1985/blog/static/275005232012618849266/

  http://developer.51cto.com/art/201203/321885.htm

  http://blog.csdn.net/java2000_wl/article/details/22097059

  http://blog.csdn.net/cutesource/article/details/6061229

  http://blog.csdn.net/xieyuooo/article/details/8718741

繼續閱讀