
Spark Source Code Analysis: The Master Active/Standby Switchover Mechanism

The Master is the core of Spark standalone mode, so if the Master fails the cluster can no longer work normally. Spark therefore selects one of the standby nodes to become the new Master.

Spark supports the following recovery strategies; the strategy is selected with the spark.deploy.recoveryMode property, which is usually set in the spark-env.sh configuration file (an example follows the list):

# ZOOKEEPER: cluster metadata is persisted to ZooKeeper. When the Master fails, a new Master is elected through ZooKeeper's election mechanism; the new Master takes over the cluster by fetching the persisted information from ZooKeeper and restoring the cluster state from it.

# FILESYSTEM: cluster metadata is persisted to the file system. When the Master fails, restarting the Master on the same machine is enough; the restarted Master reads the persisted information and restores the cluster state from it.

# CUSTOM: a user-defined recovery mode. Implement the StandaloneRecoveryModeFactory abstract class and register the implementation in the configuration; when the Master fails, the cluster state is recovered in the user-defined way.

# NONE: cluster metadata is not persisted. When the Master fails, the newly started Master does not attempt to restore the previous cluster state.
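For reference, ZooKeeper-based HA is usually enabled by passing these properties to the master and worker daemons through SPARK_DAEMON_JAVA_OPTS in spark-env.sh; the ZooKeeper quorum address and directory below are placeholder values:

# spark-env.sh (example values; replace the ZooKeeper hosts and directory with your own)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"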

We know that Master extends ThreadSafeRpcEndpoint, which gives it RPC communication, and it also mixes in the LeaderElectable trait, which lets it be elected leader and have its leadership revoked.

trait LeaderElectable {

  def electedLeader(): Unit

  def revokedLeadership(): Unit

}
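In Master.scala these two callbacks are implemented by forwarding a message to the Master endpoint itself, so the election result is processed on the Master's own message-handling thread (a simplified excerpt):

// Simplified from Master.scala: the election agent invokes these callbacks,
// and the Master re-dispatches them to itself as RPC messages
override def electedLeader(): Unit = {
  self.send(ElectedLeader)
}

override def revokedLeadership(): Unit = {
  self.send(RevokedLeadership)
}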

1 HA-Related Properties

state = RecoveryState.STANDBY: the initial state is set to STANDBY

PersistenceEngine persistenceEngine: the persistence engine, used to persist the cluster metadata

LeaderElectionAgent leaderElectionAgent: the agent that drives leader election
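Inside the Master class these correspond roughly to the following fields (a condensed excerpt; the surrounding fields are omitted):

// Condensed from Master.scala: the HA-related state held by the Master
private var state = RecoveryState.STANDBY                // current recovery state, STANDBY at startup
private var persistenceEngine: PersistenceEngine = _     // persists application/driver/worker metadata
private var leaderElectionAgent: LeaderElectionAgent = _ // drives the leader-election callbacks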

2 Initializing the Persistence Engine and Election Agent According to the Recovery Mode

When the Master starts, it looks at the configured spark.deploy.recoveryMode strategy (NONE if not set) and obtains the corresponding persistence engine and leader-election agent:

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // If the recovery mode is ZOOKEEPER, persist the recovery state through ZooKeeper
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  // If the recovery mode is FILESYSTEM, persist the recovery state through the file system
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  // If the recovery mode is CUSTOM, instantiate the user-specified factory class
  // (by its fully qualified name) and let it create the persistence engine and election agent
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  // Otherwise (NONE): no persistence, and this node immediately becomes leader
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_

3 Recovery When the Master Fails


3.1 Handling the ElectedLeader Message

case ElectedLeader =>
  // Read the persisted data through the persistence engine chosen according to
  // spark.deploy.recoveryMode, obtaining a triple of the stored
  // (ApplicationInfo, DriverInfo, WorkerInfo) records
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  // If all of the persisted data is empty, the state is ALIVE; otherwise it is RECOVERING
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  // If we are in the RECOVERING state
  if (state == RecoveryState.RECOVERING) {
    // Begin recovering the stored data
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    // Schedule a task on the background thread that, after WORKER_TIMEOUT_MS,
    // tells the Master to finish the recovery
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }
// On CompleteRecovery, call completeRecovery()
case CompleteRecovery => completeRecovery()

3.2 Beginning Recovery

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  // For each stored application, re-register it and notify its driver that the Master has changed
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Set the application state to UNKNOWN
      app.state = ApplicationState.UNKNOWN
      // Send a MasterChanged message to the application's driver
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  // For each stored driver, add it back to the set of drivers maintained by the Master
  for (driver <- storedDrivers) {
    drivers += driver
  }
  // For each stored worker, re-register it with the Master
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // Registering the worker updates the Master's worker set and the related worker mappings
      registerWorker(worker)
      // Set the worker state to UNKNOWN
      worker.state = WorkerState.UNKNOWN
      // Send a MasterChanged message to the worker
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}

3.3 The Worker receives the MasterChanged message and replies to the Master with WorkerSchedulerStateResponse

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // Record the new master's URL and reference, mark the connection as established,
  // and cancel any pending re-registration attempts
  changeMaster(masterRef, masterWebUiUrl)
  // Build lightweight ExecutorDescription objects for the executors running on this worker
  val execs = executors.values.
    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
  // Send a WorkerSchedulerStateResponse message to the new master, which will use it
  // to rebuild its view of this worker
  masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
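For reference, changeMaster on the Worker side looks roughly like the condensed sketch below (paraphrased from the Spark source; details vary between versions):

// Condensed sketch of Worker.changeMaster: remember the new master, mark the
// connection as established, and stop any ongoing registration retries
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String): Unit = {
  activeMasterUrl = masterRef.address.toSparkURL
  activeMasterWebUiUrl = uiUrl
  master = Some(masterRef)
  connected = true
  // A new master has been found, so outstanding re-registration attempts can be cancelled
  cancelLastRegistrationRetry()
}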

3.4 The application receives the MasterChanged message and replies to the Master with MasterChangeAcknowledged

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  master = Some(masterRef)
  alreadyDisconnected = false
  masterRef.send(MasterChangeAcknowledged(appId.get))      

3.5 After receiving the WorkerSchedulerStateResponse and MasterChangeAcknowledged messages, the Master calls completeRecovery, which removes the workers and applications that never responded and are therefore still in the UNKNOWN state

case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
  // Look up the worker by workerId
  idToWorker.get(workerId) match {
    case Some(worker) =>
      logInfo("Worker has been re-registered: " + workerId)
      // Mark the worker as ALIVE
      worker.state = WorkerState.ALIVE
      // Keep only the executors whose application is still known to the Master
      val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
      // For each valid executor
      for (exec <- validExecutors) {
        // Look up the application the executor belongs to
        val app = idToApp.get(exec.appId).get
        // Re-attach the executor to the application (which worker it runs on, how many cores, etc.)
        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
        // Add the executor back to the worker
        worker.addExecutor(execInfo)
        execInfo.copyState(exec)
      }
      // Mark the reported drivers as RUNNING and attach them to this worker
      for (driverId <- driverIds) {
        drivers.find(_.id == driverId).foreach { driver =>
          driver.worker = Some(worker)
          driver.state = DriverState.RUNNING
          worker.drivers(driverId) = driver
        }
      }
    case None =>
      logWarning("Scheduler state from unknown worker: " + workerId)
  }
  // Complete the recovery if every worker and application has already responded
  if (canCompleteRecovery) { completeRecovery() }
case MasterChangeAcknowledged(appId) =>
  // Look up the application by appId
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      // Mark the application as WAITING so that it can be scheduled again
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
  // Complete the recovery if every worker and application has already responded
  if (canCompleteRecovery) { completeRecovery() }
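Both handlers end by checking canCompleteRecovery; that predicate simply verifies that no worker or application is still waiting in the UNKNOWN state (condensed from Master.scala):

// Recovery can be completed once every known worker and application has responded,
// i.e. none of them is still in the UNKNOWN state
private def canCompleteRecovery =
  workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
  apps.count(_.state == ApplicationState.UNKNOWN) == 0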

3.6 Completing the Recovery

private def completeRecovery() {
  // Return if the state is not RECOVERING
  if (state != RecoveryState.RECOVERING) { return }
  // Move to the COMPLETING_RECOVERY state
  state = RecoveryState.COMPLETING_RECOVERY

  // Remove the workers and applications that never responded and are still in the UNKNOWN state
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Re-schedule drivers that were not claimed by any worker, i.e. drivers no worker is running
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    // If the driver was submitted with supervision enabled, relaunch it; otherwise remove it
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
  // Move back to the ALIVE state
  state = RecoveryState.ALIVE
  // Re-run scheduling so that drivers and applications get resources assigned again
  schedule()
  logInfo("Recovery complete - resuming operations!")
}

4 Triggering the Election Flow

Here we take ZooKeeper as the example:

When the node starts, its onStart method is called. With the ZooKeeper recovery mode, this initializes a ZooKeeperPersistenceEngine and a ZooKeeperLeaderElectionAgent. ZooKeeperPersistenceEngine is responsible for persisting application, driver, and worker information, while ZooKeeperLeaderElectionAgent is responsible for watching the master's state and electing the leader.
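A condensed sketch of ZooKeeperPersistenceEngine (paraphrased from the Spark source; the serializeIntoFile/deserializeFromFile helpers and error handling are omitted or abbreviated here) shows how each piece of metadata becomes a znode under a working directory in ZooKeeper:

// Condensed sketch: every application/driver/worker is serialized into one znode under
// workingDir; reading back lists the children and filters them by name prefix
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, serializer: Serializer)
  extends PersistenceEngine {

  private val workingDir = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  // Create a znode holding the serialized object, e.g. "app_<id>" or "worker_<id>"
  override def persist(name: String, obj: Object): Unit =
    serializeIntoFile(workingDir + "/" + name, obj)

  // Delete the znode when the corresponding entity is removed
  override def unpersist(name: String): Unit =
    zk.delete().forPath(workingDir + "/" + name)

  // List the znodes under workingDir and deserialize the ones matching the prefix
  override def read[T: ClassTag](prefix: String): Seq[T] =
    zk.getChildren.forPath(workingDir).asScala
      .filter(_.startsWith(prefix))
      .flatMap(deserializeFromFile[T])
}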

ZooKeeperLeaderElectionAgent starts as soon as it is constructed: it creates a LeaderLatch, which runs a background thread that watches the leadership state in ZooKeeper. Based on the latch callbacks, the agent decides whether the current node is the leader and sends the Master an ElectedLeader or RevokedLeadership message accordingly.
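The election itself is built on Curator's LeaderLatch. A condensed sketch of ZooKeeperLeaderElectionAgent (paraphrased from the Spark source) shows the wiring: the agent registers itself as a LeaderLatchListener and turns the isLeader/notLeader callbacks into electedLeader/revokedLeadership calls on the Master:

// Condensed sketch: Curator's LeaderLatch performs the election in ZooKeeper,
// and this agent translates latch callbacks into calls back into the Master
private[master] class ZooKeeperLeaderElectionAgent(masterInstance: LeaderElectable, conf: SparkConf)
  extends LeaderLatchListener with LeaderElectionAgent {

  private val workingDir = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
  private val zk = SparkCuratorUtil.newClient(conf)
  private val leaderLatch = new LeaderLatch(zk, workingDir)
  private var isCurrentlyLeader = false

  // Started as soon as the agent is constructed, i.e. while the Master's onStart is running
  leaderLatch.addListener(this)
  leaderLatch.start()

  override def isLeader(): Unit = synchronized {
    if (leaderLatch.hasLeadership && !isCurrentlyLeader) {
      isCurrentlyLeader = true
      masterInstance.electedLeader()      // the Master then receives ElectedLeader
    }
  }

  override def notLeader(): Unit = synchronized {
    if (!leaderLatch.hasLeadership && isCurrentlyLeader) {
      isCurrentlyLeader = false
      masterInstance.revokedLeadership()  // the Master then receives RevokedLeadership
    }
  }

  override def stop(): Unit = {
    leaderLatch.close()
    zk.close()
  }
}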
