我相信大家都看過很多的關于線程池的文章,基本上也是面試的時候必問的,如果你在看過很多文章以後,還是一知半解的,那希望這篇文章能讓你真正的掌握好 Java 線程池。
本文一大重點是源碼解析,同時會有少量篇幅介紹線程池設計思想以及作者 Doug Lea 實作過程中的一些巧妙用法。本文還是會一行行關鍵代碼進行分析,目的是為了讓那些自己看源碼不是很了解的同學可以得到參考。
線程池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識,很多線上問題都是因為沒有用好線程池導緻的。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術水準。
本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java8 都一樣),建議想好好看的讀者抽出至少 30 分鐘的整塊時間來閱讀。當然,如果讀者僅為面試準備,可以直接滑到最後的總結部分。
總覽
下圖是 java 線程池幾個相關類的繼承結構:
先簡單說說這個繼承結構,Executor 位于最頂層,也是最簡單的,就一個 execute(Runnable runnable) 接口方法定義。
ExecutorService 也是接口,在 Executor 接口的基礎上添加了很多的接口方法,是以一般來說我們會使用這個接口。
然後再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這裡實作了非常有用的一些方法供子類直接使用,之後我們再細說。
然後才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關于線程池所需的非常豐富的功能。
另外,我們還涉及到下圖中的這些類:
同在并發包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,裡面的方法都是靜态方法,如以下我們最常用的用于生成 ThreadPoolExecutor 的執行個體的一些方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
另外,由于線程池支援擷取線程執行的結果,是以,引入了 Future 接口,RunnableFuture 繼承自此接口,然後我們最需要關心的就是它的實作類 FutureTask。到這裡,記住這個概念,線上程池的使用過程中,我們是往線程池送出任務(task),使用過線程池的都知道,我們送出的每個任務是實作了 Runnable 接口的,其實就是先将 Runnable 的任務包裝成 FutureTask,然後再送出到線程池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),然後具有 Future 接口的語義,即可以在将來(Future)得到執行的結果。
當然,線程池中的 BlockingQueue 也是非常重要的概念,如果線程數達到 corePoolSize,我們的每個任務會送出到等待隊列中,等待線程池中的線程領取任務并執行。這裡的 BlockingQueue 通常我們使用其實作類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實作類都有不同的特征,使用場景之後會慢慢分析。想要詳細了解各個 BlockingQueue 的讀者,可以參考我的前面的一篇對 BlockingQueue 的各個實作類進行詳細分析的文章。
把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務實作類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用于實作定時執行。不過本文不會介紹它的實作,我相信讀者看完本文後可以比較容易地看懂它的源碼。
以上就是本文要介紹的知識,廢話不多說,開始進入正文。
Executor 接口
/*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
void execute(Runnable command);
}
我們可以看到 Executor 接口非常簡單,就一個 void execute(Runnable command) 方法,代表送出一個任務。為了讓大家了解 java 線程池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。
我們經常這樣啟動一個線程:
new Thread(new Runnable(){
// do something
}).start();
用了線程池 Executor 後就可以像下面這麼使用:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
如果我們希望線程池同步執行每一個任務,我們可以這麼實作這個接口:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();// 這裡不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的線程。
}
}
我們希望每個任務送出進來後,直接啟動一個新的線程來執行這個任務,我們可以這麼實作:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start(); // 每個任務都用一個新的線程來執行
}
}
我們再來看下怎麼組合兩個 Executor 來使用,下面這個實作是将所有的任務都加到一個 queue 中,然後從 queue 中取任務,交給真正的執行器執行,這裡采用 synchronized 進行并發控制:
class SerialExecutor implements Executor {
// 任務隊列
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
// 這個才是真正的執行器
final Executor executor;
// 目前正在執行的任務
Runnable active;
// 初始化的時候,指定執行器
SerialExecutor(Executor executor) {
this.executor = executor;
}
// 添加任務到線程池: 将任務添加到任務隊列,scheduleNext 觸發執行器去任務隊列取任務
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
// tasks.poll()傳回隊列第一個元素,并在隊列中删除,即元素出隊列,如果隊列為空,poll() 方法在用空集合調用時傳回 null。
if ((active = tasks.poll()) != null) {
// 具體的執行轉給真正的執行器 executor
executor.execute(active);
}
}
}
當然了,Executor 這個接口隻有送出任務的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執行結果、我們想知道目前線程池有多少個線程活着、已經完成了多少任務等等,這些都是這個接口的不足的地方。接下來我們要介紹的是繼承自 Executor 接口的 ExecutorService 接口,這個接口提供了比較豐富的功能,也是我們最常使用到的接口。
ExecutorService
一般我們定義一個線程池的時候,往往都是使用這個接口:
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
因為這個接口中定義的一系列方法大部分情況下已經可以滿足我們的需要了。
那麼我們簡單初略地來看一下這個接口中都有哪些方法:
public interface ExecutorService extends Executor {
// 關閉線程池,已送出的任務繼續執行,不接受繼續送出新任務
void shutdown();
// 關閉線程池,嘗試停止正在執行的所有任務,不接受繼續送出新任務
// 它和前面的方法相比,加了一個單詞“now”,差別在于它會去停止目前正在進行的任務
List<Runnable> shutdownNow();
// 線程池是否已關閉
boolean isShutdown();
// 如果調用了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麼傳回true
// 這個方法必須在調用shutdown或shutdownNow方法之後調用才會傳回true
boolean isTerminated();
// 等待所有任務完成,并設定逾時時間
// 我們這麼了解,實際應用中是,先調用 shutdown 或 shutdownNow,
// 然後再調這個方法等待所有的線程真正地完成,傳回值意味着有沒有逾時
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 送出一個 Callable 任務
<T> Future<T> submit(Callable<T> task);
// 送出一個 Runnable 任務,第二個參數将會放到 Future 中,作為傳回值,
// 因為 Runnable 的 run 方法本身并不傳回任何東西
<T> Future<T> submit(Runnable task, T result);
// 送出一個 Runnable 任務
Future<?> submit(Runnable task);
// 執行所有任務,傳回 Future 類型的一個 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 也是執行所有任務,但是這裡設定了逾時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 隻要其中的一個任務結束了,就可以傳回,傳回執行完的那個任務的結果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 同上一個方法,隻要其中的一個任務結束了,就可以傳回,傳回執行完的那個任務的結果,
// 不過這個帶逾時,超過指定的時間,抛出 TimeoutException 異常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
這些方法都很好了解,一個簡單的線程池主要就是這些功能,能送出任務,能擷取結果,能關閉線程池,這也是為什麼我們經常用這個接口的原因。
FutureTask
在繼續往下層介紹 ExecutorService 的實作類之前,我們先來說說相關的類 FutureTask。
接口Future <- 接口RunnableFuture <- 類FutureTask
接口Runnable <- 接口RunnableFuture <- 類FutureTask
類FutureTask 通過 接口RunnableFuture 間接實作了 接口Runnable ,
是以每個 Runnable 通常都先包裝成 FutureTask,
然後調用 executor.execute(Runnable command) 将其送出給線程池
我們知道,Runnable 的 void run() 方法是沒有傳回值的,是以,通常,如果我們需要的話,會在 submit 中指定第二個參數作為傳回值:
<T> Future<T> submit(Runnable task, T result);
其實到時候會通過這兩個參數,将其包裝成 Callable。它和 Runnable 的差別在于 run() 沒有傳回值,而 Callable 的 call() 方法有傳回值,同時,如果運作出現異常,call() 方法會抛出異常。
public interface Callable<V> {
V call() throws Exception;
}
在這裡,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這裡我們需要知道怎麼用就行了。
下面,我們來看看 ExecutorService 的抽象實作 AbstractExecutorService 。
AbstractExecutorService
AbstractExecutorService 抽象類派生自 ExecutorService 接口,然後在其基礎上實作了幾個實用的方法,這些方法提供給子類進行調用。
這個抽象類實作了 invokeAny 方法和 invokeAll 方法,這裡的兩個 newTaskFor 方法也比較有用,用于将任務包裝成 FutureTask。定義于最上層接口 Executor中的 void execute(Runnable command) 由于不需要擷取結果,不會進行 FutureTask 的包裝。
需要擷取結果(FutureTask),用 submit 方法,不需要擷取結果,可以用 execute 方法。
下面,我将一行一行源碼地來分析這個類,跟着源碼來看看其實作吧:
Tips: invokeAny 和 invokeAll 方法占了這整個類的絕大多數篇幅,讀者可以選擇适當跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟後的作用,不用擔心會漏掉什麼導緻看不懂後面的代碼。
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于擷取執行結果的,我們常用它的子類 FutureTask
// 下面兩個 newTaskFor 方法用于将我們的任務包裝成 FutureTask 送出到線程池中執行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 送出任務
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任務包裝成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交給執行器執行,execute 方法由具體的子類來實作
// 前面也說了,FutureTask 間接實作了Runnable 接口。
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任務包裝成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交給執行器執行
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任務包裝成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交給執行器執行
execute(ftask);
return ftask;
}
// 此方法目的:将 tasks 集合中的任務送出到線程池執行,任意一個線程執行完後就可以結束了
// 第二個參數 timed 代表是否設定逾時機制,逾時時間為第三個參數,
// 如果 timed 為 true,同時逾時了還沒有一個線程傳回結果,那麼抛出 TimeoutException 異常
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
// 任務數
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
// ExecutorCompletionService 不是一個真正的執行器,參數 this 才是真正的執行器
// 它對執行器進行了包裝,每個任務結束後,将結果儲存到内部的一個 completionQueue 隊列中
// 這也是為什麼這個類的名字裡面有個 Completion 的原因吧。
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// 用于儲存異常資訊,此方法如果沒有得到任何有效的結果,那麼我們可以抛出最後得到的一個異常
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 首先先送出一個任務,後面的任務到下面的 for 循環一個個送出
futures.add(ecs.submit(it.next()));
// 送出了一個任務,是以任務數量減 1
--ntasks;
// 正在執行的任務數(送出的時候 +1,任務結束的時候 -1)
int active = 1;
for (;;) {
// ecs 上面說了,其内部有一個 completionQueue 用于儲存執行完成的結果
// BlockingQueue 的 poll 方法不阻塞,傳回 null 代表隊列為空
Future<T> f = ecs.poll();
// 為 null,說明剛剛送出的第一個線程還沒有執行完成
// 在前面先送出一個任務,加上這裡做一次檢查,也是為了提高性能
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 這裡是 else if,不是 if。這裡說明,沒有任務了,同時 active 為 0 說明
// 任務都執行完成了。其實我也沒了解為什麼這裡做一次 break?
// 因為我認為 active 為 0 的情況,必然從下面的 f.get() 傳回了
// 2018-02-23 感謝讀者 newmicro 的 comment,
// 這裡的 active == 0,說明所有的任務都執行失敗,那麼這裡是 for 循環出口
else if (active == 0)
break;
// 這裡也是 else if。這裡說的是,沒有任務了,但是設定了逾時時間,這裡檢測是否逾時
else if (timed) {
// 帶等待的 poll 方法
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
// 如果已經逾時,抛出 TimeoutException 異常,這整個方法就結束了
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
// 這裡是 else。說明,沒有任務需要送出,但是池中的任務沒有完成,還沒有逾時(如果設定了逾時)
// take() 方法會阻塞,直到有元素傳回,說明有任務結束了
else
f = ecs.take();
}
/*
* 我感覺上面這一段并不是很好了解,這裡簡單說下。
* 1. 首先,這在一個 for 循環中,我們設想每一個任務都沒那麼快結束,
* 那麼,每一次都會進到第一個分支,進行送出任務,直到将所有的任務都送出了
* 2. 任務都送出完成後,如果設定了逾時,那麼 for 循環其實進入了“一直檢測是否逾時”
這件事情上
* 3. 如果沒有設定逾時機制,那麼不必要檢測逾時,那就會阻塞在 ecs.take() 方法上,
等待擷取第一個執行結果
* 4. 如果所有的任務都執行失敗,也就是說 future 都傳回了,
但是 f.get() 抛出異常,那麼從 active == 0 分支出去(感謝 newmicro 提出)
// 當然,這個需要看下面的 if 分支。
*/
// 有任務結束了
if (f != null) {
--active;
try {
// 傳回執行結果,如果有異常,都包裝成 ExecutionException
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}// 注意看 for 循環的範圍,一直到這裡
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 方法退出之前,取消其他的任務
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
// 執行所有的任務,傳回任務結果。
// 先不要看這個方法,我們先想想,其實我們自己送出任務到線程池,也是想要線程池執行所有的任務
// 隻不過,我們是每次 submit 一個任務,這裡以一個集合作為參數送出
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 這個很簡單
for (Callable<T> t : tasks) {
// 包裝成 FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 送出任務
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
// 這是一個阻塞方法,直到擷取到值,或抛出了異常
// 這裡有個小細節,其實 get 方法簽名上是會抛出 InterruptedException 的
// 可是這裡沒有進行處理,而是抛給外層去了。此異常發生于還沒執行完的任務被取消了
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 這個方法傳回,不像其他的場景,傳回 List<Future>,其實執行結果還沒出來
// 這個方法傳回是真正的傳回,任務都結束了
return futures;
} finally {
// 為什麼要這個?就是上面說的有異常的情況
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
// 帶逾時的 invokeAll,我們找不同吧
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
Iterator<Future<T>> it = futures.iterator();
// 每送出一個任務,檢測一次是否逾時
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
// 逾時
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
// 調用帶逾時的 get 方法,這裡的參數 nanos 是剩餘的時間,
// 因為上面其實已經用掉了一些時間了
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
}
到這裡,我們發現,這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟線程來執行任務,它們都隻是在方法内部調用了 execute 方法,是以最重要的 execute(Runnable runnable) 方法還沒出現,需要等具體執行器來實作這個最重要的部分,這裡我們要說的就是 ThreadPoolExecutor 類了。
鑒于本文的篇幅,我覺得看到這裡的讀者應該已經不多了,大家都習慣了快餐文化。我寫的每篇文章都力求讓讀者可以通過我的一篇文章而對相關内容有全面的了解,是以篇幅不免長了些。
ThreadPoolExecutor
ThreadPoolExecutor 是 JDK 中的線程池實作,這個類實作了一個線程池需要的各個方法,它實作了任務送出、線程管理、監控等等方法。
我們可以基于它來進行業務上的擴充,以實作我們需要的其他功能,比如實作定時任務的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當然,這不是本文關注的重點,下面,還是趕緊進行源碼分析吧。
首先,我們來看看線程池實作中的幾個概念和處理流程。
我們先回顧下送出任務的幾個方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
一個最基本的概念是,submit 方法中,參數是 Runnable 類型(也有Callable 類型),這個參數不是用于 new Thread(runnable).start() 中的,此處的這個參數不是用于啟動線程的,這裡指的是任務,任務要做的事情是 run() 方法裡面定義的或 Callable 中的 call() 方法裡面定義的。
初學者往往會搞混這個,因為 Runnable 總是在各個地方出現,經常把一個 Runnable 包到另一個 Runnable 中。請把它想象成有個 Task 接口,這個接口裡面有一個 run() 方法。
我們回過神來繼續往下看,我畫了一個簡單的示意圖來描述線程池中的一些主要的構件:
當然,上圖沒有考慮隊列是否有界,送出任務時隊列滿了怎麼辦?什麼情況下會建立新的線程?送出任務時線程池滿了怎麼辦?空閑線程怎麼關掉?這些問題下面我們會一一解決。
我們經常會使用 Executors 這個工具類來快速構造一個線程池,對于初學者而言,這種工具類是很有用的,開發者不需要關注太多的細節,隻要知道自己需要一個線程池,僅僅提供必需的參數就可以了,其他參數都采用作者提供的預設值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
這裡先不說有什麼差別,它們最終都會導向這個構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 這幾個參數都是必須要有的
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
基本上,上面的構造方法中列出了我們最需要關心的幾個屬性了,下面逐個介紹下構造方法中出現的這幾個屬性:
- corePoolSize
核心線程數,不要摳字眼,反正先記着有這麼個屬性就可以了。
- maximumPoolSize
最大線程數,線程池允許建立的最大線程數。
- workQueue
任務隊列,BlockingQueue 接口的某個實作(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。
- keepAliveTime
空閑線程的保活時間,如果某線程的空閑時間超過這個值都沒有任務給它做,那麼可以被關閉了。注意這個值并不會對所有線程起作用,如果線程池中的線程數少于等于核心線程數 corePoolSize,那麼這些線程不會因為空閑太長時間而被關閉,當然,也可以通過調用 allowCoreThreadTimeOut(true)使核心線程數内的線程也可以被回收。
- threadFactory
用于生成線程,一般我們可以用預設的就可以了。通常,我們可以通過它将我們的線程的名字設定得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。
- handler:
當線程池已經滿了,但是又有新的任務送出的時候,該采取什麼政策由這個來指定。有幾種方式可供選擇,像抛出異常、直接拒絕然後傳回等,也可以自己實作相應的接口實作自己的邏輯,這個之後再說。
除了上面幾個屬性外,我們再看看其他重要的屬性。
Doug Lea 采用一個 32 位的整數來存放線程池的狀态和目前池中的線程數,其中高 3 位用于存放線程池狀态,低 29 位表示線程數(即使隻有 29 位,也已經不小了,大概 5 億多,現在還沒有哪個機器能起這麼多線程的吧)。我們知道,java 語言在整數編碼上是統一的,都是采用補碼的形式,下面是簡單的移位操作和布爾操作,都是挺簡單的。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 這裡 COUNT_BITS 設定為 29(32-3),意味着前三位用于存放線程狀态,後29位用于存放線程數
// 很多初學者很喜歡在自己的代碼中寫很多 29 這種數字,或者某個特殊的字元串,然後分布在各個地方,這是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 這裡得到的是 29 個 1,也就是說線程池的最大線程數是 2^29-1=536870911
// 以我們現在計算機的實際情況,這個數量還是夠用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 我們說了,線程池的狀态存放在高 3 位中
// 運算結果為 111跟29個0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将整數 c 的低 29 位修改為 0,就得到了線程池的狀态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 将整數 c 的高 3 為修改為 0,就得到了線程池中的線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
上面就是對一個整數的簡單的位操作,幾個操作方法将會在後面的源碼中一直出現,是以讀者最好把方法名字和其代表的功能記住,看源碼的時候也就不需要來來回回翻了。
在這裡,介紹下線程池中的各個狀态和狀态變化的轉換過程:
- RUNNING:這個沒什麼好說的,這是最正常的狀态:接受新的任務,處理等待隊列中的任務
- SHUTDOWN:不接受新的任務送出,但是會繼續處理等待隊列中的任務
- STOP:不接受新的任務送出,不再處理等待隊列中的任務,中斷正在執行任務的線程
- TIDYING:所有的任務都銷毀了,workCount 為 0。線程池的狀态在轉換為 TIDYING 狀态時,會執行鈎子方法 terminated()
- TERMINATED:terminated() 方法結束後,線程池的狀态就會變成這個
RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,是以等于 0 的時候不能送出任務,大于 0 的話,連正在執行的任務也需要中斷。
看了這幾種狀态的介紹,讀者大體也可以猜到十之八九的狀态轉換了,各個狀态的轉換過程有以下幾種:
- RUNNING -> SHUTDOWN:當調用了 shutdown() 後,會發生這個狀态轉換,這也是最重要的
- (RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 後,會發生這個狀态轉換,這下要清楚 shutDown() 和 shutDownNow() 的差別了
- SHUTDOWN -> TIDYING:當任務隊列和線程池都清空後,會由 SHUTDOWN 轉換為 TIDYING
- STOP -> TIDYING:當任務隊列清空後,發生這個轉換
- TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束後
上面的幾個記住核心的就可以了,尤其第一個和第二個。
另外,我們還要看看一個内部類 Worker,因為 Doug Lea 把線程池中的線程包裝成了一個個 Worker,翻譯成勞工,就是線程池中做任務的線程。是以到這裡,我們知道任務是 Runnable(内部變量名叫 task 或 command),線程是 Worker。
Worker 這裡又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在并發中真的是到處出現,而且非常容易使用,寫少量的代碼就能實作自己需要的同步方式(對 AQS 源碼感興趣的讀者請參看我之前寫的幾篇文章)。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 這個是真正的線程,任務靠你啦
final Thread thread;
// 前面說了,這裡的 Runnable 是任務。為什麼叫 firstTask?因為在建立線程的時候,如果同時指定了
// 這個線程起來以後需要執行的第一個任務,那麼第一個任務就是存放在這裡的(線程可不止執行這一個任務)
// 當然了,也可以為 null,這樣線程起來了,自己到任務隊列(BlockingQueue)中取任務(getTask 方法)就行了
Runnable firstTask;
// 用于存放此線程完成的任務數,注意了,這裡用了 volatile,保證可見性
volatile long completedTasks;
// Worker 隻有這一個構造方法,傳入 firstTask,也可以傳 null
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 調用 ThreadFactory 來建立一個新的線程
this.thread = getThreadFactory().newThread(this);
}
// 這裡調用了外部類的 runWorker 方法
public void run() {
runWorker(this);
}
...// 其他幾個方法沒什麼好看的,就是用 AQS 操作,來擷取這個線程的執行權,用了獨占鎖
}
前面雖然啰嗦,但是簡單。有了上面的這些基礎後,我們終于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源碼分析的時候也說了,各種方法都最終依賴于 execute 方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 前面說的那個表示 “線程池狀态” 和 “線程數” 的整數
int c = ctl.get();
// 如果目前線程數少于核心線程數,那麼直接添加一個 worker 來執行任務,
// 建立一個新的線程,并把目前任務 command 作為這個線程的第一個任務(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任務成功,那麼就結束了。送出任務嘛,線程池已經接受了這個任務,這個方法也就可以傳回了
// 至于執行的結果,到時候會包裝到 FutureTask 中。
// 傳回 false 代表線程池不允許送出任務
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到這裡說明,要麼目前線程數大于等于核心線程數,要麼剛剛 addWorker 失敗了
// 如果線程池處于 RUNNING 狀态,把這個任務添加到任務隊列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 這裡面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程
* 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程
* 如果線程數已經大于等于 corePoolSize,那麼将任務添加到隊列中,然後進到這裡
*/
int recheck = ctl.get();
// 如果線程池已不處于 RUNNING 狀态,那麼移除已經入隊的這個任務,并且執行拒絕政策
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池還是 RUNNING 的,并且線程數為 0,那麼開啟新的線程
// 到這裡,我們知道了,這塊代碼的真正意圖是:擔心任務送出到隊列中了,但是線程都關閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 隊列滿了,那麼進入到這個分支
// 以 maximumPoolSize 為界建立新的 worker,
// 如果失敗,說明目前線程數已經達到 maximumPoolSize,執行拒絕政策
else if (!addWorker(command, false))
reject(command);
}
對建立線程的錯誤了解:如果線程數少于 corePoolSize,建立一個線程,如果線程數在 [corePoolSize, maximumPoolSize] 之間那麼可以建立線程或複用空閑線程,keepAliveTime 對這個區間的線程有效。
從上面的幾個分支,我們就可以看出,上面的這段話是錯誤的。
上面這些一時半會也不可能全部消化搞定,我們先繼續往下吧,到時候再回頭看幾遍。
這個方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎麼建立新的線程的:
// 第一個參數是準備送出給這個線程執行的任務,之前說了,可以為 null
// 第二個參數為 true 代表使用核心線程數 corePoolSize 作為建立線程的界限,也就說建立這個線程的時候,
// 如果線程池中的線程總數已經達到 corePoolSize,那麼不能響應這次建立線程的請求
// 如果是 false,代表使用最大線程數 maximumPoolSize 作為界限
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 這個非常不好了解
// 如果線程池已關閉,并滿足以下條件之一,那麼不建立新的 worker:
// 1. 線程池狀态大于 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
// 2. firstTask != null
// 3. workQueue.isEmpty()
// 簡單分析下:
// 還是狀态控制的問題,當線程池處于 SHUTDOWN 的時候,不允許送出任務,但是已有的任務繼續執行
// 當狀态大于 SHUTDOWN 時,不允許送出任務,且中斷正在執行的任務
// 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那麼是允許建立 worker 的
// 這是因為 SHUTDOWN 的語義:不允許送出新的任務,但是要把已經進入到 workQueue 的任務執行完,是以在滿足條件的基礎上,是允許建立新的 Worker 的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功,那麼就是所有建立線程前的條件校驗都滿足了,準備建立線程執行任務了
// 這裡失敗的話,說明有其他線程也在嘗試往線程池中建立線程
if (compareAndIncrementWorkerCount(c))
break retry;
// 由于有并發,重新再讀取一下 ctl
c = ctl.get();
// 正常如果是 CAS 失敗的話,進到下一個裡層的for循環就可以了
// 可是如果是因為其他線程的操作,導緻線程池的狀态發生了變更,如有其他線程關閉了這個線程池
// 那麼需要回到外層的for循環
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
* 到這裡,我們認為在目前這個時刻,可以開始建立線程來執行任務了,
* 因為該校驗的都校驗了,至于以後會發生什麼,那是以後的事,至少目前是滿足條件的
*/
// worker 是否已經啟動
boolean workerStarted = false;
// 是否已将這個 worker 添加到 workers 這個 HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 把 firstTask 傳給 worker 的構造方法
w = new Worker(firstTask);
// 取 worker 中的線程對象,之前說了,Worker的構造方法會調用 ThreadFactory 來建立一個新的線程
final Thread t = w.thread;
if (t != null) {
// 這個是整個線程池的全局鎖,持有這個鎖才能讓下面的操作“順理成章”,
// 因為關閉一個線程池需要這個鎖,至少我持有鎖的期間,線程池不會被關閉
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
// 小于 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況
// 如果等于 SHUTDOWN,前面說了,不接受新的任務,但是會繼續執行等待隊列中的任務
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker 裡面的 thread 可不能是已經啟動的
if (t.isAlive())
throw new IllegalThreadStateException();
// 加到 workers 這個 HashSet 中
workers.add(w);
int s = workers.size();
// largestPoolSize 用于記錄 workers 中的個數的最大值
// 因為 workers 是不斷增加減少的,通過這個值可以知道線程池的大小曾經達到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功的話,啟動這個線程
if (workerAdded) {
// 啟動線程
t.start();
workerStarted = true;
}
}
} finally {
// 如果線程沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,将其減掉
if (! workerStarted)
addWorkerFailed(w);
}
// 傳回線程是否啟動成功
return workerStarted;
}
簡單看下 addWorkFailed 的處理:
// workers 中删除掉相應的 worker
// workCount 減 1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
// rechecks for termination, in case the existence of this worker was holding up termination
tryTerminate();
} finally {
mainLock.unlock();
}
}
回過頭來,繼續往下走。我們知道,worker 中的線程 start 後,其 run 方法會調用 runWorker 方法:
// Worker 類的 run() 方法
public void run() {
runWorker(this);
}
繼續往下看 runWorker 方法:
// 此方法由 worker 線程啟動後調用,這裡用一個 while 循環來不斷地從等待隊列中擷取任務并執行
// 前面說了,worker 在初始化的時候,可以指定 firstTask,那麼第一個任務也就可以不需要從隊列中擷取
final void runWorker(Worker w) {
//
Thread wt = Thread.currentThread();
// 該線程的第一個任務(如果有的話)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循環調用 getTask 擷取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果線程池狀态大于等于 STOP,那麼意味着該線程也要中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這是一個鈎子方法,留給需要的子類實作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到這裡終于可以執行任務了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 這裡不允許抛出 Throwable,是以轉換為 Error
thrown = x; throw new Error(x);
} finally {
// 也是一個鈎子方法,将 task 和異常作為參數,留給需要的子類實作
afterExecute(task, thrown);
}
} finally {
// 置空 task,準備 getTask 擷取下一個任務
task = null;
// 累加完成的任務數
w.completedTasks++;
// 釋放掉 worker 的獨占鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果到這裡,需要執行線程關閉:
// 1. 說明 getTask 傳回 null,也就是說,隊列中已經沒有任務需要執行了,執行關閉
// 2. 任務執行過程中發生了異常
// 第一種情況,已經在代碼處理了将 workCount 減 1,這個在 getTask 方法分析中會說
// 第二種情況,workCount 沒有進行處理,是以需要在 processWorkerExit 中處理
// 限于篇幅,我不準備分析這個方法了,感興趣的讀者請自行分析源碼
processWorkerExit(w, completedAbruptly);
}
}
我們看看 getTask() 是怎麼擷取任務的,這個方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:
// 此方法有三種可能:
// 1. 阻塞直到擷取到任務傳回。我們知道,預設 corePoolSize 之内的線程是不會被回收的,
// 它們會一直等待任務
// 2. 逾時退出。keepAliveTime 起作用的時候,也就是如果這麼多時間内都沒有任務,那麼應該執行關閉
// 3. 如果發生了以下條件,此方法必須傳回 null:
// - 池中有大于 maximumPoolSize 個 workers 存在(通過調用 setMaximumPoolSize 進行設定)
// - 線程池處于 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務
// - 線程池處于 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 兩種可能
// 1. rs == SHUTDOWN && workQueue.isEmpty()
// 2. rs >= STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// CAS 操作,減少工作線程數
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
// 允許核心線程數内的線程回收,或目前線程數超過了核心線程數,那麼有可能發生逾時關閉
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 這裡 break,是為了不往下執行後一個 if (compareAndDecrementWorkerCount(c))
// 兩個 if 一起看:如果目前線程數 wc > maximumPoolSize,或者逾時,都傳回 null
// 那這裡的問題來了,wc > maximumPoolSize 的情況,為什麼要傳回 null?
// 換句話說,傳回 null 意味着關閉線程。
// 那是因為有可能開發者調用了 setMaximumPoolSize() 将線程池的 maximumPoolSize 調小了,那麼多餘的 Worker 就需要被關閉
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
// compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數發生了改變
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
// wc <= maximumPoolSize 同時沒有逾時
try {
// 到 workQueue 中擷取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 如果此 worker 發生了中斷,采取的方案是重試
// 解釋下為什麼會發生中斷,這個讀者要去看 setMaximumPoolSize 方法。
// 如果開發者将 maximumPoolSize 調小了,導緻其小于目前的 workers 數量,
// 那麼意味着超出的部分線程要被關閉。重新進入 for 循環,自然會有部分線程會傳回 null
timedOut = false;
}
}
}
到這裡,基本上也說完了整個流程,讀者這個時候應該回到 execute(Runnable command) 方法,看看各個分支,我把代碼貼過來一下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 前面說的那個表示 “線程池狀态” 和 “線程數” 的整數
int c = ctl.get();
// 如果目前線程數少于核心線程數,那麼直接添加一個 worker 來執行任務,
// 建立一個新的線程,并把目前任務 command 作為這個線程的第一個任務(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任務成功,那麼就結束了。送出任務嘛,線程池已經接受了這個任務,這個方法也就可以傳回了
// 至于執行的結果,到時候會包裝到 FutureTask 中。
// 傳回 false 代表線程池不允許送出任務
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到這裡說明,要麼目前線程數大于等于核心線程數,要麼剛剛 addWorker 失敗了
// 如果線程池處于 RUNNING 狀态,把這個任務添加到任務隊列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 這裡面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程
* 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程
* 如果線程數已經大于等于 corePoolSize,那麼将任務添加到隊列中,然後進到這裡
*/
int recheck = ctl.get();
// 如果線程池已不處于 RUNNING 狀态,那麼移除已經入隊的這個任務,并且執行拒絕政策
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池還是 RUNNING 的,并且線程數為 0,那麼開啟新的線程
// 到這裡,我們知道了,這塊代碼的真正意圖是:擔心任務送出到隊列中了,但是線程都關閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 隊列滿了,那麼進入到這個分支
// 以 maximumPoolSize 為界建立新的 worker,
// 如果失敗,說明目前線程數已經達到 maximumPoolSize,執行拒絕政策
else if (!addWorker(command, false))
reject(command);
}
上面各個分支中,有兩種情況會調用 reject(command) 來處理任務,因為按照正常的流程,線程池此時不能接受這個任務,是以需要執行我們的拒絕政策。接下來,我們說一說 ThreadPoolExecutor 中的拒絕政策。
final void reject(Runnable command) {
// 執行拒絕政策
handler.rejectedExecution(command, this);
}
此處的 handler 我們需要在構造線程池的時候就傳入這個參數,它是 RejectedExecutionHandler 的執行個體。
RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經定義好的實作類可供我們直接使用,當然,我們也可以實作自己的政策,不過一般也沒有必要。
// 隻要線程池沒有被關閉,那麼由送出任務的線程自己來執行這個任務。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 不管怎樣,直接抛出 RejectedExecutionException 異常
// 這個是預設的政策,如果我們構造線程池的時候不傳相應的 handler 的話,那就會指定使用這個
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 不做任何處理,直接忽略掉這個任務
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 這個相對霸道一點,如果線程池沒有被關閉的話,
// 把隊列隊頭的任務(也就是等待了最長時間的)直接扔掉,然後送出這個任務到等待隊列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
到這裡,ThreadPoolExecutor 的源碼算是分析結束了。單純從源碼的難易程度來說,ThreadPoolExecutor 的源碼還算是比較簡單的,隻是需要我們靜下心來好好看看罷了。
Executors
這節其實也不是分析 Executors 這個類,因為它僅僅是工具類,它的所有方法都是 static 的。
- 生成一個固定大小的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
最大線程數設定為與核心線程數相等,此時 keepAliveTime 設定為 0(因為這裡它是沒用的,即使不為 0,線程池預設也不會回收 corePoolSize 内的線程),任務隊列采用 LinkedBlockingQueue,無界隊列。
過程分析:剛開始,每送出一個任務都建立一個 worker,當 worker 的數量達到 nThreads 後,不再建立新的線程,而是把任務送出到 LinkedBlockingQueue 中,而且之後線程數始終為 nThreads。
- 生成隻有一個線程的固定線程池,這個更簡單,和上面的一樣,隻要設定線程數為 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 生成一個需要的時候就建立新的線程,同時可以複用之前建立的線程(如果這個線程目前沒有任務)的線程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心線程數為 0,最大線程數為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務隊列采用 SynchronousQueue。
這種線程池對于任務可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務,那麼将關閉此線程并從線程池中移除。是以如果線程池空閑了很長時間也不會有問題,因為随着所有的線程都會被關閉,整個線程池不會占用任何的系統資源。
過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒于 corePoolSize 是 0,那麼送出任務的時候,直接将任務送出到隊列中,由于采用了 SynchronousQueue,是以如果是第一個任務送出的時候,offer 方法肯定會傳回 false,因為此時沒有任何 worker 對這個任務進行接收,那麼将進入到最後一個分支來建立第一個 worker。之後再送出任務的話,取決于是否有空閑下來的線程對任務進行接收,如果有,會進入到第二個 if 語句塊中,否則就是和第一個任務一樣,進到最後的 else if 分支建立新線程。
int c = ctl.get();
// corePoolSize 為 0,是以不會進到這個 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// offer 如果有空閑線程剛好可以接收此任務,那麼傳回 true,否則傳回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
SynchronousQueue 是一個比較特殊的 BlockingQueue,其本身不儲存任何元素,它有一個虛拟隊列(或虛拟棧),不管讀操作還是寫操作,如果目前隊列中存儲的是與目前操作相同模式的線程,那麼目前操作也進入隊列中等待;如果是相反模式,則配對成功,從目前隊列中取隊頭節點。具體的資訊,可以看我的另一篇關于 BlockingQueue 的文章。
總結
我一向不喜歡寫總結,因為我把所有需要表達的都寫在正文中了,寫小篇幅的總結并不能真正将話說清楚,本文的總結部分為準備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者。
- java 線程池有哪些關鍵屬性?
corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler
corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設定而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任務,添加任務的時候,如果目前線程數超過了 corePoolSize,那麼往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
keepAliveTime 用于設定空閑時間,如果線程數超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作
rejectedExecutionHandler 用于處理當線程池不能執行此任務時的情況,預設有抛出 RejectedExecution異常、忽略任務、使用送出任務的線程來執行此任務和将隊列中等待最久的任務删除,然後送出此任務這四種政策,預設為抛出異常。
- 說說線程池中的線程建立時機?
- 如果目前線程數少于 corePoolSize,那麼送出任務的時候建立一個新的線程,并由這個線程執行這個任務;
- 如果目前線程數已經達到 corePoolSize,那麼将送出的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
- 如果任務隊列已滿,那麼建立新的線程來執行任務,需要保證池中的線程數不會超過 maximumPoolSize,如果此時線程數超過了 maximumPoolSize,那麼執行拒絕政策。
- * 注意:如果将隊列設定為無界隊列,那麼線程數達到 corePoolSize 後,其實線程數就不會再增長了。因為後面的任務直接往隊列塞就行了,此時 maximumPoolSize 參數就沒有什麼意義。
- Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構造出來的線程池有什麼差别?
細說太長,往上滑一點點,在 Executors 的小節進行了詳盡的描述。
- 任務執行過程中發生異常怎麼處理?
如果某個任務執行出現異常,那麼執行任務的線程會被關閉,而不是繼續接收其他任務。然後會啟動一個新的線程來代替它。
- 什麼時候會執行拒絕政策?
- workers 的數量達到了 corePoolSize(任務此時需要進入任務隊列),任務入隊成功,與此同時線程池被關閉了,而且關閉線程池并沒有将這個任務出隊,那麼執行拒絕政策。這裡說的是非常邊界的問題,入隊和關閉線程池并發執行,讀者仔細看看 execute 方法是怎麼進到第一個 reject(command) 裡面的。
- workers 的數量大于等于 corePoolSize,将任務加入到任務隊列,可是隊列滿了,任務入隊失敗,那麼準備開啟新的線程,可是線程數已經達到 maximumPoolSize,那麼執行拒絕政策。
參考:
https://m.toutiaocdn.cn/item/6687534542913274382/?app=news_article×tamp=1557357009&req_id=20190509071009010025065076647F8BE&group_id=6687534542913274382
https://m.toutiaocdn.cn/item/6687025748340900359/?app=news_article×tamp=1557357040&req_id=201905090710401720170000013076D93&group_id=6687025748340900359