1. 概述
本文主要分享 Elastic-Job-Lite 主節點選舉。
建議前置閱讀:
- 《Elastic-Job-Lite 源碼分析 —— 注冊中心》
- 《Elastic-Job-Lite 源碼分析 —— 作業資料存儲》
- 《Elastic-Job-Lite 源碼分析 —— 注冊中心監聽器》
涉及到主要類的類圖如下( 打開大圖 ):
- 粉色的類在
包下,實作了 Elastic-Job-Lite 主節點選舉。com.dangdang.ddframe.job.lite.internal.election
你行好事會因為得到贊賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 Elastic-Job 點贊!傳送門
2. 為什麼需要選舉主節點
首先我們來看一段官方對 Elastic-Job-Lite 的介紹:
Elastic-Job-Lite 定位為輕量級無中心化解決方案,使用 jar 包的形式提供分布式任務的協調服務。
無中心化,意味着 Elastic-Job-Lite 不存在一個中心執行一些操作,例如:配置設定作業分片項。Elastic-Job-Lite 選舉主節點,通過主節點進行作業分片項配置設定。目前,必須在主節點執行的操作有:配置設定作業分片項,調解分布式作業不一緻狀态。
另外,主節點的選舉是以作業為次元。例如:有一個 Elastic-Job-Lite 叢集有三個作業節點
A
、
B
、
C
,存在兩個作業
a
、
b
,可能
a
作業的主節點是
C
,
b
作業的主節點是
A
。
3. 選舉主節點
調用
LeaderService#electLeader()
選舉主節點。
大體流程如下( 打開大圖 ):
實作代碼如下:
// LeaderService.java
/**
* 選舉主節點.
*/
public void electLeader() {
log.debug("Elect a new leader now.");
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
// JobNodeStorage.java
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}
// LeaderElectionExecutionCallback.java
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!hasLeader()) { // 目前無主節點
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
- 使用 Curator LeaderLatch 分布式鎖,保證同一時間有且僅有一個工作節點能夠調用
方法執行主節點設定。Curator LeaderLatch 在《Elastic-Job-Lite 源碼分析 —— 注冊中心》「3.1 在主節點執行操作」有詳細解析。LeaderElectionExecutionCallback#execute()
- 在
為什麼要調用LeaderElectionExecutionCallback#execute()
呢?LeaderLatch 隻保證同一時間有且僅有一個工作節點,在獲得分布式鎖的工作節點結束邏輯後,第二個工作節點會開始邏輯,如果不判斷目前是否有主節點,原來的主節點會被覆寫。#hasLeader()
// LeaderService.java
/**
* 判斷是否已經有主節點.
*
* @return 是否已經有主節點
*/
public boolean hasLeader() {
return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
}
- 選舉成功後,Zookeeper 存儲作業的主節點:
為目前節點。該節點為臨時節點。/${JOB_NAME}/leader/electron/instance
[zk: localhost:2181(CONNECTED) 7] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
[email protected]@82496
選舉主節點時機
第一種,注冊作業啟動資訊時。
/**
* 注冊作業啟動資訊.
*
* @param enabled 作業是否啟用
*/
public void registerStartUpInfo(final boolean enabled) {
// .... 省略部分方法
// 選舉 主節點
leaderService.electLeader();
// .... 省略部分方法
}
- 新的作業啟動時,即能保證選舉出主節點。
- 當該作業不存在主節點時,目前作業節點成為主節點。
- 當該作業存在主節點,目前作業節主節點不變。
第二種,節點資料發生變化時。
class LeaderElectionJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
leaderService.electLeader();
}
}
}
- 符合重新選舉主節點分成兩種情況。
- 主動選舉
#isActiveElection(...)
private boolean isActiveElection(final String path, final String data) {
return !leaderService.hasLeader() // 不存在主節點
&& isLocalServerEnabled(path, data); // 開啟作業
}
private boolean isLocalServerEnabled(final String path, final String data) {
return serverNode.isLocalServerPath(path)
&& !ServerStatus.DISABLED.name().equals(data);
}
-
- 當作業被禁用(
)時,作業是不存在主節點的。那有同學就有疑問了?LiteJobConfiguration.disabled = true
沒做這個限制呀,作業注冊作業啟動資訊時也進行了選舉。在「4. 删除主節點」小結,我們會解開這個答案。這裡大家先記住這個結論。LeaderService#electLeader()
- 根據上面我們說的結論,這裡就很好了解了,
方法判斷了兩個條件:( 1 ) 不存在主節點;( 2 ) 開啟作業,不再禁用,是以需要進行主節點選舉落。#isActiveElection()
- 這裡判斷開啟作業的方法
有點特殊,它不是通過作業節點是否處于開啟狀态,而是該資料不是将作業節點更新成關閉狀态。舉個例子:作業節點處于禁用狀态,使用運維平台設定作業節點開啟,會進行主節點選舉;作業節點處于開啟狀态,使用運維平台設定作業節點禁用,不會進行主節點選舉。#isLocalServerEnabled(...)
- 當作業被禁用(
- 被動選舉
#isPassiveElection(...)
private boolean isPassiveElection(final String path, final Type eventType) {
return isLeaderCrashed(path, eventType) // 主節點 Crashed
&& serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()); // 目前節點正在運作中(未挂掉)
}
private boolean isLeaderCrashed(final String path, final Type eventType) {
return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
}
-
- 當主節點因為各種情況( 「4. 删除主節點」會列舉 )被删除,需要重新進行選舉。對的,必須主節點被删除後才可以重新進行選舉。
-
方法判斷了兩個條件:( 1 ) 原主節點被删除;( 2 ) 目前節點正在運作中(未挂掉),可以參加主節點選舉。#isPassiveElection(...)
-
方法雖然命名帶有#isLeaderCrashed(...)
英文,實際主作業節點正常退出也符合被動選舉條件。Crashed
等待主節點選舉完成
必須在主節點執行的操作,執行之前,需要判斷目前節點是否為主節點。如果主節點已經選舉好,可以直接進行判斷。但是,不排除主節點還沒選舉到,因而需要阻塞等待到主節點選舉完成後才能進行判斷。
實作代碼如下:
// LeaderService.java
/**
* 判斷目前節點是否是主節點.
*
* 如果主節點正在選舉中而導緻取不到主節點, 則阻塞至主節點選舉完成再傳回.
*
* @return 目前節點是否是主節點
*/
public boolean isLeaderUntilBlock() {
// 不存在主節點 && 有可用的伺服器節點
while (!hasLeader() && serverService.hasAvailableServers()) {
log.info("Leader is electing, waiting for {} ms", 100);
BlockUtils.waitingShortTime();
if (!JobRegistry.getInstance().isShutdown(jobName)
&& serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { // 目前伺服器節點可用
electLeader();
}
}
// 傳回目前節點是否是主節點
return isLeader();
}
- 調用
方法,選舉不到主節點進行等待,避免不間斷、無間隔的進行主節點選舉。BlockUtils#waitingShortTime()
4. 删除主節點
有主節點的選舉,必然有主節點的删除,否則怎麼進行重新選舉。
實作代碼如下:
// LeaderService.java
/**
* 删除主節點供重新選舉.
*/
public void removeLeader() {
jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
}
删除主節點時機
第一種,主節點程序正常關閉時。
public final class JobShutdownHookPlugin extends ShutdownHookPlugin {
@Override
public void shutdown() {
CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
if (null == regCenter) {
return;
}
LeaderService leaderService = new LeaderService(regCenter, jobName);
if (leaderService.isLeader()) {
leaderService.removeLeader(); // 移除主節點
}
new InstanceService(regCenter, jobName).removeInstance();
}
}
- 這個比較好了解,退出程序,若該程序為主節點,需要将自己移除。
第二種,主節點程序 CRASHED 時。
${JOB_NAME}/leader/electron/instance
是臨時節點,主節點程序 CRASHED 後,超過最大會話時間,Zookeeper 自動進行删除,觸發重新選舉邏輯。
第三種,作業被禁用時。
class LeaderAbdicationJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}
private boolean isLocalServerDisabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
}
}
- 這裡就解答上面我們遺留的疑問。被禁用的作業注冊作業啟動資訊時即使進行了主節點選舉,也會被該監聽器處理,移除該選舉的主節點。
第四種,主節點程序遠端關閉。
// InstanceShutdownStatusJobListener.java
class InstanceShutdownStatusJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName)
&& !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused() // 作業未暫停排程
&& isRemoveInstance(path, eventType) // 移除【運作執行個體】事件
&& !isReconnectedRegistryCenter()) { // 運作執行個體被移除
schedulerFacade.shutdownInstance();
}
}
private boolean isRemoveInstance(final String path, final Type eventType) {
return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
}
private boolean isReconnectedRegistryCenter() {
return instanceService.isLocalJobInstanceExisted();
}
}
// SchedulerFacade.java
/**
* 終止作業排程.
*/
public void shutdownInstance() {
if (leaderService.isLeader()) {
leaderService.removeLeader(); // 移除主節點
}
monitorService.close();
if (reconcileService.isRunning()) {
reconcileService.stopAsync();
}
JobRegistry.getInstance().shutdown(jobName);
}
遠端關閉作業節點有兩種方式:
- zkClient 發起指令:
。rmr /${NAMESPACE}/${JOB_NAME}/instances/${JOB_INSTANCE_ID}
- 運維平台發起
操作。Shutdown
操作實質上就是第一種。Shutdown