天天看點

java并發程式設計筆記--ScheduledThreadPoolExecutor實作

    ScheduledThreadPoolExecutor作為ScheduledExecutorService接口的實作,提供了延遲執行任務或者周期性執行任務的能力。通過名稱可以看出,ScheduledThreadPoolExecutor基于線程池實作,它通過繼承ThreadPoolExecutor實作線程池管理能力的複用,同時擴充了自己的定時任務排程能力。

    首先來看ScheduledExecutorServicej接口,它繼承了ExecutorService接口,作為任務執行器的一種擴充類型,提供了如下方法:

schedule方法:用于任務的單次執行,允許指定延遲時間,當時間為0或者負數時,表示立即執行任務;

scheduleAtFixedRate方法:以固定的時間間隔執行任務,當任務本身的執行時間超過時間間隔時,會等到任務執行完成後,立即執行下一次任務;同一個任務總是串行執行,不會并發執行;

scheduleWithFixedDelay方法:以固定的延遲執行任務,目前任務執行時間與上一次任務執行時間相隔固定的延遲;任務每次執行完成後,會在結束時間上加上固定的延遲作為下一次執行時間。任務執行的周期會将任務本身執行耗時考慮在内,因而并非每次執行的時間間隔都相同;

ScheduledThreadPoolExecutor繼承ThreadPoolExecutor,主要做了如下改變:

使用ScheduledFutureTask作為任務封裝類,代替原先的FutureTask類;

使用DelayedWorkQueue作為阻塞隊列,隊列為無界隊列;ScheduledThreadPoolExecutor的構造器僅需要傳入corePoolSize,使用"corePoolSize+無界隊列"實作任務排程;

支援run-after-shutdown參數,使得ScheduledThreadPoolExecutor重寫shutdown方法,允許移除并且取消不需要在shutdown後執行的任務;

提供了decorateTask方法,用來定制任務操作;

java并發程式設計筆記--ScheduledThreadPoolExecutor實作

ScheduledThreadPoolExecutor由3部分組成:

任務排程控制:ScheduledThreadPoolExecutor,負責任務排程控制,實作了ScheduledExecutorService接口;

阻塞隊列:DelayedWorkQueue,作為ScheduledThreadPoolExecutor的内部類,用于緩存線程任務的阻塞隊列,僅能夠存放RunnableScheduledFuture對象;該隊列實作了延遲排程任務的邏輯,如果目前時間大于等于任務的延遲執行時間,任務才可以被排程。

排程任務:ScheduledFutureTask,作為ScheduledFutureTask的内部類,實作了RunnableScheduledFuture,封裝了排程任務的執行邏輯。其中的time字段存放下一次執行時間,DelayedWorkQueue會據此判斷任務是否可以被執行。period字段存放執行周期,對于周期性執行任務,每次會根據period計算time。

ScheduledThreadPoolExecutor的構造器最多指定3個參數:

corePoolSize:線程池核心工作線程數量;

threadFactory:定制工作線程建立方式;

handler:駁回任務處理政策;

    ScheduledThreadPoolExecutor構造器會調用父類構造器進行線程池初始化,使用DelayedWorkQueue作為阻塞隊列,該隊列為無界隊列,因而maximumPoolSize屬性配置無效。又因為都是核心工作線程,沒有非核心線程需要回收,因而keepAliveTime配置為0。代碼如下:

ScheduledThreadPoolExecutor初始化時并不會預先建立工作線程,而是在送出任務的時候,通過父類java.util.concurrent.ThreadPoolExecutor#ensurePrestart方法判斷線程數是否達到corePoolSize,如果未達到,則新增線程;實作邏輯如下:

ScheduledThreadPoolExecutor的任務執行分為單次執行和周期性執行。

單次執行:通過schedule方法執行的任務屬于單次執行任務。Executor的execute方法、ExecutorService的submit方法都是通過調用schedule方法執行,故也是單次執行的任務。除了schedule可以指定延遲時間以外,其餘方法的延遲時間均為0,即立刻執行任務。比如:execute方法實作如下:

周期性執行:通過scheduleAtFixedRate、scheduleWithFixedDelay方法執行的任務均為周期性執行任務。周期性執行的實作可以了解為每次執行完成後設定下一次執行時間,然後将任務重新放入到阻塞隊列等待下一次排程。

    無論是單次執行還是周期性執行,其執行的入口都是delayedExecute方法。delayedExecute()将任務放入到阻塞隊列中,複用ThreadPoolExecutor的邏輯進行任務排程。代碼如下:

    當ThreadPoolExecutor的Worker線程從阻塞隊列取出任務執行時,會調用ScheduledFutureTask的run方法。該方法對任務類型進行判斷,如果是單次執行任務,則立即執行并設定傳回結果。如果是周期性執行任務,則執行任務并設定下一次執行時間,然後将任務放入到阻塞隊列中,等待下一次排程。方法代碼如下:

schedule的執行主要分為參數封裝和執行兩個步驟。實作如下:

參數封裝過程會調用decorateTask方法,該方法為protected的空方法,用于定制RunnableScheduledFuture的屬性,可以通過重寫實作定制。

    scheduleAtFixedRate()的實作與schedule()方法非常相似,僅是将decorateTask()傳回的RunnableScheduledFuture對象設定為原有Future的outerTask屬性。在重新知心任務時,會将outerTask添加到阻塞隊列,進而保證decorateTask()的定制效果一直有效。

    scheduleAtFixedRate()的實作與schedule()方法非常相似,僅是設定ScheduledFutureTask延遲時,使用負數,辨別執行方式為scheduleAtFixedRate。

    ScheduledFutureTask并沒有設定單獨的字段用于辨別執行類型,而是通過period字段的正負号和是否為0表示執行方式:

正數:fixed-rate執行方式;

負數:fixed-delay執行方式;

0:單次執行任務;

    scheduleAtFixedRate() / scheduleWithFixedDelay()執行的主要差別在于設定下一次執行時間的政策不同,而執行時間通過ScheduledFutureTask的time字段儲存,通過ScheduledFutureTask#setNextRunTime()進行設定,代碼如下:

    DelayedWorkQueue是專門存放RunnableScheduledFuture和ScheduledFutureTask對象的優先隊列,底層基于最小二叉堆實作,為了能夠提升任務的查找和删除效率,ScheduledFutureTask中增加了一個heapIndex的成員變量,用于存放任務在堆數組中的索引位置,當需要查找或者删除某個特定的任務時,直接根據任務的heapIndex通路堆數組中的元素。任務是否到達執行時間的判斷邏輯均在DelayedWorkQueue中實作。

    與PriorityQueue的實作不同,DelayedWorkQueue涉及到多線程通路,因而需要保證線程同步測正确性,故使用ReentrantLock來控制操作的原子性,同時使用Condition來協調線程的執行;

    為了友善在DelayedWorkQueue中查找和删除任務,ScheduledFutureTask有一個heapIndex用于存放任務在堆數組中的索引位置。每當任務在隊列中的位置改變時,需要同步更新任務的heapIndex。

上浮、下沉操作的實作與PriorityQueue實作相似,隻多了更新索引位置的操作,且需要在加鎖的環境下調用。

    通過上面代碼我們總結DelayedWorkQueue的實作原理:

1)基于最小二叉堆實作的優先隊列,根據ScheduledFutureTask.compareTo方法比較任務執行時間,使得最近要執行的任務位于隊首;

2)任務出隊時,通過輪詢判斷任務是否到達執行時間點,ScheduledFutureTask實作了Delayed接口,通過getDelay方法能夠擷取到任務還有多長時間執行;

3)當隊列中所有任務都沒有到達執行時間時,隊列中會維持一個leader線程,用于輪詢等待隊首任務,其餘線程均await()。

4)ScheduledFutureTask增加heapIndex屬性,用于标記任務在堆數組中的索引,進而便于任務的快速查找(是否存在)與取消(删除);

    任務的取消通過ScheduledFutureTask.cancel()方法實作,該方法調用ThreadPoolExecutor.cancel(),在取消任務後,判斷是否需要從阻塞隊列中移除任務。其中removeOnCancel參數通過setRemoveOnCancelPolicy()設定。之是以要在取消任務後移除阻塞隊列中任務,是為了防止隊列中積壓大量已被取消的任務。

ScheduledThreadPoolExecutor的shutdown() / shutdownNow()方法均調用ThreadPoolExecutor的相應方法實作。同時,ScheduledThreadPoolExecutor實作了ThreadPoolExecutor的onShutdown()用于在shutdown()執行過程中取消任務執行。

此處涉及2個參數:

executeExistingDelayedTasksAfterShutdown:當執行shutdown()後,是否繼續執行隊列中的單次執行任務;預設為true,即執行;

continueExistingPeriodicTasksAfterShutdown:當執行shutdown()後,是否繼續執行隊列中的周期性任務;預設為false,即不執行;