背景
公司選用elasticjob作為分布式任務排程工具,版本2.1.5,其中有一個任務對應機器兩台,A和B,任務總分片數是4,A對應0、1,B對應分片2、3。任務每晚23:30:00執行,T日計算資料記錄日期T+1,供T+1日使用。
突然有一天,A機器在22:24運作執行起分片2任務,執行完分片2之後又執行了分片3,注意分片2、3本該是機器B所用的分片,這兩點就很奇怪了,異常的時間執行了不屬于自己的分片,而且是一個一個執行。下方是日志中記錄的task id
// 異常的task id,很清楚的看到分片是@[email protected],機器A在執行
"taskId":"[email protected]@[email protected]@[email protected]@A機器[email protected]@4927"
// 正常的task id,分片号@2,[email protected],ip對應的是機器B
"taskId":"[email protected]@2,[email protected]@[email protected]@B機器[email protected]@12384"
經過分析,基本斷定是進任務失效轉移邏輯了。但是,為什麼任務失效轉移呢?任務不在執行的時間點,而且也沒有執行中,不可能出現這個情況。
經過回憶,22:23的時候,開發對機器B做了一次記憶體dump,與A機器啟動相差一分鐘,可能問題出在這裡了。難道對B機器做dump操作導緻B短暫與ZK注冊中心斷開了嗎,導緻誤以為伺服器當機?帶着問題,我們又對B做了一次dump,很快,證明了我們的猜測,如下圖所示,分片2正在運作,實際執行分片2的是機器A,而且辨別也很清楚,失效轉移
但是,按照官方的說法,失效轉移是指運作中的作業伺服器崩潰不會導緻重新分片,隻會在下次作業啟動時分片。啟用失效轉移功能可以在本次作業執行過程中,監測其他作業伺服器空閑,抓取未完成的孤兒分片項執行。
運作中的作業伺服器崩潰不會導緻重新分片,經過确認,當時我們的任務并不在執行中,這個就很奇怪了,帶着這些疑問我們深入elasticjob源碼。
失效轉移
失效轉移相關邏輯入口在
FailoverListenerManager#JobCrashedJobListener
,實際處理是
FailoverService
。當一台伺服器當機後會觸發一個事件
Type.NODE_REMOVED
,elastic-job根據這個事件來進行相關的處理,相關過程都注釋在代碼裡了。
為了輔助了解,文末附錄了一個注冊中心目錄和節點結構圖。
public final class FailoverListenerManager extends AbstractListenerManager {
// ...
private final FailoverService failoverService;
private final ShardingService shardingService;
class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 1失效轉移開啟、2注冊中心事件-節點移除,也就是一台伺服器下線、3是instance路徑,即jobName/instances路徑
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
// path,jobName/instances/[email protected]@pid
// jobInstanceId是這個樣子的[email protected]@pid
String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
// 如果jobInstanceId和目前機器一緻,直接跳過
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
// 擷取失效轉移的分片,對應zk目錄jobName/sharding/分片号/failover,失效轉移分片對應的執行個體id
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
// 如果有jobInstanceId的失效轉移分片
for (int each : failoverItems) {
// 把分片存放到目錄leader/failover/items
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
} else {
// 擷取如果jobInstanceId沒有失效轉移分片對應的分片,然後存放到目錄leader/failover/items/分片号,執行分片分片失效轉移
// 從這裡看隻要是伺服器當機就一定要執行時效轉移邏輯了,其實也不是,
// shardingService.getShardingItems(jobInstanceId)會判斷伺服器是否還可用,不可用的話傳回的分片集合就是空的
// 但是,針對dump對記憶體導緻的伺服器短暫的不可用,則有可能出現錯誤,我們的任務異常啟動就出現這裡
for (int each : shardingService.getShardingItems(jobInstanceId)) {
// 把分片存放到目錄leader/failover/items
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
// ...
}
public final class FailoverService {
/**
* 如果需要失效轉移, 則執行作業失效轉移.
*/
public void failoverIfNecessary() {
if (needFailover()) {
jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}
}
// 判斷leader/failover/items下是否有節點
// failoverService.setCrashedFailoverFlag(分片号);方法就是往leader/failover/items目錄下存節點,也就是執行了setCrashedFailoverFlag方法後,needFailover()是true
private boolean needFailover() {
return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
&& !JobRegistry.getInstance().isJobRunning(jobName);
}
/**
* 擷取作業伺服器的失效轉移分片項集合.
*
* @param jobInstanceId 作業運作執行個體主鍵
* @return 作業失效轉移的分片項集合
*/
public List<Integer> getFailoverItems(final String jobInstanceId) {
// 作業分片
List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
List<Integer> result = new ArrayList<>(items.size());
for (String each : items) {
int item = Integer.parseInt(each);
// 擷取目錄sharding/分片号/failover下的節點
String node = FailoverNode.getExecutionFailoverNode(item);
// 确認jobName/sharding/分片号/failover下的執行個體是否和失效的jobInstanceId一緻,如果是的話就加入到失效分片集合
if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
result.add(item);
}
}
Collections.sort(result);
return result;
}
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
// 判斷本機是否停止排程任務了以及是否需要失效轉移
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
// leader/failover/items下擷取失效轉移的分片
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
// 目錄下sharding/分片号/failover下建立節點,辨別失效轉移正在執行中
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
// 删除分片失效轉移記錄
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不應使用triggerJob, 而是使用executor統一排程
// 執行失效轉移作業
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
}
}
}
public final class ShardingService {
/**
* 設定需要重新分片的标記.
*/
public void setReshardingFlag() {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
/**
* 擷取作業運作執行個體的分片項集合.
*
* @param jobInstanceId 作業運作執行個體主鍵
* @return 作業運作執行個體的分片項集合
*/
public List<Integer> getShardingItems(final String jobInstanceId) {
JobInstance jobInstance = new JobInstance(jobInstanceId);
// 伺服器可用,即servers/目錄及jobName/instances目錄下存在對應的ip
if (!serverService.isAvailableServer(jobInstance.getIp())) {
return Collections.emptyList();
}
List<Integer> result = new LinkedList<>();
// 擷取所有分片
int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
for (int i = 0; i < shardingTotalCount; i++) {
// 找到當機伺服器對應的分片
if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {
result.add(i);
}
}
return result;
}
}
結論
dump堆記憶體導緻伺服器B短暫不可用,與注冊中心斷開連接配接,觸發了注冊中心zk節點删除事件,伺服器A監聽到事件後執行失效轉移邏輯,當伺服器A去擷取伺服器B對應的分片時,伺服器B又恢複了工作,這時伺服器A拿到了服務B的兩個分片2、3,依次執行失效轉移邏輯,這就是為什麼dump B之後A開始執行B的兩個分片。
class JobCrashedJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
// 1失效轉移開啟、2注冊中心事件-節點移除,也就是一台伺服器下線、3是instance路徑,即jobName/instances路徑
if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
...
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
...
} else {
// 擷取如果jobInstanceId沒有失效轉移分片對應的分片,然後存放到目錄leader/failover/items/分片号,執行分片分片失效轉移
// 從這裡看隻要是伺服器當機就一定要執行時效轉移邏輯了,其實也不是,
// shardingService.getShardingItems(jobInstanceId)會判斷伺服器是否還可用,不可用的話傳回的分片集合就是空的
// 但是,針對dump對記憶體導緻的伺服器短暫的不可用,則有可能出現錯誤,我們的任務異常啟動就出現這裡
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}
}
附 zk注冊中心任務記錄
關于ElasticJob源碼的知識點可以參考我的另一篇文章
https://blog.csdn.net/yangfangjit/article/details/105336278