天天看點

elasticjob disable JOB

先上總結:

  1. 可以disable job或者server,或者某個server上的job執行個體
  2. 還是會定時排程,但排程過程發現為disable後不運作job

disable 入口代碼:

private void disableOrEnableJobs(final Optional<String> jobName, final Optional<String> serverIp, final boolean disabled) {
        Preconditions.checkArgument(jobName.isPresent() || serverIp.isPresent(), "At least indicate jobName or serverIp.");
        if (jobName.isPresent() && serverIp.isPresent()) {
            persistDisabledOrEnabledJob(jobName.get(), serverIp.get(), disabled);
        } else if (jobName.isPresent()) {
            JobNodePath jobNodePath = new JobNodePath(jobName.get());
            for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath())) {
                if (disabled) {
                    regCenter.persist(jobNodePath.getServerNodePath(each), "DISABLED");
                } else {
                    regCenter.persist(jobNodePath.getServerNodePath(each), "");
                }
            }
        } else if (serverIp.isPresent()) {
            List<String> jobNames = regCenter.getChildrenKeys("/");
            for (String each : jobNames) {
                if (regCenter.isExisted(new JobNodePath(each).getServerNodePath(serverIp.get()))) {
                    persistDisabledOrEnabledJob(each, serverIp.get(), disabled);
                }
            }
        }
    }
           

disable的nodepath 為 /test-disablejobName/servers/192.168.157.1,即按server 進行disable

JOB排程執行函數為類AbstractElasticJobExecutor的如下代碼:

private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
           

上面代碼的getShardingItemParameters 會傳回empty,shardingContexts的擷取來自以下代碼:

@Override
    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }
           

而關鍵的擷取getLocalShardingItems,本地sharding item的相關代碼如下:

/**
     * 擷取運作在本作業執行個體的分片項集合.
     * 
     * @return 運作在本作業執行個體的分片項集合
     */
    public List<Integer> getLocalShardingItems() {
        if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
            return Collections.emptyList();
        }
        return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    }


/**
     * 判斷作業伺服器是否可用.
     * 
     * @param ip 作業伺服器IP位址
     * @return 作業伺服器是否可用
     */
    public boolean isAvailableServer(final String ip) {
        return isEnableServer(ip) && hasOnlineInstances(ip);
    }
    
    private boolean hasOnlineInstances(final String ip) {
        for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
            if (each.startsWith(ip)) {
                return true;
            }
        }
        return false;
    }
    
    /**
     * 判斷伺服器是否啟用.
     *
     * @param ip 作業伺服器IP位址
     * @return 伺服器是否啟用
     */
    public boolean isEnableServer(final String ip) {
        return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip)));
    }
           

注意以上的hasOnlineInstances函數及isEnableServer,去zookeeper擷取的相關path裡面的狀态被disabled。

繼續閱讀