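Summary first:
- You can disable a job, a server, or a job instance on a specific server.
- The job is still triggered on schedule, but when the scheduled run finds that it is disabled, the job does not execute.
Entry-point code for disable: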
private void disableOrEnableJobs(final Optional<String> jobName, final Optional<String> serverIp, final boolean disabled) {
    Preconditions.checkArgument(jobName.isPresent() || serverIp.isPresent(), "At least indicate jobName or serverIp.");
    if (jobName.isPresent() && serverIp.isPresent()) {
        persistDisabledOrEnabledJob(jobName.get(), serverIp.get(), disabled);
    } else if (jobName.isPresent()) {
        JobNodePath jobNodePath = new JobNodePath(jobName.get());
        for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath())) {
            if (disabled) {
                regCenter.persist(jobNodePath.getServerNodePath(each), "DISABLED");
            } else {
                regCenter.persist(jobNodePath.getServerNodePath(each), "");
            }
        }
    } else if (serverIp.isPresent()) {
        List<String> jobNames = regCenter.getChildrenKeys("/");
        for (String each : jobNames) {
            if (regCenter.isExisted(new JobNodePath(each).getServerNodePath(serverIp.get()))) {
                persistDisabledOrEnabledJob(each, serverIp.get(), disabled);
            }
        }
    }
}
The node path written on disable is /test-disablejobName/servers/192.168.157.1, that is, disabling is applied per server.
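To make the per-server node layout concrete, here is a minimal in-memory sketch of the jobName-only branch. It is not the Elastic-Job API: the class DisableSketch, its nodes map, and the helper names are hypothetical stand-ins for the registry center, and the only things taken from the code above are the /{jobName}/servers/{ip} layout and the "DISABLED" / "" values.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the registry center (assumption, not Elastic-Job code).
final class DisableSketch {
    // Node path -> node value, e.g. "/myJob/servers/192.168.157.1" -> "DISABLED".
    static final Map<String, String> nodes = new TreeMap<>();

    // Builds the per-server node path under a job.
    static String serverNodePath(String jobName, String serverIp) {
        return "/" + jobName + "/servers/" + serverIp;
    }

    // Mirrors the jobName-only branch: rewrite the value of every server node of that job.
    static void setDisabled(String jobName, boolean disabled) {
        String prefix = "/" + jobName + "/servers/";
        // Iterate over a copy of the keys so the map can be updated safely.
        for (String path : new ArrayList<>(nodes.keySet())) {
            if (path.startsWith(prefix)) {
                nodes.put(path, disabled ? "DISABLED" : "");
            }
        }
    }
}
```

Calling DisableSketch.setDisabled("myJob", true) marks every server node under myJob and leaves the server nodes of other jobs untouched, which is why the effect is scoped to each (job, server) pair.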
The function that runs a job when it is scheduled is the following code in class AbstractElasticJobExecutor:
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
        }
        return;
    }
    // ... (rest of the method omitted)
When the job is disabled, getShardingItemParameters in the code above is empty, so execute returns early. The shardingContexts object comes from the following code:
@Override
public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    shardingService.shardingIfNecessary();
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}
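The tail of getShardingContexts can be read as a simple list-filtering step followed by the emptiness check in execute. The sketch below reproduces only that logic with plain collections; ShardingFilterSketch and its method names are hypothetical and are not part of Elastic-Job.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper (assumption): models only the removeAll filtering and the empty check.
final class ShardingFilterSketch {
    // Removes disabled items from the local items in place, as getShardingContexts does.
    static List<Integer> runnableItems(List<Integer> localItems, List<Integer> disabledItems) {
        localItems.removeAll(disabledItems);
        return localItems;
    }

    // Mirrors the early return in execute: with no sharding items there is nothing to run.
    static boolean shouldExecute(List<Integer> items) {
        return !items.isEmpty();
    }

    // What a disabled server contributes: an empty, immutable list.
    static List<Integer> itemsOfDisabledServer() {
        return Collections.emptyList();
    }
}
```

Note that removeAll on the immutable list returned by Collections.emptyList() does not throw, because there are no elements to remove; the filtering lines therefore pass an empty list straight through, and execute then skips the job.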
The key call is getLocalShardingItems, which fetches the sharding items assigned to the local instance. The relevant code is:
/**
 * Get the sharding items that run on this job instance.
 *
 * @return the sharding items that run on this job instance
 */
public List<Integer> getLocalShardingItems() {
    if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
        return Collections.emptyList();
    }
    return getShardingItems(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
/**
 * Check whether the job server is available.
 *
 * @param ip IP address of the job server
 * @return whether the job server is available
 */
public boolean isAvailableServer(final String ip) {
    return isEnableServer(ip) && hasOnlineInstances(ip);
}
private boolean hasOnlineInstances(final String ip) {
    for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) {
        if (each.startsWith(ip)) {
            return true;
        }
    }
    return false;
}
/**
 * Check whether the server is enabled.
 *
 * @param ip IP address of the job server
 * @return whether the server is enabled
 */
public boolean isEnableServer(final String ip) {
    return !ServerStatus.DISABLED.name().equals(jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip)));
}
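Note the hasOnlineInstances and isEnableServer functions above: both read their paths from ZooKeeper. Once the server node's value has been set to DISABLED, isEnableServer returns false, so isAvailableServer returns false and getLocalShardingItems returns an empty list.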
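As a rough illustration of how the two checks combine, the following sketch replaces the ZooKeeper reads with in-memory collections. AvailabilitySketch, serverNodes, and instanceIds are hypothetical names, and the instance id format used in the example ("ip@-@pid") is an assumption; the code above only requires that an instance id starts with the IP.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model (assumption): the real implementation reads these values from ZooKeeper.
final class AvailabilitySketch {
    // Server IP -> value of the server node ("" when enabled, "DISABLED" when disabled).
    final Map<String, String> serverNodes = new HashMap<>();
    // Ids of the online instances; assumed here to start with the server IP.
    final Set<String> instanceIds = new HashSet<>();

    // Enabled unless the server node holds exactly "DISABLED".
    boolean isEnableServer(String ip) {
        return !"DISABLED".equals(serverNodes.get(ip));
    }

    // Online if at least one instance id starts with the IP.
    boolean hasOnlineInstances(String ip) {
        for (String each : instanceIds) {
            if (each.startsWith(ip)) {
                return true;
            }
        }
        return false;
    }

    // Available only when both checks pass.
    boolean isAvailableServer(String ip) {
        return isEnableServer(ip) && hasOnlineInstances(ip);
    }
}
```

With a server node value of "" and one online instance, isAvailableServer returns true; after the value is changed to "DISABLED" it returns false even though the instance is still online, which matches the behavior summarized at the top: scheduling continues, but the job gets no sharding items to run.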