原文位址:
https://www.colabug.com/2114619.html
ThreadPoolExecutor執行execute方法:
這個方法執行任務時:
- 判斷目前線程池線程數量是否小于核心線程池大小,是則建立線程并啟動,否則到第2步
- 判斷任務隊列是否已滿,未滿則将任務加入阻塞隊列,已滿則到第3步
-
判斷目前線程池線程數量是否小于最大線程池大小,是則建立線程并啟動,否則執行飽和政策
execute方法:
public void execute(Runnable command) {
/任務為空,抛出空指針異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/判斷目前線程數量是否小于核心線程數
if (workerCountOf(c) < corePoolSize) {
/是則添加一個核心線程(true表示核心線程)到線程池,并且啟動線程執行任務(addWorker方法裡會啟動)
if (addWorker(command, true))
return; /添加成功則傳回
c = ctl.get();
}
/線程池處于運作狀态則向阻塞隊列添加該任務
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/判斷線程池是否處于運作狀态,不是就移除剛才添加的任務
if (! isRunning(recheck) && remove(command))
/移除成功就執行飽和政策,這樣整個方法就結束了
reject(command);
/否則若處于運作狀态或移除失敗,這時無論處于哪種情況任務都在阻塞隊列裡,判斷目前線程數量是否為
else if (workerCountOf(recheck) == )
若是則添加一個線程并啟動
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker方法:
boolean addWorker(Runnable firstTask, boolean core) 方法的作用就是建立 Worker 對象并啟動這個對象裡的線程( Worker 裡一個 Thread 類型的字段)。
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet workers = new HashSet();
private int largestPoolSize;
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/如果線程池不處于運作狀态,理論上不應該添加一個執行該任務的線程,但如果滿足下面三個條件的話就可以通過:
線程池狀态是關閉
要執行的任務為空
阻塞隊列不為空
因為線程池關閉後不允許送出任務,但關閉後會執行完阻塞隊列的任務,是以允許添加一個firstTask為空的線程
來幫助執行完阻塞隊列裡的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/若目前線程池的線程數量達到了線程池所允許的最大線程數或所指定要添加線程類型的線程數量則傳回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/到這裡前面的限制條件都通過,現在嘗試将線程數量增一,成功則退出最外層的循環
if (compareAndIncrementWorkerCount(c))
break retry;
/失敗則重新擷取線程池狀态,狀态改變則從最外層循環開始執行,不變則從内循環開始執行
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/構造一個Worker對象,每個Worker對象綁定一個線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
/若線程池處于運作狀态或處于關閉且firstTask為null
if (rs largestPoolSize)
largestPoolSize = s;
/添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
/若添加成功則啟動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
/若啟動失敗(t線程為空或添加過程中抛出異常)則執行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); /允許中斷,與Worker構造函數的setState(-)是一對的
boolean completedAbruptly = true;
try {
/擷取到任務才進入循環
while (task != null || (task = getTask()) != null) {
/加鎖,表示非空閑狀态
w.lock();
/ 如果線程池狀态大于等于STOP并且本線程未中斷,則應該執行中斷方法
或者執行Thread.interrupted()方法判斷本線程是否中斷并且清除中斷狀态,
如果發現線程池狀态大于等于STOP則執行中斷方法。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/ThreadPoolExecutor中的beforeExecute(wt, task)方法一個空方法,用來留給繼承ThreadPoolExecutor的類
來重寫該方法并在任務執行前執行
beforeExecute(wt, task);
Throwable thrown = null;
try {
/執行擷取到的任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/ThreadPoolExecutor中的afterExecute(task,thrown)方法也是一個空方法,用來留給繼承
ThreadPoolExecutor的類來重寫該方法并在任務執行後執行
afterExecute(task, thrown);
}
} finally {
task = null;
/該線程執行的任務加,即使抛出異常
w.completedTasks++;
/釋放鎖,表示回到空閑狀态
w.unlock();
}
}
/執行到這一步表示是由于擷取不到任務而正常退出的,是以completedAbruptly為false
completedAbruptly = false;
} finally {
/無論怎樣退出都要執行
processWorkerExit(w, completedAbruptly);
}
}
getTask方法:
private Runnable getTask() {
/表示擷取任務是否已逾時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/ 若線程池狀态大于等于停止狀态,此時線程池不再處理隊列的任務,并且會回收所有線程(不管空不空閑),
是以此時應該把線程池線程數量減,并且擷取的任務為空
/ 處于關閉狀态且任務隊列為空,表示任務隊列為空且不會有任務送出,是以線程數減,并且擷取的任務為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/是否啟用逾時機制。當允許核心線程逾時或目前線程數超過核心線程則啟用
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/如果線程數量超過線程池所允許的最大線程數或者啟用逾時機制情況下擷取任務逾時,理論上應該回收線程。
但是如果該線程是線程池中的最後一個線程且任務隊列不為空就可以不回收,繼續運作,要是還有其他線程或者任務隊列為空則回收該線程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > || workQueue.isEmpty())) {
/嘗試将線程數量減,成功傳回null,失敗繼續從循環開始處開始。這裡為什麼不是用decrementWorkerCount()
這種不會失敗的方法減而采用這種方式。是因為 wc > ,如果線程池不隻有一個線程它們互相發現不隻一個線程,
且它們同時執行不會失敗的将線程數量減一的方法,到時線程池線程數量可能就為了,哪麼隊列中的任務就沒線程執行了。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/ 如果啟用逾時機制就執行poll()方法,在keepAliveTime納秒内還沒擷取就傳回null。
如果未啟用逾時機制就執行take()方法,隊列沒任務就一直阻塞直到有任務。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
/到這裡就是因為逾時擷取不到任務
timedOut = true;
} catch (InterruptedException retry) {
/在執行take()過程中被中斷并不算逾時
timedOut = false;
}
}
}
tryTerminate方法:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/如果滿足下面任意一個條件就沒辦法到達結束狀态
線程池處于運作狀态
線程池狀态是TIDYING或已經是結束狀态
線程池處于關閉狀态且任務隊列不為空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/目前線程數量不為也無法到達結束狀态
if (workerCountOf(c) != ) {
/中斷一個空閑線程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/嘗試将線程池狀态設定為TIDYING,失敗重循環開始處開始
if (ctl.compareAndSet(c, ctlOf(TIDYING, ))) {
try {
/terminated()是一個空方法,留給繼承ThreadPoolExecutor的類覆寫
terminated();
} finally {
/嘗試将線程池狀态設定為TERMINATED
ctl.set(ctlOf(TERMINATED, ));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}