ScheduledThreadPoolExecutor作为ScheduledExecutorService接口的实现,提供了延迟执行任务或者周期性执行任务的能力。通过名称可以看出,ScheduledThreadPoolExecutor基于线程池实现,它通过继承ThreadPoolExecutor实现线程池管理能力的复用,同时扩展了自己的定时任务调度能力。
首先来看ScheduledExecutorServicej接口,它继承了ExecutorService接口,作为任务执行器的一种扩展类型,提供了如下方法:
schedule方法:用于任务的单次执行,允许指定延迟时间,当时间为0或者负数时,表示立即执行任务;
scheduleAtFixedRate方法:以固定的时间间隔执行任务,当任务本身的执行时间超过时间间隔时,会等到任务执行完成后,立即执行下一次任务;同一个任务总是串行执行,不会并发执行;
scheduleWithFixedDelay方法:以固定的延迟执行任务,当前任务执行时间与上一次任务执行时间相隔固定的延迟;任务每次执行完成后,会在结束时间上加上固定的延迟作为下一次执行时间。任务执行的周期会将任务本身执行耗时考虑在内,因而并非每次执行的时间间隔都相同;
ScheduledThreadPoolExecutor继承ThreadPoolExecutor,主要做了如下改变:
使用ScheduledFutureTask作为任务封装类,代替原先的FutureTask类;
使用DelayedWorkQueue作为阻塞队列,队列为无界队列;ScheduledThreadPoolExecutor的构造器仅需要传入corePoolSize,使用"corePoolSize+无界队列"实现任务调度;
支持run-after-shutdown参数,使得ScheduledThreadPoolExecutor重写shutdown方法,允许移除并且取消不需要在shutdown后执行的任务;
提供了decorateTask方法,用来定制任务操作;

ScheduledThreadPoolExecutor由3部分组成:
任务调度控制:ScheduledThreadPoolExecutor,负责任务调度控制,实现了ScheduledExecutorService接口;
阻塞队列:DelayedWorkQueue,作为ScheduledThreadPoolExecutor的内部类,用于缓存线程任务的阻塞队列,仅能够存放RunnableScheduledFuture对象;该队列实现了延迟调度任务的逻辑,如果当前时间大于等于任务的延迟执行时间,任务才可以被调度。
调度任务:ScheduledFutureTask,作为ScheduledFutureTask的内部类,实现了RunnableScheduledFuture,封装了调度任务的执行逻辑。其中的time字段存放下一次执行时间,DelayedWorkQueue会据此判断任务是否可以被执行。period字段存放执行周期,对于周期性执行任务,每次会根据period计算time。
ScheduledThreadPoolExecutor的构造器最多指定3个参数:
corePoolSize:线程池核心工作线程数量;
threadFactory:定制工作线程创建方式;
handler:驳回任务处理策略;
ScheduledThreadPoolExecutor构造器会调用父类构造器进行线程池初始化,使用DelayedWorkQueue作为阻塞队列,该队列为无界队列,因而maximumPoolSize属性配置无效。又因为都是核心工作线程,没有非核心线程需要回收,因而keepAliveTime配置为0。代码如下:
ScheduledThreadPoolExecutor初始化时并不会预先创建工作线程,而是在提交任务的时候,通过父类java.util.concurrent.ThreadPoolExecutor#ensurePrestart方法判断线程数是否达到corePoolSize,如果未达到,则新增线程;实现逻辑如下:
ScheduledThreadPoolExecutor的任务执行分为单次执行和周期性执行。
单次执行:通过schedule方法执行的任务属于单次执行任务。Executor的execute方法、ExecutorService的submit方法都是通过调用schedule方法执行,故也是单次执行的任务。除了schedule可以指定延迟时间以外,其余方法的延迟时间均为0,即立刻执行任务。比如:execute方法实现如下:
周期性执行:通过scheduleAtFixedRate、scheduleWithFixedDelay方法执行的任务均为周期性执行任务。周期性执行的实现可以理解为每次执行完成后设定下一次执行时间,然后将任务重新放入到阻塞队列等待下一次调度。
无论是单次执行还是周期性执行,其执行的入口都是delayedExecute方法。delayedExecute()将任务放入到阻塞队列中,复用ThreadPoolExecutor的逻辑进行任务调度。代码如下:
当ThreadPoolExecutor的Worker线程从阻塞队列取出任务执行时,会调用ScheduledFutureTask的run方法。该方法对任务类型进行判断,如果是单次执行任务,则立即执行并设置返回结果。如果是周期性执行任务,则执行任务并设置下一次执行时间,然后将任务放入到阻塞队列中,等待下一次调度。方法代码如下:
schedule的执行主要分为参数封装和执行两个步骤。实现如下:
参数封装过程会调用decorateTask方法,该方法为protected的空方法,用于定制RunnableScheduledFuture的属性,可以通过重写实现定制。
scheduleAtFixedRate()的实现与schedule()方法非常相似,仅是将decorateTask()返回的RunnableScheduledFuture对象设置为原有Future的outerTask属性。在重新知心任务时,会将outerTask添加到阻塞队列,从而保证decorateTask()的定制效果一直有效。
scheduleAtFixedRate()的实现与schedule()方法非常相似,仅是设置ScheduledFutureTask延迟时,使用负数,标识执行方式为scheduleAtFixedRate。
ScheduledFutureTask并没有设置单独的字段用于标识执行类型,而是通过period字段的正负号和是否为0表示执行方式:
正数:fixed-rate执行方式;
负数:fixed-delay执行方式;
0:单次执行任务;
scheduleAtFixedRate() / scheduleWithFixedDelay()执行的主要区别在于设置下一次执行时间的策略不同,而执行时间通过ScheduledFutureTask的time字段保存,通过ScheduledFutureTask#setNextRunTime()进行设置,代码如下:
DelayedWorkQueue是专门存放RunnableScheduledFuture和ScheduledFutureTask对象的优先队列,底层基于最小二叉堆实现,为了能够提升任务的查找和删除效率,ScheduledFutureTask中增加了一个heapIndex的成员变量,用于存放任务在堆数组中的索引位置,当需要查找或者删除某个特定的任务时,直接根据任务的heapIndex访问堆数组中的元素。任务是否到达执行时间的判断逻辑均在DelayedWorkQueue中实现。
与PriorityQueue的实现不同,DelayedWorkQueue涉及到多线程访问,因而需要保证线程同步测正确性,故使用ReentrantLock来控制操作的原子性,同时使用Condition来协调线程的执行;
为了方便在DelayedWorkQueue中查找和删除任务,ScheduledFutureTask有一个heapIndex用于存放任务在堆数组中的索引位置。每当任务在队列中的位置改变时,需要同步更新任务的heapIndex。
上浮、下沉操作的实现与PriorityQueue实现相似,只多了更新索引位置的操作,且需要在加锁的环境下调用。
通过上面代码我们总结DelayedWorkQueue的实现原理:
1)基于最小二叉堆实现的优先队列,根据ScheduledFutureTask.compareTo方法比较任务执行时间,使得最近要执行的任务位于队首;
2)任务出队时,通过轮询判断任务是否到达执行时间点,ScheduledFutureTask实现了Delayed接口,通过getDelay方法能够获取到任务还有多长时间执行;
3)当队列中所有任务都没有到达执行时间时,队列中会维持一个leader线程,用于轮询等待队首任务,其余线程均await()。
4)ScheduledFutureTask增加heapIndex属性,用于标记任务在堆数组中的索引,从而便于任务的快速查找(是否存在)与取消(删除);
任务的取消通过ScheduledFutureTask.cancel()方法实现,该方法调用ThreadPoolExecutor.cancel(),在取消任务后,判断是否需要从阻塞队列中移除任务。其中removeOnCancel参数通过setRemoveOnCancelPolicy()设置。之所以要在取消任务后移除阻塞队列中任务,是为了防止队列中积压大量已被取消的任务。
ScheduledThreadPoolExecutor的shutdown() / shutdownNow()方法均调用ThreadPoolExecutor的相应方法实现。同时,ScheduledThreadPoolExecutor实现了ThreadPoolExecutor的onShutdown()用于在shutdown()执行过程中取消任务执行。
此处涉及2个参数:
executeExistingDelayedTasksAfterShutdown:当执行shutdown()后,是否继续执行队列中的单次执行任务;默认为true,即执行;
continueExistingPeriodicTasksAfterShutdown:当执行shutdown()后,是否继续执行队列中的周期性任务;默认为false,即不执行;