天天看點

Elastic-Job-Lite 源碼分析 —— 主節點選舉1. 概述2. 為什麼需要選舉主節點3. 選舉主節點4. 删除主節點

1. 概述

本文主要分享 Elastic-Job-Lite 主節點選舉。

建議前置閱讀:

  • 《Elastic-Job-Lite 源碼分析 —— 注冊中心》
  • 《Elastic-Job-Lite 源碼分析 —— 作業資料存儲》
  • 《Elastic-Job-Lite 源碼分析 —— 注冊中心監聽器》

涉及到主要類的類圖如下( 打開大圖 ):

Elastic-Job-Lite 源碼分析 —— 主節點選舉1. 概述2. 為什麼需要選舉主節點3. 選舉主節點4. 删除主節點
  • 粉色的類在 

    com.dangdang.ddframe.job.lite.internal.election

     包下,實作了 Elastic-Job-Lite 主節點選舉。

你行好事會因為得到贊賞而愉悅

同理,開源項目貢獻者會因為 Star 而更加有動力

為 Elastic-Job 點贊!傳送門

2. 為什麼需要選舉主節點

首先我們來看一段官方對 Elastic-Job-Lite 的介紹:

Elastic-Job-Lite 定位為輕量級無中心化解決方案,使用 jar 包的形式提供分布式任務的協調服務。

無中心化,意味着 Elastic-Job-Lite 不存在一個中心執行一些操作,例如:配置設定作業分片項。Elastic-Job-Lite 選舉主節點,通過主節點進行作業分片項配置設定。目前,必須在主節點執行的操作有:配置設定作業分片項,調解分布式作業不一緻狀态。

另外,主節點的選舉是以作業為次元。例如:有一個 Elastic-Job-Lite 叢集有三個作業節點 

A

B

C

,存在兩個作業 

a

b

,可能 

a

 作業的主節點是 

C

b

 作業的主節點是 

A

3. 選舉主節點

調用 

LeaderService#electLeader()

 選舉主節點。

大體流程如下( 打開大圖 ):

Elastic-Job-Lite 源碼分析 —— 主節點選舉1. 概述2. 為什麼需要選舉主節點3. 選舉主節點4. 删除主節點

實作代碼如下:

// LeaderService.java
/**
* 選舉主節點.
*/
public void electLeader() {
   log.debug("Elect a new leader now.");
   jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
   log.debug("Leader election completed.");
}

// JobNodeStorage.java
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
   try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
       latch.start();
       latch.await();
       callback.execute();
   } catch (final Exception ex) {
       handleException(ex);
   }
}

// LeaderElectionExecutionCallback.java
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
   
   @Override
   public void execute() {
       if (!hasLeader()) { // 目前無主節點
           jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
       }
   }
}
           
  • 使用 Curator LeaderLatch 分布式鎖,保證同一時間有且僅有一個工作節點能夠調用 

    LeaderElectionExecutionCallback#execute()

     方法執行主節點設定。Curator LeaderLatch 在《Elastic-Job-Lite 源碼分析 —— 注冊中心》「3.1 在主節點執行操作」有詳細解析。
  • 在 

    LeaderElectionExecutionCallback#execute()

     為什麼要調用 

    #hasLeader()

     呢?LeaderLatch 隻保證同一時間有且僅有一個工作節點,在獲得分布式鎖的工作節點結束邏輯後,第二個工作節點會開始邏輯,如果不判斷目前是否有主節點,原來的主節點會被覆寫。
// LeaderService.java
/**
 * 判斷是否已經有主節點.
 * 
 * @return 是否已經有主節點
 */
public boolean hasLeader() {
    return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
}
           
  • 選舉成功後,Zookeeper 存儲作業的主節點:

    /${JOB_NAME}/leader/electron/instance

     為目前節點。該節點為臨時節點。 
[zk: localhost:2181(CONNECTED) 7] get /elastic-job-example-lite-java/javaSimpleJob/leader/election/instance
[email protected]@82496
           

選舉主節點時機

第一種,注冊作業啟動資訊時。

/**
* 注冊作業啟動資訊.
* 
* @param enabled 作業是否啟用
*/
public void registerStartUpInfo(final boolean enabled) {
   // .... 省略部分方法
   // 選舉 主節點
   leaderService.electLeader();
   // .... 省略部分方法
}
           
  • 新的作業啟動時,即能保證選舉出主節點。
    • 當該作業不存在主節點時,目前作業節點成為主節點。
    • 當該作業存在主節點,目前作業節主節點不變。

第二種,節點資料發生變化時。

class LeaderElectionJobListener extends AbstractJobListener {
   
   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
           leaderService.electLeader();
       }
   }
}
           
  • 符合重新選舉主節點分成兩種情況。
  • 主動選舉 

    #isActiveElection(...)

private boolean isActiveElection(final String path, final String data) {
    return !leaderService.hasLeader() // 不存在主節點
          && isLocalServerEnabled(path, data); // 開啟作業
}

private boolean isLocalServerEnabled(final String path, final String data) {
    return serverNode.isLocalServerPath(path) 
       && !ServerStatus.DISABLED.name().equals(data);
}
           
    • 當作業被禁用( 

      LiteJobConfiguration.disabled = true

       )時,作業是不存在主節點的。那有同學就有疑問了?

      LeaderService#electLeader()

       沒做這個限制呀,作業注冊作業啟動資訊時也進行了選舉。在「4. 删除主節點」小結,我們會解開這個答案。這裡大家先記住這個結論。
    • 根據上面我們說的結論,這裡就很好了解了,

      #isActiveElection()

       方法判斷了兩個條件:( 1 ) 不存在主節點;( 2 ) 開啟作業,不再禁用,是以需要進行主節點選舉落。
    • 這裡判斷開啟作業的方法 

      #isLocalServerEnabled(...)

       有點特殊,它不是通過作業節點是否處于開啟狀态,而是該資料不是将作業節點更新成關閉狀态。舉個例子:作業節點處于禁用狀态,使用運維平台設定作業節點開啟,會進行主節點選舉;作業節點處于開啟狀态,使用運維平台設定作業節點禁用,不會進行主節點選舉。
  • 被動選舉 

    #isPassiveElection(...)

private boolean isPassiveElection(final String path, final Type eventType) {
  return isLeaderCrashed(path, eventType) // 主節點 Crashed
          && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()); // 目前節點正在運作中(未挂掉)
}
   
private boolean isLeaderCrashed(final String path, final Type eventType) {
  return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
}
           
    • 當主節點因為各種情況( 「4. 删除主節點」會列舉 )被删除,需要重新進行選舉。對的,必須主節點被删除後才可以重新進行選舉。
    • #isPassiveElection(...)

       方法判斷了兩個條件:( 1 ) 原主節點被删除;( 2 ) 目前節點正在運作中(未挂掉),可以參加主節點選舉。
    • #isLeaderCrashed(...)

       方法雖然命名帶有 

      Crashed

       英文,實際主作業節點正常退出也符合被動選舉條件。

等待主節點選舉完成

必須在主節點執行的操作,執行之前,需要判斷目前節點是否為主節點。如果主節點已經選舉好,可以直接進行判斷。但是,不排除主節點還沒選舉到,因而需要阻塞等待到主節點選舉完成後才能進行判斷。

實作代碼如下:

// LeaderService.java
    
/**
* 判斷目前節點是否是主節點.
* 
* 如果主節點正在選舉中而導緻取不到主節點, 則阻塞至主節點選舉完成再傳回.
* 
* @return 目前節點是否是主節點
*/
public boolean isLeaderUntilBlock() {
   // 不存在主節點 && 有可用的伺服器節點
   while (!hasLeader() && serverService.hasAvailableServers()) {
       log.info("Leader is electing, waiting for {} ms", 100);
       BlockUtils.waitingShortTime();
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { // 目前伺服器節點可用
           electLeader();
       }
   }
   // 傳回目前節點是否是主節點
   return isLeader();
}
           
  • 調用 

    BlockUtils#waitingShortTime()

     方法,選舉不到主節點進行等待,避免不間斷、無間隔的進行主節點選舉。

4. 删除主節點

有主節點的選舉,必然有主節點的删除,否則怎麼進行重新選舉。

實作代碼如下:

// LeaderService.java
/**
* 删除主節點供重新選舉.
*/
public void removeLeader() {
   jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
}
           

删除主節點時機

第一種,主節點程序正常關閉時。

public final class JobShutdownHookPlugin extends ShutdownHookPlugin {
    
    @Override
    public void shutdown() {
        CoordinatorRegistryCenter regCenter = JobRegistry.getInstance().getRegCenter(jobName);
        if (null == regCenter) {
            return;
        }
        LeaderService leaderService = new LeaderService(regCenter, jobName);
        if (leaderService.isLeader()) {
            leaderService.removeLeader(); // 移除主節點
        }
        new InstanceService(regCenter, jobName).removeInstance();
    }
}
           
  • 這個比較好了解,退出程序,若該程序為主節點,需要将自己移除。

第二種,主節點程序 CRASHED 時。

${JOB_NAME}/leader/electron/instance

 是臨時節點,主節點程序 CRASHED 後,超過最大會話時間,Zookeeper 自動進行删除,觸發重新選舉邏輯。

第三種,作業被禁用時。

class LeaderAbdicationJobListener extends AbstractJobListener {
   
   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
           leaderService.removeLeader();
       }
   }
   
   private boolean isLocalServerDisabled(final String path, final String data) {
       return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
   }
}
           
  • 這裡就解答上面我們遺留的疑問。被禁用的作業注冊作業啟動資訊時即使進行了主節點選舉,也會被該監聽器處理,移除該選舉的主節點。

第四種,主節點程序遠端關閉。

// InstanceShutdownStatusJobListener.java
class InstanceShutdownStatusJobListener extends AbstractJobListener {
        
   @Override
   protected void dataChanged(final String path, final Type eventType, final String data) {
       if (!JobRegistry.getInstance().isShutdown(jobName)
               && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused() // 作業未暫停排程
               && isRemoveInstance(path, eventType) // 移除【運作執行個體】事件
               && !isReconnectedRegistryCenter()) { // 運作執行個體被移除
           schedulerFacade.shutdownInstance();
       }
   }
   
   private boolean isRemoveInstance(final String path, final Type eventType) {
       return instanceNode.isLocalInstancePath(path) && Type.NODE_REMOVED == eventType;
   }
   
   private boolean isReconnectedRegistryCenter() {
       return instanceService.isLocalJobInstanceExisted();
   }
}

// SchedulerFacade.java
/**
* 終止作業排程.
*/
public void shutdownInstance() {
   if (leaderService.isLeader()) {
       leaderService.removeLeader(); // 移除主節點
   }
   monitorService.close();
   if (reconcileService.isRunning()) {
       reconcileService.stopAsync();
   }
   JobRegistry.getInstance().shutdown(jobName);
}
           

遠端關閉作業節點有兩種方式:

  • zkClient 發起指令:

    rmr /${NAMESPACE}/${JOB_NAME}/instances/${JOB_INSTANCE_ID}

  • 運維平台發起 

    Shutdown

     操作。

    Shutdown

     操作實質上就是第一種。
    Elastic-Job-Lite 源碼分析 —— 主節點選舉1. 概述2. 為什麼需要選舉主節點3. 選舉主節點4. 删除主節點

繼續閱讀