在上一篇部落格Elastic-Job原理--任務排程處理(四)我們已經了解到Elastic-Job依賴quartz定時任務執行分片任務的過程,這篇部落格我們簡單了解一下Elastic-Job中當某個伺服器節點與注冊中心斷開連接配接(無法進行任務執行)時其需要執行的任務轉移到其他節點的過程。
首先提供如下類圖,與節點任務失敗轉移相關主要類如下:
FailoverService,作業失效轉移服務。
FailoverNode,作業失效轉移資料存儲路徑。
FailoverListenerManager,作業失效轉移監聽管理器。
一、重新分片
當伺服器節點從注冊中心zk斷開連接配接時,Elastic-job需要做的一件事情是需要在下次任務執行前進行重新分片,當zk節點數目發生變更時,會引發ListenServersChangedJobListener監聽器調用,此監聽器會調用shardingService的重新分片标志設定方法,這樣再下次任務執行前會重新進行任務分片操作。
/**
* 當執行個體節點變更時會調用此監聽器
*
*/
class ListenServersChangedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
//如果節點數目發生變更則設定重新分片标志,下次任務執行前會進行重新分片
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
shardingService.setReshardingFlag();
}
}
private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
}
private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}
任務重新分片隻是解決了下次任務執行時,所有的分片任務都是分布到各個執行個體中,但是目前失效的任務是如何處理的。
二、任務失效轉移
所謂失效轉移,就是在執行任務的過程中遇見異常的情況,這個分片任務可以在其他節點再次執行。這個和上面的HA不同,對于HA,上面如果任務終止,那麼不會在其他任務執行個體上再次重新執行。Job的失效轉移監聽來源于FailoverListenerManager中JobCrashedJobListener的dataChanged方法。FailoverListenerManager監聽的是zk的instance節點删除事件。如果任務配置了failover等于true,其中某個instance與zk失去聯系或被删除,并且失效的節點又不是本身,就會觸發失效轉移邏輯。首先,在某個任務執行個體elastic-job會在leader節點下面建立failover節點以及items節點。items節點下會有失效任務執行個體的原本應該做的分片好。比如,失效的任務執行個體原來負責分片1和2。那麼items節點下就會有名字叫1的子節點,就代表分片1需要轉移到其他節點上去運作。如下圖:
當節點任務失效時會調用JobCrashedJobListener監聽器,此監聽器會根據執行個體id擷取所有的分片,然後調用FailoverService的setCrashedFailoverFlag方法,将每個分片id寫到/jobName/leader/failover/items下
/**
* 任務失效時會調用這個監聽器
*/
class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
//會将所有的分片初始化到注冊中心中
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
} else {
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
在FailoverService方法中調用setCrashedFailoverFlag方法将需要任務轉移的分片id進行執行個體化。
/**
* 設定失效的分片項标記.
*
* @param item 崩潰的作業項
*/
public void setCrashedFailoverFlag(final int item) {
if (!isFailoverAssigned(item)) {
jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
}
}
然後接下來調用FailoverService的failoverIfNessary方法,首先判斷是否需要失敗轉移,如果可以需要則隻需作業失敗轉移。
/**
* 如果需要失效轉移, 則執行作業失效轉移.
*/
public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
在needFailover方法會對是否需要失效轉移進行判斷
private boolean needFailover() {
// `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 有失效轉移的作業分片項
return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
// 目前作業不在運作中
&& !JobRegistry.getInstance().isJobRunning(jobName);
}
條件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效轉移的作業分片項。
條件二:目前作業不在運作中。此條件即是上文送出的作業節點空閑的定義。失效轉移: 運作中的作業伺服器崩潰不會導緻重新分片,隻會在下次作業啟動時分片。啟用失效轉移功能可以在本次作業執行過程中,監測其他作業伺服器【空閑】,抓取未完成的孤兒分片項執行
在FailoverLeaderExecutionCallback中回調邏輯如下:
(1)也會首先判斷是否需要失效轉移,
(2)從注冊中心獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項,
(3)在注冊中心節點`${JOB_NAME}/sharding/${ITEM_ID}/failover` 作業分片項 為 目前作業節點,
(4)然後移除任務轉移分片項,
(5)最後調用執行,送出任務
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
// 判斷需要失效轉移
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
// 獲得一個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
// 設定這個 `${JOB_NAME}/sharding/${ITEM_ID}/failover` 作業分片項 為 目前作業節點
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
// 移除這個 `${JOB_NAME}/leader/failover/items/${ITEM_ID}` 作業分片項
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不應使用triggerJob, 而是使用executor統一排程 疑問:為什麼要用executor統一,後面研究下
// 觸發作業執行
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
}
}
@Override
public ShardingContexts getShardingContexts() {
//是否失敗轉移
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
//擷取失敗分片
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
//執行失敗分片任務
return executionContextService.getJobShardingContext(failoverShardingItems);
}
}
//重新分片
shardingService.shardingIfNecessary();
List<Integer> shardingItems = shardingService.getLocalShardingItems();
//移除已經失敗轉移執行的任務
if (isFailover) {
shardingItems.removeAll(failoverService.getLocalTakeOffItems());
}
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
return executionContextService.getJobShardingContext(shardingItems);
}