天天看點

源碼分析ElasticJob任務錯過機制(misfire)與幂等性

任務在排程執行中,由于某種原因未執行完畢,下一次排程任務觸發後,在同一個Job執行個體中,會出現兩個線程處理同一個分片上的資料,這樣就會造成兩個線程可能處理到相同的資料。為了避免同一條資料可能會被多次執行的問題,ElasticJob引入幂等機制,確定同一條資料不會再被多個Job同時處理,也避免同一條資料在同一個Job執行個體的多個線程處理。再重申一次ElastciJob的分布式是資料的分布式,一個任務在多個Job執行個體上運作,每個Job執行個體處理該Job的部分資料(資料分片)。

本文重點分析ElasticJob是如何做到如下兩點的。

  1. ElasticJob如何確定在同一個Job執行個體中多個線程不會處理相同的資料。
  2. ElasticJob如何確定資料不會被多個Job執行個體處理。

    為了解決上述這種情況,ElasticJob引入任務錯過補償執行(misfire)與幂等機制(monitorExecution)

1、ElasticJob如何確定在同一個Job執行個體中多個線程不會處理相同的資料。

場景:例如任務排程周期為每5s執行一次,正常每次排程任務處理需要耗時2s,如果在某一段時間由于資料庫壓力變大,導緻原本隻需要2s就能處理完成的任務,現在需要16s才能運作,在這個資料處理的過程中,每5s又會觸發一次排程(任務處理),如果不加以控制的話,在同一個執行個體上根據分片條件去查詢資料庫,查詢到的資料有可能相同(部分相同),這樣同一條任務資料将被多次運作,如果這個任務時處理轉賬業務,如果在業務方法不實作幂等,則會引發非常嚴重的問題,那ElasticJob是否可以避免這個問題呢?

答案是肯定。elasticJob提供了一個配置參數:monitorExecution=true,開啟幂等性。

一個任務觸發後,将執行任務處理邏輯,其入口:AbstractElasticJobExecutor#misfireIfRunning

if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {  // @1
       if (shardingContexts.isAllowSendJobEvent()) {  // @2
             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                    shardingContexts.getShardingItemParameters().keySet()));
       }
      return;
}           

代碼@1:在一個排程任務觸發後如果上一次任務還未執行,則需要設定該分片狀态為mirefire,表示錯失了一次任務執行。

代碼@2:如果該分片被設定為mirefire并開啟了事件跟蹤,将事件跟蹤儲存在資料庫中。

接下來詳細分析JobFacade.misfireIfRunning的實作邏輯:

/**
     * 如果目前分片項仍在運作則設定任務被錯過執行的标記.
     * 
     * @param items 需要設定錯過執行的任務分片項
     * @return 是否錯過本次執行
     */
    public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
        if (!hasRunningItems(items)) {
            return false;
        }
        setMisfire(items);
        return true;
    }           

如果存在未完成的分片,則調用setMisfire(items)方法,ElasticJob在開啟monitorExecution(true)【幂等機制】機制的情況下,在分片任務開始時會建立${namespace}/jobname/sharding/{item}/running節點,在任務結束後會删除該目錄,是以在判斷是否有分片正在運作時,隻需判斷是否存在上述節點即可。如果存在,調用setMisfire方法。

PS:如果ElasticJob為開啟幂等(monitorExecution)的情況下,才會建立${namespace}/jobname/sharding

/{item}/running,misfire機制才能生效。

ExecutionService#setMisfire

/**
     * 設定任務被錯過執行的标記.
     *
     * @param items 需要設定錯過執行的任務分片項
     */
    public void setMisfire(final Collection<Integer> items) {
        for (int each : items) {
            jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
        }
    }           

設定misfire的方法為配置設定給該執行個體下的所有分片建立持久節點${namespace}/jobname/shading/{item}/misfire節點,注意,隻要配置設定給該執行個體的任何一分片未執行完畢,則在該執行個體下的所有分片都增加misfire節點,然後忽略本次任務觸發執行,等待任務結束後再執行。

AbstractElasticJobExecutor#execute

execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
     while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
         jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}           

在任務執行完成後檢查是否存在${namespace}/jobname/sharding/{item}/misfire節點,如果存在,則首先清除misfie相關的檔案,然後執行任務。

ElasticJob的misfire實作方案總結:

在下一個排程周期到達之後,隻要發現這個分片的任何一個分片正在執行,則為該執行個體分片的所有分片都設定為misfire,等任務執行完畢後,再統一執行下一次任務排程。

2、ElasticJob如何確定資料不會被多個Job執行個體處理

ElasticJob基于資料分片,不同分片根據分片參數(人為配置),從資料庫中查詢各自資料(任務資料分片),如果當節點當機,資料會重新分片,如果任務未執行完成,然後執行分片,資料是否會被不同的任務同時處理呢?

答案是不會,因為當節點當機後,是否需要重新分片事件監聽器會監聽到Job執行個體代表的節點删除,設定重新分片,在任務被排程執行具體處理邏輯之前,需要重新分片,重新分片的前提又是要所有的分片的任務全部執行完畢,這也依賴是否開啟幂等控制(monitorExecution),如果開啟,ElasticJob能感覺正在執行處理邏輯的分片,重新分片需要等待目前所有任務全部運作完畢後才會觸發,故不會存在不同節點處理相同資料的問題。

問答:

1、如果一個任務JOB的排程頻率為每10s一次,在某個時間,該job執行耗時用了33s(平時隻需執行5s),按照正常排程,應該後續會觸發3次排程,那該job後執行完,會連續執行3次排程嗎?

答案:在33s這次任務執行完成後,如果後面的任務執行在10s内執行完畢的話,隻會觸發一次,不會補償3次,因為ElasticJob記錄任務錯失執行,隻是建立了misfire節點,并不會記錄錯失的此時,因為也沒這個必要。

原文釋出時間為:2018-12-11

本文作者:丁威,《RocketMQ技術内幕》作者。

本文來自

中間件興趣圈

,了解相關資訊可以關注