天天看點

elasticjob任務失效轉移異常背景

背景

公司選用elasticjob作為分布式任務排程工具,版本2.1.5,其中有一個任務對應機器兩台,A和B,任務總分片數是4,A對應0、1,B對應分片2、3。任務每晚23:30:00執行,T日計算資料記錄日期T+1,供T+1日使用。

突然有一天,A機器在22:24運作執行起分片2任務,執行完分片2之後又執行了分片3,注意分片2、3本該是機器B所用的分片,這兩點就很奇怪了,異常的時間執行了不屬于自己的分片,而且是一個一個執行。下方是日志中記錄的task id

// 異常的task id,很清楚的看到分片是@[email protected],機器A在執行
"taskId":"[email protected]@[email protected]@[email protected]@A機器[email protected]@4927"

// 正常的task id,分片号@2,[email protected],ip對應的是機器B
"taskId":"[email protected]@2,[email protected]@[email protected]@B機器[email protected]@12384"
           

經過分析,基本斷定是進任務失效轉移邏輯了。但是,為什麼任務失效轉移呢?任務不在執行的時間點,而且也沒有執行中,不可能出現這個情況。

經過回憶,22:23的時候,開發對機器B做了一次記憶體dump,與A機器啟動相差一分鐘,可能問題出在這裡了。難道對B機器做dump操作導緻B短暫與ZK注冊中心斷開了嗎,導緻誤以為伺服器當機?帶着問題,我們又對B做了一次dump,很快,證明了我們的猜測,如下圖所示,分片2正在運作,實際執行分片2的是機器A,而且辨別也很清楚,失效轉移

elasticjob任務失效轉移異常背景

但是,按照官方的說法,失效轉移是指運作中的作業伺服器崩潰不會導緻重新分片,隻會在下次作業啟動時分片。啟用失效轉移功能可以在本次作業執行過程中,監測其他作業伺服器空閑,抓取未完成的孤兒分片項執行。

運作中的作業伺服器崩潰不會導緻重新分片,經過确認,當時我們的任務并不在執行中,這個就很奇怪了,帶着這些疑問我們深入elasticjob源碼。

失效轉移

失效轉移相關邏輯入口在

FailoverListenerManager#JobCrashedJobListener

,實際處理是

FailoverService

。當一台伺服器當機後會觸發一個事件

Type.NODE_REMOVED

,elastic-job根據這個事件來進行相關的處理,相關過程都注釋在代碼裡了。

為了輔助了解,文末附錄了一個注冊中心目錄和節點結構圖。

public final class FailoverListenerManager extends AbstractListenerManager {
    
    // ...
    private final FailoverService failoverService;
    private final ShardingService shardingService;
    
    class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // 1失效轉移開啟、2注冊中心事件-節點移除,也就是一台伺服器下線、3是instance路徑,即jobName/instances路徑
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                // path,jobName/instances/[email protected]@pid
                // jobInstanceId是這個樣子的[email protected]@pid
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                // 如果jobInstanceId和目前機器一緻,直接跳過
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                // 擷取失效轉移的分片,對應zk目錄jobName/sharding/分片号/failover,失效轉移分片對應的執行個體id
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    // 如果有jobInstanceId的失效轉移分片
                    for (int each : failoverItems) {
                    	// 把分片存放到目錄leader/failover/items
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    // 擷取如果jobInstanceId沒有失效轉移分片對應的分片,然後存放到目錄leader/failover/items/分片号,執行分片分片失效轉移
                    // 從這裡看隻要是伺服器當機就一定要執行時效轉移邏輯了,其實也不是,
                    // shardingService.getShardingItems(jobInstanceId)會判斷伺服器是否還可用,不可用的話傳回的分片集合就是空的
                    // 但是,針對dump對記憶體導緻的伺服器短暫的不可用,則有可能出現錯誤,我們的任務異常啟動就出現這裡
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                         // 把分片存放到目錄leader/failover/items
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
    
    // ...
}
           
public final class FailoverService {
    
    /**
     * 如果需要失效轉移, 則執行作業失效轉移.
     */
    public void failoverIfNecessary() {
        if (needFailover()) {
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }
    
    // 判斷leader/failover/items下是否有節點
    // failoverService.setCrashedFailoverFlag(分片号);方法就是往leader/failover/items目錄下存節點,也就是執行了setCrashedFailoverFlag方法後,needFailover()是true
    private boolean needFailover() {
        return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
                && !JobRegistry.getInstance().isJobRunning(jobName);
    }
    
    /**
     * 擷取作業伺服器的失效轉移分片項集合.
     * 
     * @param jobInstanceId 作業運作執行個體主鍵
     * @return 作業失效轉移的分片項集合
     */
    public List<Integer> getFailoverItems(final String jobInstanceId) {
        // 作業分片
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            // 擷取目錄sharding/分片号/failover下的節點
            String node = FailoverNode.getExecutionFailoverNode(item);
            // 确認jobName/sharding/分片号/failover下的執行個體是否和失效的jobInstanceId一緻,如果是的話就加入到失效分片集合
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
                result.add(item);
            }
        }
        Collections.sort(result);
        return result;
    }
    
    class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            // 判斷本機是否停止排程任務了以及是否需要失效轉移
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            // leader/failover/items下擷取失效轉移的分片
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            // 目錄下sharding/分片号/failover下建立節點,辨別失效轉移正在執行中
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            // 删除分片失效轉移記錄
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO 不應使用triggerJob, 而是使用executor統一排程
            // 執行失效轉移作業
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                jobScheduleController.triggerJob();
            }
        }
    }
}

           
public final class ShardingService {
    
    /**
     * 設定需要重新分片的标記.
     */
    public void setReshardingFlag() {
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
    }
   
    
    /**
     * 擷取作業運作執行個體的分片項集合.
     *
     * @param jobInstanceId 作業運作執行個體主鍵
     * @return 作業運作執行個體的分片項集合
     */
    public List<Integer> getShardingItems(final String jobInstanceId) {
        JobInstance jobInstance = new JobInstance(jobInstanceId);
        // 伺服器可用,即servers/目錄及jobName/instances目錄下存在對應的ip
        if (!serverService.isAvailableServer(jobInstance.getIp())) {
            return Collections.emptyList();
        }
        List<Integer> result = new LinkedList<>();
        // 擷取所有分片
        int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
        for (int i = 0; i < shardingTotalCount; i++) {
            // 找到當機伺服器對應的分片
            if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
                result.add(i);
            }
        }
        return result;
    }
    
}
           

結論

dump堆記憶體導緻伺服器B短暫不可用,與注冊中心斷開連接配接,觸發了注冊中心zk節點删除事件,伺服器A監聽到事件後執行失效轉移邏輯,當伺服器A去擷取伺服器B對應的分片時,伺服器B又恢複了工作,這時伺服器A拿到了服務B的兩個分片2、3,依次執行失效轉移邏輯,這就是為什麼dump B之後A開始執行B的兩個分片。

class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // 1失效轉移開啟、2注冊中心事件-節點移除,也就是一台伺服器下線、3是instance路徑,即jobName/instances路徑
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                ...
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    ... 
                } else {
                    // 擷取如果jobInstanceId沒有失效轉移分片對應的分片,然後存放到目錄leader/failover/items/分片号,執行分片分片失效轉移
                    // 從這裡看隻要是伺服器當機就一定要執行時效轉移邏輯了,其實也不是,
                    // shardingService.getShardingItems(jobInstanceId)會判斷伺服器是否還可用,不可用的話傳回的分片集合就是空的
                    // 但是,針對dump對記憶體導緻的伺服器短暫的不可用,則有可能出現錯誤,我們的任務異常啟動就出現這裡
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
           

附 zk注冊中心任務記錄

elasticjob任務失效轉移異常背景

關于ElasticJob源碼的知識點可以參考我的另一篇文章

https://blog.csdn.net/yangfangjit/article/details/105336278