天天看點

「高并發」深度解析ScheduledThreadPoolExecutor類的源代碼

在【高并發專題】的專欄中,我們深度分析了ThreadPoolExecutor類的源代碼,而ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類。今天我們就來一起手撕ScheduledThreadPoolExecutor類的源代碼。

構造方法

我們先來看下ScheduledThreadPoolExecutor的構造方法,源代碼如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    new DelayedWorkQueue(), threadFactory, handler);
}
           

從代碼結構上來看,ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類,ScheduledThreadPoolExecutor類的構造方法實際上調用的是ThreadPoolExecutor類的構造方法。

schedule方法

接下來,我們看一下ScheduledThreadPoolExecutor類的schedule方法,源代碼如下所示。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
 //如果傳遞的Runnable對象和TimeUnit時間機關為空
 //抛出空指針異常
 if (command == null || unit == null)
  throw new NullPointerException();
 //封裝任務對象,在decorateTask方法中直接傳回ScheduledFutureTask對象
 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
 //執行延時任務
 delayedExecute(t);
 //傳回任務
 return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
 //如果傳遞的Callable對象和TimeUnit時間機關為空
 //抛出空指針異常
 if (callable == null || unit == null)
  throw new NullPointerException();
 //封裝任務對象,在decorateTask方法中直接傳回ScheduledFutureTask對象
 RunnableScheduledFuture<V> t = decorateTask(callable,
  new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
 //執行延時任務
 delayedExecute(t);
 //傳回任務
 return t;
}
           

從源代碼可以看出,ScheduledThreadPoolExecutor類提供了兩個重載的schedule方法,兩個schedule方法的第一個參數不同。可以傳遞Runnable接口對象,也可以傳遞Callable接口對象。在方法内部,會将Runnable接口對象和Callable接口對象封裝成RunnableScheduledFuture對象,本質上就是封裝成ScheduledFutureTask對象。并通過delayedExecute方法來執行延時任務。

在源代碼中,我們看到兩個schedule都調用了decorateTask方法,接下來,我們就看看decorateTask方法。

decorateTask方法

decorateTask方法源代碼如下所示。

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
 return task;
}

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
 return task;
}
           

通過源碼可以看出decorateTask方法的實作比較簡單,接收一個Runnable接口對象或者Callable接口對象和封裝的RunnableScheduledFuture任務,兩個方法都是将RunnableScheduledFuture任務直接傳回。在ScheduledThreadPoolExecutor類的子類中可以重寫這兩個方法。

接下來,我們繼續看下scheduleAtFixedRate方法。

scheduleAtFixedRate方法

scheduleAtFixedRate方法源代碼如下所示。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
 //傳入的Runnable對象和TimeUnit為空,則抛出空指針異常
 if (command == null || unit == null)
  throw new NullPointerException();
 //如果執行周期period傳入的數值小于或者等于0
 //抛出非法參數異常
 if (period <= 0)
  throw new IllegalArgumentException();
 //将Runnable對象封裝成ScheduledFutureTask任務,
 //并設定執行周期
 ScheduledFutureTask<Void> sft =
  new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
 //調用decorateTask方法,本質上還是直接傳回ScheduledFutureTask對象
 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 //設定執行的任務
 sft.outerTask = t;
 //執行延時任務
 delayedExecute(t);
 //傳回執行的任務
 return t;
}
           

通過源碼可以看出,scheduleAtFixedRate方法将傳遞的Runnable對象封裝成ScheduledFutureTask任務對象,并設定了執行周期,下一次的執行時間相對于上一次的執行時間來說,加上了period時長,時長的具體機關由TimeUnit決定。采用固定的頻率來執行定時任務。

ScheduledThreadPoolExecutor類中另一個定時排程任務的方法是scheduleWithFixedDelay方法,接下來,我們就一起看看scheduleWithFixedDelay方法。

scheduleWithFixedDelay方法

scheduleWithFixedDelay方法的源代碼如下所示。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
 //傳入的Runnable對象和TimeUnit為空,則抛出空指針異常
 if (command == null || unit == null)
  throw new NullPointerException();
 //任務延時時長小于或者等于0,則抛出非法參數異常
 if (delay <= 0)
  throw new IllegalArgumentException();
 //将Runnable對象封裝成ScheduledFutureTask任務
 //并設定固定的執行周期來執行任務
 ScheduledFutureTask<Void> sft =
  new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
 //調用decorateTask方法,本質上直接傳回ScheduledFutureTask任務
 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 //設定執行的任務
 sft.outerTask = t;
 //執行延時任務
 delayedExecute(t);
 //傳回任務
 return t;
}
           

從scheduleWithFixedDelay方法的源代碼,我們可以看出在将Runnable對象封裝成ScheduledFutureTask時,設定了執行周期,但是此時設定的執行周期與scheduleAtFixedRate方法設定的執行周期不同。此時設定的執行周期規則為:下一次任務執行的時間是上一次任務完成的時間加上delay時長,時長機關由TimeUnit決定。也就是說,具體的執行時間不是固定的,但是執行的周期是固定的,整體采用的是相對固定的延遲來執行定時任務。

如果大家細心的話,會發現在scheduleWithFixedDelay方法中設定執行周期時,傳遞的delay值為負數,如下所示。

ScheduledFutureTask<Void> sft =
  new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
           

這裡的負數表示的是相對固定的延遲。

在ScheduledFutureTask類中,存在一個setNextRunTime方法,這個方法會在run方法執行完任務後調用,這個方法更能展現scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的源碼如下所示。

private void setNextRunTime() {
 //距離下次執行任務的時長
 long p = period;
 //固定頻率執行,
 //上次執行任務的時間
 //加上任務的執行周期
 if (p > 0)
  time += p;
 //相對固定的延遲
 //使用的是系統目前時間
 //加上任務的執行周期
 else
  time = triggerTime(-p);
}
           

在setNextRunTime方法中通過對下次執行任務的時長進行判斷來确定是固定頻率執行還是相對固定的延遲。

triggerTime方法

在ScheduledThreadPoolExecutor類中提供了兩個triggerTime方法,用于擷取下一次執行任務的具體時間。triggerTime方法的源碼如下所示。

private long triggerTime(long delay, TimeUnit unit) {
 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
 return now() +
  ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
           

這兩個triggerTime方法的代碼比較簡單,就是擷取下一次執行任務的具體時間。有一點需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接傳回delay,否則需要處理溢出的情況。

我們看到在triggerTime方法中處理防止溢出的邏輯使用了overflowFree方法,接下來,我們就看看overflowFree方法的實作。

overflowFree方法

overflowFree方法的源代碼如下所示。

private long overflowFree(long delay) {
 //擷取隊列中的節點
 Delayed head = (Delayed) super.getQueue().peek();
 //擷取的節點不為空,則進行後續處理
 if (head != null) {
  //從隊列節點中擷取延遲時間
  long headDelay = head.getDelay(NANOSECONDS);
  //如果從隊列中擷取的延遲時間小于0,并且傳遞的delay
  //值減去從隊列節點中擷取延遲時間小于0
  if (headDelay < 0 && (delay - headDelay < 0))
   //将delay的值設定為Long.MAX_VALUE + headDelay
   delay = Long.MAX_VALUE + headDelay;
 }
 //傳回延遲時間
 return delay;
}
           

通過對overflowFree方法的源碼分析,可以看出overflowFree方法本質上就是為了限制隊列中的所有節點的延遲時間在Long.MAX_VALUE值之内,防止在ScheduledFutureTask類中的compareTo方法中溢出。

ScheduledFutureTask類中的compareTo方法的源碼如下所示。

public int compareTo(Delayed other) {
 if (other == this) // compare zero if same object
  return 0;
 if (other instanceof ScheduledFutureTask) {
  ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  long diff = time - x.time;
  if (diff < 0)
   return -1;
  else if (diff > 0)
   return 1;
  else if (sequenceNumber < x.sequenceNumber)
   return -1;
  else
   return 1;
 }
 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
           

compareTo方法的主要作用就是對各延遲任務進行排序,距離下次執行時間靠前的任務就排在前面。

delayedExecute方法

delayedExecute方法是ScheduledThreadPoolExecutor類中延遲執行任務的方法,源代碼如下所示。

private void delayedExecute(RunnableScheduledFuture<?> task) {
 //如果目前線程池已經關閉
 //則執行線程池的拒絕政策
 if (isShutdown())
  reject(task);
 //線程池沒有關閉
 else {
  //将任務添加到阻塞隊列中
  super.getQueue().add(task);
  //如果目前線程池是SHUTDOWN狀态
  //并且目前線程池狀态下不能執行任務
  //并且成功從阻塞隊列中移除任務
  if (isShutdown() &&
   !canRunInCurrentRunState(task.isPeriodic()) &&
   remove(task))
   //取消任務的執行,但不會中斷執行中的任務
   task.cancel(false);
  else
   //調用ThreadPoolExecutor類中的ensurePrestart()方法
   ensurePrestart();
 }
}
           

可以看到在delayedExecute方法内部調用了canRunInCurrentRunState方法,canRunInCurrentRunState方法的源碼實作如下所示。

boolean canRunInCurrentRunState(boolean periodic) {
 return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}
           

可以看到canRunInCurrentRunState方法的邏輯比較簡單,就是判斷線程池目前狀态下能夠執行任務。

另外,在delayedExecute方法内部還調用了ThreadPoolExecutor類中的ensurePrestart()方法,接下來,我們看下ThreadPoolExecutor類中的ensurePrestart()方法的實作,如下所示。

void ensurePrestart() {
 int wc = workerCountOf(ctl.get());
 if (wc < corePoolSize)
  addWorker(null, true);
 else if (wc == 0)
  addWorker(null, false);
}
           

在ThreadPoolExecutor類中的ensurePrestart()方法中,首先擷取目前線程池中線程的數量,如果線程數量小于corePoolSize則調用addWorker方法傳遞null和true,如果線程數量為0,則調用addWorker方法傳遞null和false。

關于addWork()方法的源碼解析,大家可以參考【高并發專題】中的《高并發之——通過ThreadPoolExecutor類的源碼深度解析線程池執行任務的核心流程》一文,這裡,不再贅述。

reExecutePeriodic方法

reExecutePeriodic方法的源代碼如下所示。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 //線程池目前狀态下能夠執行任務
 if (canRunInCurrentRunState(true)) {
  //将任務放入隊列
  super.getQueue().add(task);
  //線程池目前狀态下不能執行任務,并且成功移除任務
  if (!canRunInCurrentRunState(true) && remove(task))
   //取消任務
   task.cancel(false);
  else
   //調用ThreadPoolExecutor類的ensurePrestart()方法
   ensurePrestart();
 }
}
           

總體來說reExecutePeriodic方法的邏輯比較簡單,但是,這裡需要注意和delayedExecute方法的不同點:調用reExecutePeriodic方法的時候已經執行過一次任務,是以,并不會觸發線程池的拒絕政策;傳入reExecutePeriodic方法的任務一定是周期性的任務。

onShutdown方法

onShutdown方法是ThreadPoolExecutor類中的鈎子函數,它是在ThreadPoolExecutor類中的shutdown方法中調用的,而在ThreadPoolExecutor類中的onShutdown方法是一個空方法,如下所示。

void onShutdown() {
}
           

ThreadPoolExecutor類中的onShutdown方法交由子類實作,是以ScheduledThreadPoolExecutor類覆寫了onShutdown方法,實作了具體的邏輯,ScheduledThreadPoolExecutor類中的onShutdown方法的源碼實作如下所示。

@Override
void onShutdown() {
 //擷取隊列
 BlockingQueue<Runnable> q = super.getQueue();
 //線上程池已經調用shutdown方法後,是否繼續執行現有延遲任務
 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
 //線上程池已經調用shutdown方法後,是否繼續執行現有定時任務
 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
 //線上程池已經調用shutdown方法後,不繼續執行現有延遲任務和定時任務
 if (!keepDelayed && !keepPeriodic) {
  //周遊隊列中的所有任務
  for (Object e : q.toArray())
   //取消任務的執行
   if (e instanceof RunnableScheduledFuture<?>)
    ((RunnableScheduledFuture<?>) e).cancel(false);
  //清空隊列
  q.clear();
 }
 //線上程池已經調用shutdown方法後,繼續執行現有延遲任務和定時任務
 else {
  //周遊隊列中的所有任務
  for (Object e : q.toArray()) {
   //目前任務是RunnableScheduledFuture類型
   if (e instanceof RunnableScheduledFuture) {
    //将任務強轉為RunnableScheduledFuture類型
    RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
    //線上程池調用shutdown方法後不繼續的延遲任務或周期任務
    //則從隊列中删除并取消任務
    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
     t.isCancelled()) {
     if (q.remove(t))
      t.cancel(false);
    }
   }
  }
 }
 //最終調用tryTerminate()方法
 tryTerminate();
}
           

ScheduledThreadPoolExecutor類中的onShutdown方法的主要邏輯就是先判斷線程池調用shutdown方法後,是否繼續執行現有的延遲任務和定時任務,如果不再執行,則取消任務并清空隊列;如果繼續執行,将隊列中的任務強轉為RunnableScheduledFuture對象之後,從隊列中删除并取消任務。大家需要好好了解這兩種處理方式。最後調用ThreadPoolExecutor類的tryTerminate方法。有關ThreadPoolExecutor類的tryTerminate方法的源碼解析,大家可以參考【高并發專題】中的《高并發之——通過源碼深度分析線程池中Worker線程的執行流程》一文,這裡不再贅述。

至此,ScheduledThreadPoolExecutor類中的核心方法的源代碼,我們就分析完了。

繼續閱讀