线程池的状态
The workerCount is the number of workers that have been permitted to start and not permitted to stop
RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue(任务队列) and pool(线程池) are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
shutdown 方法将线程池状态置为 SHUTDOWN,线程池并不会立即停止,要等正在执行和队列里等待的任务执行完才会停止。
看到调用 shutdownNow 后,第一个任务0正在睡眠(sleep)的时候,触发了 interrupt 中断,之前等待的任务1-5被从队列中清除并返回,之后的任务被拒绝。该方法是通过 interrupt 方法去终止正在运行的任务的,因此无法响应 interrupt 中断的任务可能不会被终止。所以,该方法是无法保证一定能终止任务的。
所以 shutdownNow 方法将线程池状态置为 STOP,试图让线程池立刻停止,但不一定能保证立即停止,要等所有正在执行的任务(不能被 interrupt 中断的任务)执行完才能停止。
awaitTermination 的功能如下:
阻塞当前线程,等已提交和正在执行的任务都执行完,解除阻塞;当等待超过设置的时间,检查线程池是否停止,如果停止返回 true,否则返回 false,并解除阻塞
awaitTermination 和 shutdown 执行时都会申请锁,awaitTermination 需要在 shutdown 调用后调用,否则会知道超时后才会返回
参考 线程池相关-shutdown、shutdownNow和awaitTermination
线程池shutdown与shutdownNow有什么区别?
看代码主要三个区别:
1 shutdown会把线程池的状态改为SHUTDOWN,而shutdownNow把当前线程池状态改为STOP
2 shutdown只会中断所有空闲的线程,而shutdownNow会中断所有的线程。
3 shutdown返回方法为空,会将当前任务队列中的所有任务执行完毕;而shutdownNow把任务队列中的所有任务都取出来返回。
默认线程池可监控属性
taskCount: 线程池待执行的任务数量,从同步队列获取
completedTaskCount: 已经完成的任务数量 不是精确值
largestPoolSize: 线程池中曾经创建的最大的线程数量
getPoolSize: 线程池的线程数量(正在运行+空闲的)
getActiveCount: 活动的线程数(只有正在运行的)
默认扩展方法
protected void beforeExecute(Thread t, Runnable r) { } // task.run方法之前执行
protected void afterExecute(Runnable r, Throwable t) { } // task执行完之后,不管有没有异常都会执行
protected void terminated() { } //线程池终止时运行
在方法中创建并使用线程池
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically
allowCoreThreadTimeOut
这个参数默认值为:false
设置成true还是false需要根据你具体的业务判断,如果该业务需要执行的次数并不多,采用多线程只是为了缩短执行的时间,那么可以设置成true,毕竟用完后很长时间才会用到,线程干耗着也是耗费资源的。
但是如果是需要较高并发执行的业务,那么可以设置为false,保留着线程,避免每次都得创建线程耗费资源。
创建后的初始线程数为 0
默认情况下,线程池创建后的初始线程数为 0,当有任务到来就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize,就会把到达的任务放到缓存队列当中。
自定义线程名称
spring安全关闭线程池
spring线程池参数 setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
异常处理
当线程池中线程频繁出现未捕获的异常,那线程的复用率就大大降低了,需要不断地创建新线程。
1、线程池中线程中异常尽量手动捕获
2、通过设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过execute提交任务,线程依然会中断,而通过submit提交任务,不会中断线程,线程异常会在get执行结果时抛出。
提交任务执行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态
int recheck = ctl.get();
// 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池处于 running 状态,但是没有线程,那就创建线程执行任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务
// 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
创建工作线程流程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
//情况1 rs >= SHUTDOWN && rs != SHUTDOWN
//情况2 rs >= SHUTDOWN && firstTask != null (是正在提交的任务)
//情况3 rs >= SHUTDOWN && workQueue.isEmpty()
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
// 跳出两层for循环,执行后面的语句
if (compareAndIncrementWorkerCount(c)) break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//情况1 处于Running状态
//情况2 处于Shutdown状态 && 创建的线程用于执行从任务队列获取的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) addWorkerFailed(w);
}
return workerStarted;
}
线程执行任务流程
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
//先interrupt线程,然后线程执行遇到阻塞方法时会直接抛出InterruptedException
wt.interrupt();
try {
beforeExecute(wt, task);//回调方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);//回调方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
从任务同步队列获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//for + cas
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//为什么只减少了工作线程个数?因为如果获取任务为null,当前工作线程就会回收(workers.remove(w))
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//情况1 (wc > maximumPoolSize || (timed && timedOut) && wc >1
//情况2 (wc > maximumPoolSize || (timed && timedOut) && 任务队列为空
if ((wc > maximumPoolSize || (timed && timedOut)) &&
(wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) return null;
continue;
}
try {
//大于核心线程数时会在指定时间内获取任务,超过指定时间就会返回空
//正常情况,获取任务会一直阻塞,直到取到任务
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null) return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
参考 从源码分析几道必问线程池的面试题?
线程执行结束时处理逻辑
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// completedAbruptly=false 表示不是异常退出
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty()) min = 1;
if (workerCountOf(c) >= min) return; // replacement not needed
}
//小于核心线程数时会再创建一个工作线程
addWorker(null, false);
}
}