Java面試中,線程池也算是一個高頻的問題,其實就JDK源碼來看線程池這一塊的實作代碼應該算是寫的清晰易懂的,通過這篇文章,我們就來盤點一下線程池的知識點。
本文基于JDK1.8源碼進行分析
首先看下線程池構造函數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//忽略指派與校驗邏輯
}
構造參數比較多,一個一個說下:
- corePoolSize線程池中的核心線程數
- maximumPoolSize線程池中的最大線程數
- keepAliveTime線程池中的線程存活時間(準确來說應該是沒有任務執行時的回收時間,後面會分析)
- unit時間機關
- workQueue來不及執行的任務存放的阻塞隊列
- threadFactory建立woker線程(注意不是我們送出的任務)是進行一些屬性設定,比如線程名,優先級等等,有預設實作。
- handler 任務拒絕政策,當運作線程數已達到maximumPoolSize,隊列也已經裝滿時會調用該參數拒絕任務,有預設實作。
當我們向線程池送出任務時,通常使用execute方法,接下來就先從該方法開始分析。
在分析execute代碼之前,需要先說明下,我們都知道線程池是維護了一批線程來處理使用者送出的任務,達到線程複用的目的,線程池維護的這批線程被封裝成了Worker。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//JDK8的源碼中,線程池本身的狀态跟worker數量使用同一個變量ctl來維護
int c = ctl.get();
//通過位運算得出當然線程池中的worker數量與構造參數corePoolSize進行比較
if (workerCountOf(c) < corePoolSize) {
//如果小于corePoolSize,則直接新增一個worker,并把當然使用者送出的任務command作為參數,如果成功則傳回。
if (addWorker(command, true))
return;
//如果失敗,則擷取最新的線程池資料
c = ctl.get();
}
//如果線程池仍在運作,則把任務放到阻塞隊列中等待執行。
if (isRunning(c) && workQueue.offer(command)) {
//這裡的recheck思路是為了處理并發問題
int recheck = ctl.get();
//當任務成功放入隊列時,如果recheck發現線程池已經不再運作了則從隊列中把任務删除
if (! isRunning(recheck) && remove(command))
//删除成功以後,會調用構造參數傳入的拒絕政策。
reject(command);
//如果worker的數量為0(此時隊列中可能有任務沒有執行),則建立一個worker(由于此時建立woker的目的是執行隊列中堆積的任務,
//是以入參沒有執行任務,詳細邏輯後面會詳細分析addWorker方法)。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果前面的新增woker,放入隊列都失敗,則會繼續新增worker,此時線程池的狀态是woker數量達到corePoolSize,阻塞隊列任務已滿
//隻能基于maximumPoolSize參數建立woker
else if (!addWorker(command, false))
//如果基于maximumPoolSize建立woker失敗,此時是線程池中線程數已達到上限,隊列已滿,則調用構造參數中傳入的拒絕政策
reject(command);
}
源碼裡我增加了很多注釋,需要多讀幾遍才能完全了解,總結一下使用者向線程池送出任務以後,線程池的執行邏輯:
- 如果目前woker數量小于corePoolSize,則建立一個woker并把目前任務配置設定給該woker線程,成功則傳回。
- 如果第一步失敗,則嘗試把任務放入阻塞隊列,如果成功則傳回。
- 如果第二步失敗,則判斷如果目前woker數量小于maximumPoolSize,則建立一個woker并把目前任務配置設定給該woker線程,成功則傳回。
- 如果第三步失敗,則調用拒絕政策處理該任務。
從execute的源碼可以看出addWorker方法是重中之重,馬上來看下它的實作。
addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {
//這裡有一段基于CAS+死循環實作的關于線程池狀态,線程數量的校驗與更新邏輯就先忽略了,重點看主流程。
//...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//把指定任務作為參數建立一個worker線程
w = new Worker(firstTask);
//這裡是重點,咋一看,一定以為w.thread就是我們傳入的firstTask
//其實是通過線程池構造函數參數threadFactory生成的woker對象
//也就是說這個變量t就是代表woker線程。絕對不是使用者送出的線程任務firstTask!!!
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//加鎖之後仍舊是判斷線程池狀态等一些校驗邏輯。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//把建立的woker線程放入集合儲存,這裡使用的是HashSet
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//然後啟動woker線程
//這裡再強調一遍上面說的邏輯,該變量t代表woker線程,也就是會調用woker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果woker啟動失敗,則進行一些善後工作,比如說修改目前woker數量等等
addWorkerFailed(w);
}
return workerStarted;
}
addWorker方法主要做的工作就是建立一個Woker線程,加入到woker集合中,然後啟動該線程,那麼接下來的重點就是Woker類的run方法了。
worker執行方法:
//Woker類實作了Runnable接口
public void run() {
runWorker(this);
}
//最終woker執行邏輯走到了這裡
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//task就是Woker構造函數入參指定的任務,即使用者送出的任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//一般情況下,task都不會為空(特殊情況上面注釋中也說明了),是以會直接進入循環體中
//這裡getTask方法是要重點說明的,它的實作跟我們構造參數設定存活時間有關
//我們都知道構造參數設定的時間代表了線程池中的線程,即woker線程的存活時間,如果到期則回收woker線程,這個邏輯的實作就在getTask中。
//來不及執行的任務,線程池會放入一個阻塞隊列,getTask方法就是去阻塞隊列中取任務,使用者設定的存活時間,就是
//從這個阻塞隊列中取任務等待的最大時間,如果getTask傳回null,意思就是woker等待了指定時間仍然沒有
//取到任務,此時就會跳過循環體,進入woker線程的銷毀邏輯。
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//該方法是個空的實作,如果有需要使用者可以自己繼承該類進行實作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正的任務執行邏輯
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//該方法是個空的實作,如果有需要使用者可以自己繼承該類進行實作
afterExecute(task, thrown);
}
} finally {
//這裡設為null,也就是循環體再執行的時候會調用getTask方法
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//當指定任務執行完成,阻塞隊列中也取不到可執行任務時,會進入這裡,做一些善後工作,比如在corePoolSize跟maximumPoolSize之間的woker會進行回收
processWorkerExit(w, completedAbruptly);
}
}
woker線程的執行流程就是首先執行初始化時配置設定給的任務,執行完成以後會嘗試從阻塞隊列中擷取可執行的任務,如果指定時間内仍然沒有任務可以執行,則進入銷毀邏輯。
注:這裡隻會回收corePoolSize與maximumPoolSize直接的那部分woker
了解了整個線程池的運作原理以後,再來看下JDK預設提供的線程池類型就會一目了然了:
public static ExecutorService newFixedThreadPool(int nThreads) {
//corePoolSize跟maximumPoolSize值一樣,同時傳入一個無界阻塞隊列
//根據上面分析的woker回收邏輯,該線程池的線程會維持在指定線程數,不會進行回收
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
//線程池中隻有一個線程進行任務執行,其他的都放入阻塞隊列
//外面包裝的FinalizableDelegatedExecutorService類實作了finalize方法,在JVM垃圾回收的時候會關閉線程池
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
//這個線程池corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,意思也就是說來一個任務就建立一個woker,回收時間是60s
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最後再說說初始化線程池時線程數的選擇:
- 如果任務是IO密集型,一般線程數需要設定2倍CPU數以上,以此來盡量利用CPU資源。
- 如果任務是CPU密集型,一般線程數量隻需要設定CPU數加1即可,更多的線程數也隻能增加上下文切換,不能增加CPU使用率。
上述隻是一個基本思想,如果真的需要精确的控制,還是需要上線以後觀察線程池中線程數量跟隊列的情況來定。