天天看点

Spark源码分析之Master状态改变处理机制原理一Master故障挥着宕机,可能触发新的Master选举二Worker和AppClient会接受到来自Master的MasterChanged消息三 Master会接受来自Worker和AppClient的消息

一Master故障挥着宕机,可能触发新的Master选举

当重新选择Leader的时候,会进行集群的恢复,在恢复的过程中,就会向Worker和AppClient发送MasterChanged消息。

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  // 遍历每一个存储的application,注册该application,并且发送MasterChanged请求
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // 将该application状态置为UNKNOWN状态
      app.state = ApplicationState.UNKNOWN
      // 然后这个app向master发送MasterChanged请求
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  // 遍历每一个存储的driver, 更新master所维护的driver集合
  for (driver <- storedDrivers) {
    drivers += driver
  }
  // 遍历每一个存储的wroker,然后向master注册worker
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // 注册worker,就是更新master的woker集合,和worker相关的映射列表
      registerWorker(worker)
      // 将该worker状态置为UNKNOWN状态
      worker.state = WorkerState.UNKNOWN
      // 然后改worker向master发送MasterChanged请求
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}      

二Worker和AppClient会接受到来自Master的MasterChanged消息

2.1 Worker在收到MasterChanged消息

# 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册

# 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作

case MasterChanged(masterRef, masterWebUiUrl) =>

  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)

  // 获取新的master的url和master,连接状态置为true,取消之前的尝试重新注册

  changeMaster(masterRef, masterWebUiUrl)

  // 创建当前节点executors的简单描述对象ExecutorDescription

  val execs = executors.values.

    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))

  // 向新的master发送WorkerSchedulerStateResponse消息,然后会做一些操作

  masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

2.2 AppClient在收到MasterChanged消息

# 更新master

# 向新的master发送MasterChangeAcknowledged消息

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // 更新master
  master = Some(masterRef)
  alreadyDisconnected = false
  // 向新的master发送MasterChangeAcknowledged消息
  masterRef.send(MasterChangeAcknowledged(appId.get))      

三 Master会接受来自Worker和AppClient的消息

3.1 Master在收到Worker的WorkerSchedulerStateResponse消息

由于这是新的master,所以worker需要重新注册,然后新的master再次把之前相关的应用程序在worker上进行恢复

case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
  // 根据workerId获取worker
  idToWorker.get(workerId) match {
    case Some(worker) =>
      logInfo("Worker has been re-registered: " + workerId)
      // worker状态置为alive
      worker.state = WorkerState.ALIVE
      // 从指定的executor中过滤出哪些是有效的executor
      val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
      // 遍历有效的executors
      for (exec <- validExecutors) {
        // 获取executor所对应的app
        val app = idToApp.get(exec.appId).get
        // 为app设置executor,比如哪一个worker,多少核数等资源
        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
        // 将该executor添加到该woker上
        worker.addExecutor(execInfo)
        execInfo.copyState(exec)
      }
      // 将所有的driver设置为RUNNING然后加入到worker中
      for (driverId <- driverIds) {
        drivers.find(_.id == driverId).foreach { driver =>
          driver.worker = Some(worker)
          driver.state = DriverState.RUNNING
          worker.drivers(driverId) = driver
        }
      }
    case None =>
      logWarning("Scheduler state from unknown worker: " + workerId)
  }
  // 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
  if (canCompleteRecovery) { completeRecovery() }      

3.2 Master收到AppClient的MasterChangeAcknowledged消息

# 更新application状态为WAITTING

# 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作

case MasterChangeAcknowledged(appId) =>
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
    // 判断当前是否可以进行completeRecovery操作,如果可以进行completeRecovery操作
  if (canCompleteRecovery) { completeRecovery() }      

继续阅读