簡介
ThreadPoolExecutor是Executors中一部分功能,下面來介紹另外一部分功能也就是ScheduledThreadPoolExecutor的實作,後者是一個可以在一定延遲時候或者定時進行任務排程的線程池。
Executors其實是個工具類,裡面提供了好多靜态方法,根據使用者選擇傳回不同的線程池執行個體。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實作ScheduledExecutorService接口,關于ThreadPoolExecutor的介紹可以參考:
http://www.jianshu.com/p/3cc67876375f
線程池隊列是DelayedWorkQueue,它是對delayqueue的優化,關于delayqueue參考:http://www.jianshu.com/p/2659eb72134b
ScheduledFutureTask是阻塞隊列元素是對任務修飾。
Executors的類圖結構如下:

ScheduledThreadPoolExecutor的構造函數如下:
//使用改造後的delayqueue.
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
複制
執行個體
// 任務間以固定時間間隔執行,延遲1s後開始執行任務,任務執行完畢後間隔2s再次執行,任務執行完畢後間隔2s再次執行,依次往複
static void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 由于是定時任務,一直不會傳回
result.get();
System.out.println("over");
}
// 相對開始加入任務的時間點固定頻率執行:從加入任務開始算1s後開始執行任務,1+2s開始執行,1+2*2s執行,1+n*2s開始執行;
// 但是如果執行任務時間大約2s則不會并發執行後續任務将會延遲。
static void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis());
}
}, 1000, 2000, TimeUnit.MILLISECONDS);
// 由于是定時任務,一直不會傳回
result.get();
System.out.println("over");
}
// 延遲1s後開始執行,隻執行一次,沒有傳回值
static void scheduleRunable() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<?> result = executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println("gh");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, 1000, TimeUnit.MILLISECONDS);
System.out.println(result.get());
}
// 延遲1s後開始執行,隻執行一次,有傳回值
static void scheduleCaller() throws InterruptedException, ExecutionException {
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "gh";
}
}, 1000, TimeUnit.MILLISECONDS);
// 阻塞,直到任務執行完成
System.out.print(result.get());
}
複制
源碼分析
schedule函數
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//裝飾任務,主要實作public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//添加任務到延遲隊列
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池關閉了,則拒絕任務
if (isShutdown())
reject(task);
else {
//添加任務到隊列
super.getQueue().add(task);
//再次檢查線程池關閉
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//確定至少一個線程在處理任務,即使核心線程數corePoolSize為0
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//增加核心線程數
if (wc < corePoolSize)
addWorker(null, true);
//如果初始化corePoolSize==0,則也添加一個線程。
else if (wc == 0)
addWorker(null, false);
}
複制
上面做的首先吧runnable裝飾為delay隊列所需要的格式的元素,然後把元素加入到阻塞隊列,然後線程池線程會從阻塞隊列擷取逾時的元素任務進行處理,下面看下隊列元素如何實作的。
//r為被修飾任務,result=null,ns為目前時間加上delay時間後的
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//通過擴充卡把runnable轉換為callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
複制
修飾後把目前任務修飾為了delay隊列所需元素,下面看下元素的兩個重要方法:過期時間計算和元素比較。
過期時間計算
//元素過期算法,裝飾後時間-目前時間,就是即将過期剩餘時間
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
複制
元素比較
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
複制
schedule(Callable callable,
long delay,
TimeUnit unit)和schedule(Runnable command, long delay,TimeUnit unit)類似。
compareTo作用是在加入元素到dealy隊列時候進行比較,需要調整堆讓最快要過期的元素放到隊首。是以無論什麼時候向隊列裡面添加元素,隊首的都是最即将過期的元素。
scheduleWithFixedDelay函數
定時排程:相鄰任務間時間固定。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//修飾包裝,注意這裡是period=-delay<0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//添加任務到隊列
delayedExecute(t);
return t;
}
//period為 delay時間
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
複制
任務添加到隊列後,工作線程會從隊列擷取并移除到期的元素,然後執行run方法,是以下面看看ScheduledFutureTask的run方法如何實作定時排程的。
public void run() {
//是否隻執行一次
boolean periodic = isPeriodic();
//取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
//隻執行一次,調用schdule時候
else if (!periodic)
ScheduledFutureTask.super.run();
//定時執行
else if (ScheduledFutureTask.super.runAndReset()) {
//設定time=time+period
setNextRunTime();
//重新加入該任務到delay隊列
reExecutePeriodic(outerTask);
}
}
複制
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else//由于period=-delay是以執行這裡,設定time=now()+delay
time = triggerTime(-p);
}
複制
定時排程是先從隊列擷取任務然後執行,然後在重新設定任務時間,在把任務放入隊列實作的。
如果任務執行時間大于delay時間則等任務執行完畢後的delay時間後在次調用任務,不會同一個任務并發執行。
scheduleAtFixedRate函數
定時排程:相對起始時間點固定頻率調用。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//裝飾任務類,注意period=period>0,不是負的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//添加任務到隊列
delayedExecute(t);
return t;
}
複制
private void setNextRunTime() {
long p = period;
//period=delay;
if (p > 0)
time += p;//由于period>0是以執行這裡,設定time=time+delay
else
time = triggerTime(-p);
}
複制
相對于上面delay,rate方式執行規則為時間為initdelday + n*period;時候啟動任務,但是如果目前任務還沒有執行完,要等到目前任務執行完畢後在執行一個任務。