天天看點

[從源碼學設計]螞蟻金服SOFARegistry 之 自動調節間隔周期性任務

SOFARegistry 是螞蟻金服開源的一個生産級、高時效、高可用的服務注冊中心。本系列文章重點在于分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實作機制和架構思路,讓大家借以學習阿裡如何設計。本文為第九篇,介紹SOFARegistry自動調節間隔周期性任務的實作。

[從源碼學設計]螞蟻金服SOFARegistry 之 自動調節間隔周期性任務

目錄

    • 6.1 ThreadPoolExecutor的queue
    • 6.2 SOFARegistry選擇
    • 6.3 LinkedBlockingQueue
    • 6.4 SynchronousQueue
    • 5.1 ScheduledExecutorService
    • 4.1 ExecutorService
    • 0x00 摘要
    • 0x01 業務領域
    • 0x02 阿裡方案
    • 0x03 Scheduler
    • 0x04 無限循環任務
    • 0x05 周期任務
    • 0x06 Queue的選擇
    • 0x07 自動調節間隔的周期性任務
    • 0xFF 參考

SOFARegistry 是螞蟻金服開源的一個生産級、高時效、高可用的服務注冊中心。

本系列文章重點在于分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實作機制和架構思路,讓大家借以學習阿裡如何設計。

本文為第九篇,介紹SOFARegistry自動調節間隔周期性任務的實作。

螞蟻金服這裡的業務需求主要是:

  • 啟動一個無限循環任務,不定期執行任務;
  • 啟動若幹周期性延時任務;
  • 某些周期性任務需要實作自動調節間隔功能:程式一旦遇到發生逾時異常,就将間隔時間調大,如果連續逾時,那麼每次間隔時間都會增大一倍,一直到達外部參數設定的上限為止,一旦新任務不再發生逾時異常,間隔時間又會自動恢複為初始值

阿裡采用了:

  • ExecutorService實作了無限循環任務;
  • ScheduledExecutorService 實作了周期性任務;
  • TimedSupervisorTask 實作了自動調節間隔的周期性任務;

我們在設計延時/周期性任務時就可以參考TimedSupervisorTask的實作

Scheduler類中就是這個方案的展現。

首先,我們需要看看 Scheduler的代碼。

public class Scheduler {

    private final ScheduledExecutorService scheduler;
    public final ExecutorService           versionCheckExecutor;
    private final ThreadPoolExecutor       expireCheckExecutor;

    @Autowired
    private AcceptorStore                  localAcceptorStore;

    public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));

    }

    public void startScheduler() {
        scheduler.schedule(
                new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                        TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
                30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
    }

    public void stopScheduler() {
        if (scheduler != null && !scheduler.isShutdown()) {
            scheduler.shutdown();
        }
        if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) {
            versionCheckExecutor.shutdown();
        }
    }
}      

接下來我們就逐一分析下其實作或者說是設計選擇。

阿裡這裡采用ExecutorService實作了無限循環任務,不定期完成業務。

Executor:一個JAVA接口,其定義了一個接收Runnable對象的方法executor,其方法簽名為executor(Runnable command),該方法接收一個Runable執行個體,用來執行一個實作了Runnable接口的類。

ExecutorService:是一個比Executor使用更廣泛的子類接口。

其提供了生命周期管理的方法,傳回 Future 對象,以及可跟蹤一個或多個異步任務執行狀況傳回Future的方法;

當所有已經送出的任務執行完畢後将會關閉ExecutorService。是以我們一般用該接口來實作和管理多線程。

這裡ExecutorService雖然其不能提供周期性功能,但是

localAcceptorStore.changeDataCheck

本身就是一個while (true) loop,其可以依靠DelayQueue來完成類似周期功能。

versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));

versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());

public void changeDataCheck() {
        while (true) {
            try {
                DelayItemdelayItem = delayQueue.take();
                Acceptor acceptor = delayItem.getItem();
                removeCache(acceptor); // compare and remove
            } catch (InterruptedException e) {
                break;
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
}      

阿裡這裡采用了 ScheduledExecutorService 實作了周期性任務。

ScheduledExecutorService是一種線程池,ScheduledExecutorService在ExecutorService提供的功能之上再增加了延遲和定期執行任務的功能。

其schedule方法建立具有各種延遲的任務,并傳回可用于取消或檢查執行的任務對象。

尋常的Timer的内部隻有一個線程,如果有多個任務的話就會順序執行,這樣我們的延遲時間和循環時間就會出現問題,而且異常未檢查會中止線程。

ScheduledExecutorService是線程池,并且線程池對異常做了處理,使得任務之間不會有影響。在對延遲任務和循環任務要求嚴格的時候,就需要考慮使用ScheduledExecutorService了。

ThreadPoolExecutor的完整構造方法的簽名如下

ThreadPoolExecutor
(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)12      

其中,workQueue參數介紹如下:

workQueue任務隊列):用于儲存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。

  • ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序;
  • LinkedBlockingQueue:一個基于連結清單結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜态工廠方法Executors.newFixedThreadPool()使用了這個隊列;
  • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀态,吞吐量通常要高于LinkedBlockingQueue,靜态工廠方法Executors.newCachedThreadPool使用了這個隊列;
  • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列;

這裡采用了兩種Queue。

expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
    new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(), new NamedThreadFactory(
        "SyncDataScheduler-versionChangeCheck"));      

LinkedBlockingQueue是一種阻塞隊列。

LinkedBlockingQueue内部由單連結清單實作了BlockingQueue接口,隻能從head取元素,從tail添加元素。

LinkedBlockingQueue内部分别使用了takeLock 和 putLock 對并發進行控制,也就是說LinkedBlockingQueue是讀寫分離的,添加和删除操作并不是互斥操作,可以并行進行,這樣也就可以大大提高吞吐量。

LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,預設為

Integer.MAX_VALUE

,也就是無界隊列。如果存在添加速度大于删除速度時候,有可能會記憶體溢出,是以為了避免隊列過大造成機器負載或者記憶體爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小。

另外,LinkedBlockingQueue對每一個lock鎖都提供了一個Condition用來挂起和喚醒其他線程。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并沒有資料緩存空間。

你不能調用peek()方法來看隊列中是否有資料元素,因為資料元素隻有當你試着取走的時候才可能存在,不取走而隻想偷窺一下是不行的,當然周遊這個隊列的操作也是不允許的。隊列頭元素是第一個排隊要插入資料的線程,而不是要交換的資料。

資料是在配對的生産者和消費者線程之間直接傳遞的,并不會将資料緩沖資料到隊列中。可以這樣來了解:生産者和消費者互相等待對方,握手,然後一起離開。

SynchronousQueue的一個使用場景是線上程池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)建立新的線程,如果有空閑線程則會重複使用,線程空閑了60秒後會被回收。

TimedSupervisorTask 是一個自動調節間隔的周期性任務。這裡基本是借鑒了Eureka的同名實作,但是SOFA這裡去除了“部分異常處理邏輯”。

從整體上看,TimedSupervisorTask是固定間隔的周期性任務,一旦遇到逾時就會将下一個周期的間隔時間調大,如果連續逾時,那麼每次間隔時間都會增大一倍,一直到達外部參數設定的上限為止,一旦新任務不再逾時,間隔時間又會自動恢複為初始值,另外還有CAS來控制多線程同步。

主要邏輯如下:

  • 執行submit()方法送出任務;
  • 執行future.get()方法,如果沒有在規定的時間得到傳回值或者任務出現異常,則進入異常處理catch代碼塊;
  • 如果沒有發生異常,則再設定一次延時任務時間timeoutMillis;
  • 如果發生異常:
    • 發生TimeoutException異常,則執行

      Math.min(maxDelay, currentDelay x 2)

      得到任務延時時間 x 2 和 最大延時時間的最小值,然後改變任務的延時時間timeoutMillis;
    • 發生RejectedExecutionException異常,SOFA隻是列印log。Eureka則将rejectedCounter值+1;
    • 發生Throwable異常,SOFA隻是列印log。Eureka則将throwableCounter值+1;
  • 進入finally代碼塊
    • .如果future不為null,則執行future.cancel(true),中斷線程停止任務;
    • 如果線程池沒有shutdown,則建立一個新的定時任務;最關鍵就在上面的最後一行代碼中:

      scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS)

      :執行完任務後,會再次調用schedule方法,在指定的時間之後執行一次相同的任務,這個間隔時間和最近一次任務是否逾時有關,如果逾時了就間隔時間就會變大;

其實作如下:

public class TimedSupervisorTask extends TimerTask {
    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor       executor;
    private final long                     timeoutMillis;
    private final Runnable                 task;
    private String                         name;
    private final AtomicLong               delay;
    private final long                     maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler,
                               ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit,
                               int expBackOffBound, Runnable task) {
        this.name = name;
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

    }

    @Override
    public void run() {
        Future future = null;
        try {
            //使用Future,可以設定子線程的逾時時間,這樣目前線程就不用無限等待了
            future = executor.submit(task);
            //指定等待子線程的最長時間
            // block until done or timeout
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // 每次執行任務成功都會将delay重置
            delay.set(timeoutMillis);
        } catch (TimeoutException e) {

            long currentDelay = delay.get();
            // 如果出現異常,則将時間*2,然後取 定時時間 和 最長定時時間 中最小的為下次任務執行的延時時間
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            // 設定為最新的值,考慮到多線程,是以用了CAS
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            // 線程池的阻塞隊列中放滿了待處理任務,觸發了拒絕政策
            LOGGER.error("{} task supervisor rejected the task: {}", name, task, e);
        } catch (Throwable e) {
           // 出現未知的異常
            LOGGER.error("{} task supervisor threw an exception", name, e);
        } finally {
           //這裡任務要麼執行完畢,要麼發生異常,都用cancel方法來清理任務;
            if (future != null) {
                future.cancel(true);
            }
            //這裡就是周期性任務的原因:隻要沒有停止排程器,就再建立一次性任務,執行時間時dealy的值,
            //假設外部調用時傳入的逾時時間為30秒(構造方法的入參timeout),最大間隔時間為50秒(構造方法的入參expBackOffBound)
            //如果最近一次任務沒有逾時,那麼就在30秒後開始新任務,
            //如果最近一次任務逾時了,那麼就在50秒後開始新任務(異常進行中有個乘以二的操作,乘以二後的60秒超過了最大間隔50秒)
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}      

Eureka系列(六) TimedSupervisorTask類解析

Eureka的TimedSupervisorTask類(自動調節間隔的周期性任務)

java線程池ThreadPoolExecutor類使用詳解

Java線程池ThreadPoolExecutor實作原理剖析

深入了解Java線程池:ThreadPoolExecutor

Java中線程池ThreadPoolExecutor原理探究

java并發之SynchronousQueue實作原理

ScheduledExecutorService 和 Timer 的差別

Java并發包中的同步隊列SynchronousQueue實作原理

繼續閱讀