干货博客
https://www.jianshu.com/p/389b58856894
思想
池化技术
池化技术应用:线程池、数据库连接池、http连接池等等。
池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
通过将资源池化,统一管理,可以方便管控。例如提供资源监控,调度策略等能力
线程池的好处
降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,监控和调优。
原理
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型
设计的关键点,即调度策略制定
- 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
- 线程池中的线程个数是固定的,还是动态变化的?
- 每次提交新任务,是放入队列?还是开新线程?
- 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
JDK的实现
ThreadPoolExector/ScheduledThreadPoolExecutor
- 基于阻塞队列实现(即设计关键点:4)
- 通过构造方法入参,让我们定制具体的调度策略(即设计关键点:1,2,3)
JDK中的线程池
类图
种类
- ThreadPoolExector:线程池
- ScheduledThreadPoolExecutor:定时调度的线程池
ThreadPoolExector
概念与逻辑模型
本节的描述不是很严谨,适宜快速理解
概念
- 核心线程是否允许关闭:默认为否
- 核心线程数:若核心线程不允许关闭,则核心线程为线程池的最小线程数量,会一直存活。若核心线程允许关闭,则线程池最小线程数可以为0,不受核心线程数控制。
- 最大线程数数:线程池中同时线程的最大上限。
- 线程超时时间:当 本线程超过超时时间,若 (核心线程不允许关闭 并 线程数量大于核心线程数)或 核心线程允许关闭,本线程会被关闭
- 任务缓冲队列:是一个阻塞队列,当前线程数大于核心线程数时,实时缓冲(体现在实时放任务)一定数量的(即缓冲队列的大小)任务。如果此时有空闲线程,则空闲线程直接执行任务(这个就是阻塞取任务的意义)。若不能实时缓冲(即放入任务后会超过缓冲队列的大小),就尝试新建任务执行。
模型
- 线程池中有 核心线程数,最大线程数,线程保证存活时间,核心线程是否允许关闭,线程集合,任务缓冲-阻塞队列,拒绝策略 等 要素
- 线程池负责调度线程
- 注意在线程池中的线程都是等同的,并不区分某个线程是否是核心线程,
- 线程池 是 依据 存活的线程数量 和 线程是否空闲,选择适宜的调度策略(即执行任务的方式)。
调度策略
- 线程池初始化时不初始化线程
- 我们将任务提交到线程池时,线程池的调度策略
- 任务执行策略
- 当线程池的线程数量小于核心线程数时,新建线程执行(即使有空闲线程也不用)
- 当线程池的线程数量大于等于核心线程数时
- 优先使用空闲线程执行(即 可以实时放入缓冲队列,即 放入后不超过缓冲队列的大小)
- 再尝试扩张线程池,新建线程执行(即 线程池的线程数小于最大线程数)
- 扩张失败后,调用拒绝策略(即 线程池的线程数大于等于最大线程数)
-
线程超时策略
本线程超时后,
* 若核心线程允许关闭,则关闭本线程
* 若核心线程不允许关闭 并 当前线程数大于核心线程数,则关闭本线程
- 任务执行策略
- 线程池的关闭
- shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
- shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
使用
- 根据具体场景构造合适的线程池。应使用ThreadPoolExecutor构造器构造线程池,而不是Executor的工厂方法,因为使用ThreadPoolExecutor构造器可以更清楚的知道线程池的细节,并且可以定制参数。
- 任务的提交
- execute()提交无返回结果的任务
- submit()提交有返回结果的任务
- 注意线程池的关闭
- shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
- shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
- 使用钩子方法定制功能,增强ThreadPoolExector。比如集成日志或者监控
// ThreadPoolExector的钩子方法
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
原理
源码的简要流程
只关注主干逻辑,忽略加锁,异常和线程池关闭等分支
- 构造一个线程池,只是设置线程池参数,没做其他处理。
- 使用线程池执行 任务
- 当前线程数小于核心线程数,则开启新线程并执行任务
- 当超过核心线程数时,
- 先使用offer方法 将任务提交到 任务缓冲-阻塞队列,直接返回成功或失败。(注意不是阻塞当前线程等待消费)
- 提交成功,即在 任务缓冲-阻塞队列 中等待 空闲线程消费
- 提交失败
- 在不大于最大线程数的条件下,启动线程并执行任务
- 若启动线程失败,即超过最大线程数等情况,则调用拒绝策略,拒绝任务。
- 在不大于最大线程数的条件下,启动线程并执行任务
- 先使用offer方法 将任务提交到 任务缓冲-阻塞队列,直接返回成功或失败。(注意不是阻塞当前线程等待消费)
- 线程(这里指的是Worker)的逻辑
- 线程启动后,会立即执行第一个任务
-
执行完第一个任务后,会不断从缓存队列中获取任务,并执行,若没有获取到任务则终止线程
获取任务的逻辑 :
- 如果 线程数大于核心线程数 或 核心线程允许超时销毁,则使用带有超时时间poll方法“阻塞地”从缓存队列中获取任务,
- 如果超时即获取不到任务
- 否则使用take方法“阻塞地”从缓存队列中获取任务
- 如果 线程数大于核心线程数 或 核心线程允许超时销毁,则使用带有超时时间poll方法“阻塞地”从缓存队列中获取任务,
-
关闭线程池:
未完待续…
核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 对线程池内部各种变量进行互斥访问控制
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//...
}
每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
final Thread thread; // Worker封装的线程
Runnable firstTask; // Worker接收到的第1个任务
volatile long completedTasks; // Worker执行完毕的任务个数
// ...
}
由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。
这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。
构造器,即配置参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法有7个参数
- corePoolSize,线程池中的核心线程数
- maximumPoolSize,线程池中的最大线程数
- keepAliveTime,空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
- unit,空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等
- workQueue,等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象
- threadFactory,线程工厂,我们可以使用它来创建一个线程
- handler,拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理
而我们更关心的是workQueue、threadFactory和handler
-
workQueue
等待队列是BlockingQueue类型的,理论上只要是它的子类,我们都可以用来作为等待队列。
jdk内部自带一些阻塞队列
ArrayBlockingQueue,队列是有界的,基于数组实现的阻塞队列
LinkedBlockingQueue,队列可以有界,也可以无界。基于链表实现的阻塞队列
PriorityBlockingQueue,带优先级的无界阻塞队列
SynchronousQueue,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。
该队列也是Executors.newCachedThreadPool()的默认队列
-
threadFactory
ThreadFactory是一个接口,只有一个方法。用于生产线程。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
可参考默认的线程工厂实现类:DefaultThreadFactory
-
handler:拒绝策略
jdk自带4种拒绝策略,我们来看看。
AbortPolicy // 直接抛出RejectedExecutionException异常,默认的
DiscardPolicy // 任务直接丢弃,不做任何处理
DiscardOldestPolicy // 丢弃队列里最旧的那个任务,再尝试执行当前任务
CallerRunsPolicy // 在调用者线程执行
我们可以通过实现RejectedExecutionHandler接口的方式,定制自己的拒绝策略。
如何制定需要看实际的场景,比如
线上业务:如 秒杀,订单查询,下订单等,时效性要求高,在超过某个时间后,等待的意义不大了。
离线业务:如 短信提醒批处理,时效性要求不高,但是在生产消费不平衡后,很容易积累大量的待处理任务。
任务提交
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程数小于corePoolSize,则启动新线程
if (workerCountOf(c) < corePoolSize) {
// 添加Worker,并将command设置为Worker线程的第一个任务开始执行。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。
if (! isRunning(recheck) && remove(command))
reject(command);
// 放入队列中后发现没有线程执行任务,开启新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 线程数大于maxPoolSize,并且队列已满,调用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 该方法用于启动新线程。boolean core代表是否是核心线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空
// 则添加worker失败,返回false
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败
if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 增加worker数量成功,返回到retry语句
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker数量加1成功后,接着运行:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建worker对象
w = new Worker(firstTask);
// 获取线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c)
|| (runStateLessThan(c, STOP) && firstTask == null)) {
// 由于线程已经在运行中,无法启动,抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将线程对应的worker加入worker集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 如果添加worker成功,则启动该worker对应的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果启动新线程失败
if (! workerStarted)
// workCount - 1
addWorkerFailed(w);
}
return workerStarted;
}
线程池关闭
未完待续…