The Master is the core of Spark's standalone mode: if the Master fails, the cluster can no longer work properly. Spark therefore allows a standby node to be promoted to Master.
Spark supports the following recovery strategies, selected through the spark.deploy.recoveryMode property (typically set in spark-env.sh); a configuration sketch follows the list.
# ZOOKEEPER: cluster metadata is persisted to ZooKeeper. When the Master fails, ZooKeeper elects a new Master through its election mechanism; on taking over, the new Master fetches the persisted information from ZooKeeper and uses it to restore the cluster state.
# FILESYSTEM: cluster metadata is persisted to the file system. When the Master fails, it only has to be restarted on the same machine; the restarted Master reads the persisted information and restores the cluster state from it.
# CUSTOM: a user-defined recovery mode. Implement the StandaloneRecoveryModeFactory abstract class and register the implementation class in the configuration; when the Master fails, the cluster state is recovered in the user-defined way (a sketch follows the initialization code in section 2).
# NONE: cluster metadata is not persisted; when the Master fails, the newly started Master does not restore any cluster state.
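As a rough illustration of the properties involved (how they reach the Master's JVM varies by deployment; they are usually passed through SPARK_DAEMON_JAVA_OPTS in spark-env.sh, and the exact key set can differ between Spark versions), a ZOOKEEPER or FILESYSTEM setup relies on keys like these:

import org.apache.spark.SparkConf

// Sketch only: the same keys the Master reads at startup, expressed as a SparkConf.
val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")            // ZOOKEEPER / FILESYSTEM / CUSTOM / NONE
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181")   // ZooKeeper ensemble to persist to
  .set("spark.deploy.zookeeper.dir", "/spark")              // znode directory for the persisted state

// A FILESYSTEM setup would instead use:
//   .set("spark.deploy.recoveryMode", "FILESYSTEM")
//   .set("spark.deploy.recoveryDirectory", "/shared/spark-recovery")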
We know that Master extends ThreadSafeRpcEndpoint, so it can take part in RPC communication; it also mixes in the LeaderElectable trait, so it can be elected leader and have its leadership revoked.
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}
1 HA-related properties
state = RecoveryState.STANDBY: the initial state is set to STANDBY
persistenceEngine: PersistenceEngine: the persistence engine used to persist the cluster metadata
leaderElectionAgent: LeaderElectionAgent: the agent used for leader election (the simplest implementation is shown right after this list)
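The simplest election agent Spark ships is MonarchyLeaderAgent, which is what the NONE and FILESYSTEM modes end up using: there is no external coordinator, so the local Master is declared leader immediately. The snippet below is reproduced from memory of the Spark source, so treat the exact wording as approximate:

private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
  extends LeaderElectionAgent {
  // Single-node "election": the only Master is the leader from the start.
  masterInstance.electedLeader()
}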
2 Initializing the persistence engine and the election agent according to the recovery mode
When the Master starts, it looks at the configured spark.deploy.recoveryMode strategy (defaulting to NONE when unset) and builds the corresponding persistence engine and leader-election agent.
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // If the recovery mode is ZOOKEEPER, persist the recovery state to ZooKeeper
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  // If the recovery mode is FILESYSTEM, persist the recovery state to the file system
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  // If the recovery mode is CUSTOM, instantiate the factory class whose fully qualified
  // name is configured, and let it create the persistence engine and election agent
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
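For the CUSTOM branch above, the following is a minimal, hypothetical sketch of such a factory. It would be registered by setting spark.deploy.recoveryMode=CUSTOM and spark.deploy.recoveryMode.factory to the class name. It keeps its "persisted" state in an in-memory map (so it would not actually survive a restart; it only illustrates the contract) and declares the local Master leader immediately. Member visibility and exact signatures of these developer APIs can differ between Spark versions, so treat this as a sketch rather than a drop-in implementation.

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.deploy.master.{LeaderElectable, LeaderElectionAgent,
  PersistenceEngine, StandaloneRecoveryModeFactory}
import org.apache.spark.serializer.Serializer

// Hypothetical CUSTOM recovery mode factory (illustrative only).
class InMemoryRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  // "Persistence" backed by a local map: demonstrates the persist/unpersist/read contract,
  // but offers no durability across Master restarts.
  override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
    private val store = scala.collection.mutable.Map.empty[String, AnyRef]
    override def persist(name: String, obj: AnyRef): Unit = store(name) = obj
    override def unpersist(name: String): Unit = store -= name
    override def read[T: ClassTag](prefix: String): Seq[T] =
      store.collect { case (k, v) if k.startsWith(prefix) => v.asInstanceOf[T] }.toSeq
  }

  // No real election: the local Master is treated as the permanent leader,
  // mirroring what MonarchyLeaderAgent does.
  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
    new LeaderElectionAgent {
      override val masterInstance: LeaderElectable = master
      masterInstance.electedLeader()
    }
}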
3 Recovering from a Master failure

3.1 Handling the ElectedLeader message
case ElectedLeader =>
  // The spark.deploy.recoveryMode setting determined which persistence engine was built;
  // read the persisted data back from it to obtain the stored
  // (applicationInfo, driverInfo, workerInfo) triple
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  // If nothing was persisted, the new Master is immediately ALIVE; otherwise it is RECOVERING
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  // If we are in the RECOVERING state
  if (state == RecoveryState.RECOVERING) {
    // Start recovering the persisted data
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    // Schedule a task on the background thread that later tells the Master to finish recovery
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }

// On CompleteRecovery, call completeRecovery()
case CompleteRecovery => completeRecovery()
3.2 Beginning recovery
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  // For every stored application: re-register it and send its driver a MasterChanged message
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Mark the application as UNKNOWN until it responds
      app.state = ApplicationState.UNKNOWN
      // Tell the application's driver that the Master has changed
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  // Add every stored driver to the set of drivers maintained by the Master
  for (driver <- storedDrivers) {
    drivers += driver
  }
  // For every stored worker: re-register it with the Master
  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      // Registering the worker updates the Master's worker set and the related mappings
      registerWorker(worker)
      // Mark the worker as UNKNOWN until it responds
      worker.state = WorkerState.UNKNOWN
      // Tell the worker that the Master has changed
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
3.3 The Worker receives MasterChanged and replies to the Master with WorkerSchedulerStateResponse
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  // Record the new Master's reference and URL, mark the connection as established,
  // and cancel any previous re-registration attempts
  changeMaster(masterRef, masterWebUiUrl)
  // Build lightweight ExecutorDescription objects for the executors running on this worker
  val execs = executors.values.
    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
  // Report this worker's scheduler state to the new Master, which will process it further
  masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
3.4 The application receives MasterChanged and replies to the Master with MasterChangeAcknowledged
case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  master = Some(masterRef)
  alreadyDisconnected = false
  masterRef.send(MasterChangeAcknowledged(appId.get))
3.5 After receiving the WorkerSchedulerStateResponse and MasterChangeAcknowledged messages, the Master calls completeRecovery, which removes the workers and applications that never responded and are therefore still in the UNKNOWN state
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
  // Look up the worker by its id
  idToWorker.get(workerId) match {
    case Some(worker) =>
      logInfo("Worker has been re-registered: " + workerId)
      // Mark the worker as ALIVE again
      worker.state = WorkerState.ALIVE
      // Keep only the executors whose application is still known to the Master
      val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
      // Iterate over the valid executors
      for (exec <- validExecutors) {
        // Find the application the executor belongs to
        val app = idToApp.get(exec.appId).get
        // Re-attach the executor to the application (which worker, how many cores, etc.)
        val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
        // Attach the executor to this worker
        worker.addExecutor(execInfo)
        execInfo.copyState(exec)
      }
      // Mark the reported drivers as RUNNING and attach them to this worker
      for (driverId <- driverIds) {
        drivers.find(_.id == driverId).foreach { driver =>
          driver.worker = Some(worker)
          driver.state = DriverState.RUNNING
          worker.drivers(driverId) = driver
        }
      }
    case None =>
      logWarning("Scheduler state from unknown worker: " + workerId)
  }
  // If every worker and application has responded, finish the recovery
  if (canCompleteRecovery) { completeRecovery() }
case MasterChangeAcknowledged(appId) =>
  // Look up the application by its id
  idToApp.get(appId) match {
    case Some(app) =>
      logInfo("Application has been re-registered: " + appId)
      // The application has responded, so move it out of the UNKNOWN state
      app.state = ApplicationState.WAITING
    case None =>
      logWarning("Master change ack from unknown app: " + appId)
  }
  // If every worker and application has responded, finish the recovery
  if (canCompleteRecovery) { completeRecovery() }
3.6 Completing the recovery
private def completeRecovery() {
  // Only proceed if we are still in the RECOVERING state
  if (state != RecoveryState.RECOVERING) { return }
  // Move to the COMPLETING_RECOVERY state
  state = RecoveryState.COMPLETING_RECOVERY
  // Remove the workers and applications that never responded, i.e. are still UNKNOWN
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
  // Reschedule the drivers that were not claimed by any worker, i.e. have no worker to run on
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    // Re-launch the driver if it was submitted with supervision; otherwise remove it
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
  // The Master is now ALIVE
  state = RecoveryState.ALIVE
  // Re-run scheduling to assign resources to drivers and applications
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
4 Triggering the election
Here we take ZooKeeper as the example:
When the Master node starts, its onStart method is called; with the ZOOKEEPER recovery mode this initializes a ZooKeeperPersistenceEngine and a ZooKeeperLeaderElectionAgent. ZooKeeperPersistenceEngine is responsible for persisting application, driver and worker information, while ZooKeeperLeaderElectionAgent is responsible for watching the Master's leadership status and electing the leader.
The ZooKeeperLeaderElectionAgent starts as soon as it is created: it builds a Curator LeaderLatch, which tracks leadership changes in the background. Depending on whether the current node holds the latch, the agent decides whether this node is the leader and notifies the Master, which then sends itself the ElectedLeader or RevokedLeadership message. A sketch of this wiring follows.
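Below is a rough, self-contained approximation of that wiring (the real agent derives the ZooKeeper client and the latch path from the Spark configuration; zkUrl and latchPath here are placeholder parameters): Curator calls isLeader()/notLeader() on the listener, and the sketch relays that to the Master through the LeaderElectable trait shown at the top.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.deploy.master.LeaderElectable

object LeaderLatchSketch {
  // Wire a Curator LeaderLatch to a LeaderElectable Master
  // (approximation of ZooKeeperLeaderElectionAgent's behaviour).
  def start(master: LeaderElectable, zkUrl: String, latchPath: String): LeaderLatch = {
    val zk = CuratorFrameworkFactory.newClient(zkUrl, new ExponentialBackoffRetry(1000, 3))
    zk.start()

    val latch = new LeaderLatch(zk, latchPath)
    latch.addListener(new LeaderLatchListener {
      // Curator invokes these callbacks when this node gains or loses the latch;
      // the Master reacts by sending itself ElectedLeader / RevokedLeadership.
      override def isLeader(): Unit = master.electedLeader()
      override def notLeader(): Unit = master.revokedLeadership()
    })
    latch.start()
    latch
  }
}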