天天看點

ScheduledThreadPoolExecutor原理探究簡介執行個體源碼分析

簡介

ThreadPoolExecutor是Executors中一部分功能,下面來介紹另外一部分功能也就是ScheduledThreadPoolExecutor的實作,後者是一個可以在一定延遲時候或者定時進行任務排程的線程池。

Executors其實是個工具類,裡面提供了好多靜态方法,根據使用者選擇傳回不同的線程池執行個體。

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實作ScheduledExecutorService接口,關于ThreadPoolExecutor的介紹可以參考:

http://www.jianshu.com/p/3cc67876375f

線程池隊列是DelayedWorkQueue,它是對delayqueue的優化,關于delayqueue參考:http://www.jianshu.com/p/2659eb72134b

ScheduledFutureTask是阻塞隊列元素是對任務修飾。

Executors的類圖結構如下:

ScheduledThreadPoolExecutor原理探究簡介執行個體源碼分析

ScheduledThreadPoolExecutor的構造函數如下:

//使用改造後的delayqueue.
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }           

複制

執行個體

// 任務間以固定時間間隔執行,延遲1s後開始執行任務,任務執行完畢後間隔2s再次執行,任務執行完畢後間隔2s再次執行,依次往複
    static void scheduleWithFixedDelay() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {

                System.out.println(System.currentTimeMillis());

            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);

        // 由于是定時任務,一直不會傳回
        result.get();
        System.out.println("over");

    }
    // 相對開始加入任務的時間點固定頻率執行:從加入任務開始算1s後開始執行任務,1+2s開始執行,1+2*2s執行,1+n*2s開始執行;
    // 但是如果執行任務時間大約2s則不會并發執行後續任務将會延遲。

    static void scheduleAtFixedRate() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
            public void run() {

                System.out.println(System.currentTimeMillis());

            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);

        // 由于是定時任務,一直不會傳回
        result.get();
        System.out.println("over");
    }

    // 延遲1s後開始執行,隻執行一次,沒有傳回值
    static void scheduleRunable() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<?> result = executorService.schedule(new Runnable() {

            @Override
            public void run() {
                System.out.println("gh");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }, 1000, TimeUnit.MILLISECONDS);

        System.out.println(result.get());

    }

    // 延遲1s後開始執行,隻執行一次,有傳回值
    static void scheduleCaller() throws InterruptedException, ExecutionException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);

        ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {

            @Override
            public String call() throws Exception {

                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                return "gh";
            }

        }, 1000, TimeUnit.MILLISECONDS);

        // 阻塞,直到任務執行完成
        System.out.print(result.get());

    }           

複制

源碼分析

schedule函數

public ScheduledFuture<?> schedule(Runnable command, long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();

    //裝飾任務,主要實作public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //添加任務到延遲隊列
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {

    //如果線程池關閉了,則拒絕任務
    if (isShutdown())
        reject(task);
    else {
        //添加任務到隊列
        super.getQueue().add(task);

        //再次檢查線程池關閉
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //確定至少一個線程在處理任務,即使核心線程數corePoolSize為0
            ensurePrestart();
    }
}

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    //增加核心線程數
    if (wc < corePoolSize)
        addWorker(null, true);
    //如果初始化corePoolSize==0,則也添加一個線程。
    else if (wc == 0)
        addWorker(null, false);
    }           

複制

上面做的首先吧runnable裝飾為delay隊列所需要的格式的元素,然後把元素加入到阻塞隊列,然後線程池線程會從阻塞隊列擷取逾時的元素任務進行處理,下面看下隊列元素如何實作的。

//r為被修飾任務,result=null,ns為目前時間加上delay時間後的
ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

//通過擴充卡把runnable轉換為callable
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}           

複制

修飾後把目前任務修飾為了delay隊列所需元素,下面看下元素的兩個重要方法:過期時間計算和元素比較。

過期時間計算

//元素過期算法,裝飾後時間-目前時間,就是即将過期剩餘時間
public long getDelay(TimeUnit unit) {
  return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}           

複制

元素比較

public int compareTo(Delayed other) {
  if (other == this) // compare zero ONLY if same object
      return 0;
  if (other instanceof ScheduledFutureTask) {
      ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
      long diff = time - x.time;
      if (diff < 0)
          return -1;
      else if (diff > 0)
          return 1;
      else if (sequenceNumber < x.sequenceNumber)
          return -1;
      else
          return 1;
  }
  long d = (getDelay(TimeUnit.NANOSECONDS) -
            other.getDelay(TimeUnit.NANOSECONDS));
  return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}           

複制

schedule(Callable callable,

long delay,

TimeUnit unit)和schedule(Runnable command, long delay,TimeUnit unit)類似。

compareTo作用是在加入元素到dealy隊列時候進行比較,需要調整堆讓最快要過期的元素放到隊首。是以無論什麼時候向隊列裡面添加元素,隊首的都是最即将過期的元素。

scheduleWithFixedDelay函數

定時排程:相鄰任務間時間固定。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();

        //修飾包裝,注意這裡是period=-delay<0
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //添加任務到隊列
        delayedExecute(t);
        return t;
    }
       //period為 delay時間
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }           

複制

任務添加到隊列後,工作線程會從隊列擷取并移除到期的元素,然後執行run方法,是以下面看看ScheduledFutureTask的run方法如何實作定時排程的。

public void run() {

    //是否隻執行一次
    boolean periodic = isPeriodic();

    //取消任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //隻執行一次,調用schdule時候
    else if (!periodic)
        ScheduledFutureTask.super.run();

    //定時執行
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設定time=time+period
        setNextRunTime();

        //重新加入該任務到delay隊列
        reExecutePeriodic(outerTask);
    }
}           

複制

private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else//由于period=-delay是以執行這裡,設定time=now()+delay
                time = triggerTime(-p);
        }           

複制

定時排程是先從隊列擷取任務然後執行,然後在重新設定任務時間,在把任務放入隊列實作的。

如果任務執行時間大于delay時間則等任務執行完畢後的delay時間後在次調用任務,不會同一個任務并發執行。

scheduleAtFixedRate函數

定時排程:相對起始時間點固定頻率調用。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //裝飾任務類,注意period=period>0,不是負的
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    //添加任務到隊列
    delayedExecute(t);
    return t;
}           

複制

private void setNextRunTime() {
            long p = period;
           //period=delay;
            if (p > 0)
                time += p;//由于period>0是以執行這裡,設定time=time+delay
            else
                time = triggerTime(-p);
        }           

複制

相對于上面delay,rate方式執行規則為時間為initdelday + n*period;時候啟動任務,但是如果目前任務還沒有執行完,要等到目前任務執行完畢後在執行一個任務。