天天看點

Elastic-Job原理--任務失敗轉移(五)

        在上一篇部落格Elastic-Job原理--任務排程處理(四)我們已經了解到Elastic-Job依賴quartz定時任務執行分片任務的過程,這篇部落格我們簡單了解一下Elastic-Job中當某個伺服器節點與注冊中心斷開連接配接(無法進行任務執行)時其需要執行的任務轉移到其他節點的過程。

首先提供如下類圖,與節點任務失敗轉移相關主要類如下:

Elastic-Job原理--任務失敗轉移(五)

FailoverService,作業失效轉移服務。

FailoverNode,作業失效轉移資料存儲路徑。

FailoverListenerManager,作業失效轉移監聽管理器。

一、重新分片

        當伺服器節點從注冊中心zk斷開連接配接時,Elastic-job需要做的一件事情是需要在下次任務執行前進行重新分片,當zk節點數目發生變更時,會引發ListenServersChangedJobListener監聽器調用,此監聽器會調用shardingService的重新分片标志設定方法,這樣再下次任務執行前會重新進行任務分片操作。

/**
 * 當執行個體節點變更時會調用此監聽器
 *
 */
class ListenServersChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            //如果節點數目發生變更則設定重新分片标志,下次任務執行前會進行重新分片
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
                shardingService.setReshardingFlag();
            }
        }
        
        private boolean isInstanceChange(final Type eventType, final String path) {
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }
        
        private boolean isServerChange(final String path) {
            return serverNode.isServerPath(path);
        }
    }      

任務重新分片隻是解決了下次任務執行時,所有的分片任務都是分布到各個執行個體中,但是目前失效的任務是如何處理的。

二、任務失效轉移

     所謂失效轉移,就是在執行任務的過程中遇見異常的情況,這個分片任務可以在其他節點再次執行。這個和上面的HA不同,對于HA,上面如果任務終止,那麼不會在其他任務執行個體上再次重新執行。Job的失效轉移監聽來源于FailoverListenerManager中JobCrashedJobListener的dataChanged方法。FailoverListenerManager監聽的是zk的instance節點删除事件。如果任務配置了failover等于true,其中某個instance與zk失去聯系或被删除,并且失效的節點又不是本身,就會觸發失效轉移邏輯。首先,在某個任務執行個體elastic-job會在leader節點下面建立failover節點以及items節點。items節點下會有失效任務執行個體的原本應該做的分片好。比如,失效的任務執行個體原來負責分片1和2。那麼items節點下就會有名字叫1的子節點,就代表分片1需要轉移到其他節點上去運作。如下圖:

Elastic-Job原理--任務失敗轉移(五)

當節點任務失效時會調用JobCrashedJobListener監聽器,此監聽器會根據執行個體id擷取所有的分片,然後調用FailoverService的setCrashedFailoverFlag方法,将每個分片id寫到/jobName/leader/failover/items下

/**
 * 任務失效時會調用這個監聽器
 */
class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                //會将所有的分片初始化到注冊中心中
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }      

在FailoverService方法中調用setCrashedFailoverFlag方法将需要任務轉移的分片id進行執行個體化。

/**
     * 設定失效的分片項标記.
     * 
     * @param item 崩潰的作業項
     */
    public void setCrashedFailoverFlag(final int item) {
        if (!isFailoverAssigned(item)) {
            jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
        }
    }      

然後接下來調用FailoverService的failoverIfNessary方法,首先判斷是否需要失敗轉移,如果可以需要則隻需作業失敗轉移。

/**
     * 如果需要失效轉移, 則執行作業失效轉移.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }      

在needFailover方法會對是否需要失效轉移進行判斷

private boolean needFailover() {
         // `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 有失效轉移的作業分片項
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                // 目前作業不在運作中
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }      

條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。

條件二:目前作業不在運作中。此條件即是上文送出的作業節點空閑的定義。失效轉移: 運作中的作業伺服器崩潰不會導緻重新分片,隻會在下次作業啟動時分片。啟用失效轉移功能可以在本次作業執行過程中,監測其他作業伺服器【空閑】,抓取未完成的孤兒分片項執行

在FailoverLeaderExecutionCallback中回調邏輯如下:

(1)也會首先判斷是否需要失效轉移,

(2)從注冊中心獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項,

(3)在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover` 作業分片項 為 目前作業節點,

(4)然後移除任務轉移分片項,

(5)最後調用執行,送出任務

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
   
   @Override
   public void execute() {
       // 判斷需要失效轉移
       if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
           return;
       }
       // 獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項
       int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
       log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
       // 設定這個 `${JOB_NAME}/sharding/${ITEM_ID}/failover` 作業分片項 為 目前作業節點
       jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
       // 移除這個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項
       jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
       // TODO 不應使用triggerJob, 而是使用executor統一排程 疑問:為什麼要用executor統一,後面研究下
       // 觸發作業執行
       JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
       if (null != jobScheduleController) {
           jobScheduleController.triggerJob();
       }
   }
}      
@Override
    public ShardingContexts getShardingContexts() {
        //是否失敗轉移
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            //擷取失敗分片
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                //執行失敗分片任務
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        //重新分片
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        //移除已經失敗轉移執行的任務
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }