ThreadPoolExecutor
- 生命周期
- 執行流程
-
- 線程池建立
- 線程的送出
- 核心方法
-
- addWorker
- ThreadPoolExecutor.Worker
- runWorker
- getTask
參考于 ThreadPoolExecutor
生命周期
狀态 | 介紹 |
---|---|
RUNNING | 允許接收新任務,處理隊列中的任務 |
SHUTDOWN | 不接收新的任務,處理完隊列中的任務 |
STOP | 不接收新的任務,不處理隊列中的任務,嘗試中斷正在執行任務的線程 |
TIDYING | 所有任務被終止了,工作線程數workCount也被設為0,線程的狀态也被設為TIDYING,并開始調用鈎子函數terminated() |
TERMINATED | 鈎子函數terminated()執行完畢 |
生命周期轉換:(不可逆)
執行流程
線程池建立
/**
* @param corePoolSize 核心線程數。即使空閑,也要保留在池中的線程數,除非設定了{@code allowCoreThreadTimeOut}
* @param maximumPoolSize 線程池最大容量大小
* @param keepAliveTime 當線程數大于核心線程數時,多餘的空閑線程将在終止之前等待新任務的最長時間
* @param unit keepAliveTime時間機關
* @param workQueue 用于在任務暫挂之前用于保留任務的隊列。該隊列将僅儲存由{@code execute}方法送出的{@code Runnable}任務。
* @param threadFactory 線程工廠
* @param handler 因為達到了線程界限和隊列容量而在執行被阻塞時使用的處理程式
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
線程的送出
- workerCount < corePoolSize,新送出任務将建立一個新Worker執行任務,即使此時線程池中存在空閑線程;
- 當線程池達到corePoolSize時,新送出任務将被放入workQueue中,等待線程池中任務排程執行;
- 當workQueue已滿,且maximumPoolSize > corePoolSize時,新送出任務會建立新線程執行任務;
- 當送出任務數超過maximumPoolSize時,新送出任務由RejectedExecutionHandler處理;
- 當線程池中超過corePoolSize線程或者allowCoreThreadTimeOut為true,空閑時間達到keepAliveTime時,關閉空閑線程;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//ctl存儲worker個數和線程池狀态
int c = ctl.get();
//如果worker個數小于核心線程數
if (workerCountOf(c) < corePoolSize) {
//建立worker并添加到workers中
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果線程池正在運作中且加入workQueue成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次校驗線程池狀态
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Worker負責處理處理循環處理Runnable
核心方法
addWorker
先了解一下《retry文法》
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//死循環
for (;;) {
int c = ctl.get();
//擷取線程池運作狀态
int rs = runStateOf(c);
//判斷目前線程池的狀态是不是已經SHUTDOWN,如果SHUTDOWN了拒絕線程加入
//(rs!=SHUTDOWN || firstTask!=null || workQueue.isEmpty())
//如果rs不為SHUTDOWN,狀态是STOP、TIDYING或TERMINATED,是以此時要拒絕請求
//如果狀态為SHUTDOWN,而傳入一個不為null的線程,那麼需要拒絕
//如果狀态為SHUTDOWN,同時隊列中已經沒任務了,那麼拒絕掉
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//死循環
for (;;) {
//擷取worker個數
int wc = workerCountOf(c);
//worker個數超過容量,則傳回失敗
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//線程個數增加成功,則執行循環體以下的代碼
if (compareAndIncrementWorkerCount(c))
break retry;
//重新check線程池狀态,如果狀态改變了,則重新進入第一個循環
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
ThreadPoolExecutor.Worker
Worker繼承《AQS》,實作Runnable接口
Worker
的構造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //thread運作時執行的是Worker的run方法
}
Worker
的
run
方法:
public void run() {
runWorker(this);
}
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//先執行firstTask,之後循環getTask擷取Task
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//運作task
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
擷取Task
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//釋放線程worker,當線程池中超過corePoolSize線程或者allowCoreThreadTimeOut為true,空閑時間達到keepAliveTime時,關閉空閑線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//可能阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}