系列文章目录
线程安全(一)java对象头分析以及锁状态
线程安全(二)java中的CAS机制
线程安全(三)实现方法sychronized与ReentrantLock(阻塞同步)
线程安全(四)Java内存模型与volatile关键字
线程安全(五)线程状态和线程创建
线程安全(六)线程池
线程安全(七)ThreadLocal和java的四种引用
线程安全(八)Semaphore
线程安全(九)CyclicBarrier
线程安全(十)AQS(AbstractQueuedSynchronizer)
0.前言
为什么要使用线程池?
直接使用线程会有线程重复的创建和销毁
优点:
- 降低内存消耗,避免线程重复的创建和销毁
- 可以创建核心线程数,可直接使用,不需要再创建
- 显性的管理监控线程,防止创建过多
先说结果
再看类图
1.Executor 与ExecutorService接口
1.1.Executor接口
// 提交任务的办法
void execute(Runnable command);
1.2.ExecutorService接口
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();
// 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
// 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
List<Runnable> shutdownNow();
// 线程池是否已关闭
boolean isShutdown();
// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
// 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
boolean isTerminated();
// 等待所有任务完成,并设置超时时间
// 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
// 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
// 因为 Runnable 的 run 方法本身并不返回任何东西
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Runnable 任务
Future<?> submit(Runnable task);
// 执行所有任务,返回 Future 类型的一个 list
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 也是执行所有任务,但是这里设置了超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 同上一个方法,只有其中的一个任务结束了,就可以返回,返回执行完的那个任务的结果,
// 不过这个带超时,超过指定的时间,抛出 TimeoutException 异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2.AbstractExecutorService类
// 构建RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交任务
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 提交任务
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
不难看出,上述只是实现提交任务,没有真正开启线程,真正运行的还是excute方法。
3.ThreadPoolExecutor
3.1.属性
32位的整数,前3位用于存放线程池状态,低29位标识线程数。
// 原子性操作变量,CAS的体现
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大线程数
111 11111111111111111111111111(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState存储在高阶位
// 运行:接受新任务并处理队列任务
111 00000000000000000000000000000(111后29个0)
private static final int RUNNING = -1 << COUNT_BITS;
// 关机:不接受新任务,但处理队列任务
000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 停止:不接受新任务,不处理排队任务,并中断正在进行的任务
001 00000000000000000000000000000(1后29个0)
private static final int STOP = 1 << COUNT_BITS;
// 整理:所有任务已经终止
010 00000000000000000000000000000(10后29个0)
private static final int TIDYING = 2 << COUNT_BITS;
// terminated() 方法结束后,已完成
011 00000000000000000000000000000(11后29个0)
private static final int TERMINATED = 3 << COUNT_BITS;
3.2.线程状态方法
// CAPACITY值为:11111111111111111111111111111
// ~CAPACITY值为:11100000000000000000000000000000
// &操作,位运算结果将低29位置0了,而高3位还是保持c的值,得到是runState的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 这个方法用于取出workerCount的值
// &操作,位运算结果将高3位置0了,保留参数c的低29位,得到workerCount的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 之前说过 32为2进制数,前三位存的是线程状态,后29位存的是线程数,这个运算使将他俩合在一起 取rs(runState)的前三位和wc(workerCount)的后29位
private static int ctlOf(int rs, int wc) { return rs | wc; }
// c是否小于s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// c是否大于等于s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// c是否小于SHUTDOEN 即是否在run 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 线程号+1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 线程号-1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 一个线程突然终止 减少线程号
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
3.2.相关属性
// 队列 用于保存任务并传递给工作线程的队列
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含所有工作线程,只有持有mainLock的才能进入
private final HashSet<Worker> workers = new HashSet<Worker>();
// 等待条件支持awaittermination
private final Condition termination = mainLock.newCondition();
//最大的池大小
private int largestPoolSize;
//完成任务的计数器。仅在工作线程终止时更新。只有在mainlock访问
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 饱和和关闭时使用
private volatile RejectedExecutionHandler handler;
// 线程从队列中获取任务的超时时间,也就是说如果线程空闲超过这个时间就会终止。
private volatile long keepAliveTime;
// 如果为false(默认),即使空闲时,核心线程仍然保持生存状态。
// 如果为true,核心线程使用keepalivetime超时等待工作。
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数量
private volatile int corePoolSize;
// 最大线程数。注意,实际最大值是由容量内部限定的。
private volatile int maximumPoolSize;
// 默认拒绝 程序
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
// 终结器
private final AccessControlContext acc;
3.3.构造方法
// 核心线程数,最大线程数,保持时间,单位,队列
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 核心线程数,最大线程数,保持时间,单位,队列,线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// 核心线程数,最大线程数,保持时间,单位,队列, handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
// 核心线程数,最大线程数,保持时间,单位,队列,线程工厂,handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.4. void execute(Runnable command)
//
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1.如果少于核心线程数的线程在运行,启动一个新线程,原子性检查运行状态和线程号,
* 通过返回false来防止错误的新建线程
* 2. 如果一个任务可以成功地排队,那么我们仍然需要仔细检查是否应该添加一个线程。
* (自从上次检查结束后,有的线程有的已经死亡)或者自从进入这个方法后池就关闭了。
*所以我们重新检查状态,如果停止,需要回滚入队,如果没有, 或启动一个新线程。
* 3.如果我们不能对任务进行队列,那么我们尝试添加一个新线程。
* 如果失败,我们知道我们被关闭或饱和,所以任务倍拒绝。
// 获取线程状态和线程数
int c = ctl.get();
// 活动线程数 小与 核心线程数,直接启动新的线程,在增加时会原子性检测
// 1,小于核心线程新建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.线程池处于运行状态,并且队列未满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 不是runState 则从workQueue中移除任务并拒绝
if (! isRunning(recheck) && remove(command))
// 采用线程池指定的策略拒绝任务
reject(command);
// 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
else if (workerCountOf(recheck) == 0)
// 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);
// 两种情况:
// 1.非RUNNING状态拒绝新的任务
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
}
// 如果 workQueue 队列满了,/ 以 maximumPoolSize 为界创建新的 worker,
// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
3.5. void shutdown()
// 会将runState置为SHUTDOWN,会终止所有空闲的线程。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
advanceRunState(SHUTDOWN);
// 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
// tryTerminate方法中会保证队列中剩余的任务得到执行。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
// 因此保证了中断的肯定是空闲的线程。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3.6. List shutdownNow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 返回队列中还没有被执行的任务。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
void interruptIfStarted() {
Thread t;
// 初始化时state == -1
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
3.7. void runWorker(Worker w)
// 第一次启动会执行初始化传进来的任务firstTask;
// 然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// Worker的构造函数中抑制了线程中断setState(-1),所以这里需要unlock从而允许中断
w.unlock(); // allow interrupts
// 为true的情况:1.执行任务抛出异常;2.被中断。
boolean completedAbruptly = true;
try {
// 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前可以插入一些处理,子类重载该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行用户任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 和beforeExecute一样,留给子类去重载
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 结束线程的一些清理工作
processWorkerExit(w, completedAbruptly);
}
}
3.8. boolean addWorker(Runnable firstTask, boolean core)
// 第一个为当前任务,第二个为是否为核心线程,是用corePoolSize ,false用maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
// workQueue.isEmpty())
// 满足下列调价则直接返回false,线程创建失败:
// rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束
// rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务
// rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,
// false),任务为null && 队列为空
// 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?
// 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务
// 但是此时队列不为空,那么还得创建线程把任务给执行完才行
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 走到这的情形:
// 1.线程池状态为RUNNING
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 原子操作递增workCount
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程池的状态发生变化则重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// wokerCount递增成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 并发的访问线程池workers对象必须加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
// 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败,则从wokers中移除w并递减wokerCount
if (! workerStarted)
// 递减wokerCount会触发tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}
3.9 invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)
批量执行线程任务时设置的的超时时间(常用于Future,需要拿到所有线程返回结果的值)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
// 所有任务执行execute,同步执行
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
// 判断每个future是否执行完成
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
// 在限定时间内拿到结果
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
// 根据目前已用时间 更新下个future的限定时间
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
// 只要非完成的,就会中断所有线程
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
4.常见线程池
Executors是一个工具类,生成几种常见的线程池
附:图片来源于网络,侵删
4.1.newFixedThreadPool()
生成一个固定大小的线程池(核心线程数和最大线程数是一样的,超出核心线程数的新提交的任务都会加入无界队列)
因为是无界队列,所以最大线程数其实用不到
创建一个线程池,该线程池重用在共享无界队列上运行的固定数量的线程。在任何时候,最多 nThreads 个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,新的线程将取代它。池中的线程将一直存在,直到显式关闭。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
4.2.newSingleThreadExecutor()
生成只有一个线程的固定线程池,设置线程数为 1
因为是无界队列,所以最大线程数其实用不到
创建一个使用单个工作线程在无界队列上运行的 Executor。 (但请注意,如果该单线程在关闭前的执行过程中因故障而终止,如果需要执行后续任务,则新线程将取代它。)任务保证按顺序执行,并且不会有超过一个任务处于活动状态在任何给定时间。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.3.newCachedThreadPool()
生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池。
SynchronousQueue内部没有数据的存储空间,任何入队的线程都会阻塞,直到有线程来出队,可以保证如果加入一个数据,必须要remove掉,才能再次计入
创建一个线程池,根据需要创建新线程,但在可用时将重用以前构造的线程。这些池通常会提高执行许多短期异步任务的程序的性能。如果可用,对执行的调用将重用以前构造的线程。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。六十秒内未使用的线程将被终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
4.4.newScheduledThreadPool()
定期或延时执行
创建一个线程池,可以安排命令在给定延迟后运行,或定期执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
5.线程池关键属性
5.1.线程池创建参数
- corePoolSize(核心线程数)
- maximumPoolSize(最大线程数)
- workQueue(队列)
- keepAliveTime(空闲时间)
-
rejectedExecutionHandler(拒绝策略)
corePoolSize 到 maximumPoolSize 之间的线程会被回收,当然 corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。
keepAliveTime 用于设置空闲时间,如果线程数超出了 corePoolSize,并且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操作
rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况。
- AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
- DiscardPolicy:丢弃任务,但是不抛出异常。
- DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
- CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务。