Spark使用ZooKeeper進行資料恢複的邏輯過程如下:
1.初始化:建立<CuratorFramwork,LeaderLatch,LeaderLatchListener>用于選舉
建立CuratorFramework用于資料恢複。
2.選舉:啟動LeaderLatch,Curator開始接管選舉工作了。
3.恢複:當某個Master被選舉為Leader後,就會調用LeaderLatchListener的isLeader()方法,這個方法内部開始進行邏輯上的資料恢複工作,具體細節是這樣的,向Master發送ElectedLeader消息,Master從ZooKeeperPersistenceEngine中讀取資料到記憶體緩存中,ZooKeeperPersistenceEngine從ZooKeeper的/spark/master_status/目錄下讀取storedApps,storedDrivers,storedWorkers。
下面來進行一下源碼的走讀,友善日後回憶。
1.初始化:Master啟動時建立ZooKeeperLeaderElectionAgent和 ZooKeeperPersistenceEngine,前者用于選舉,後者用于資料恢複。
Master初始化源碼如下:
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
extends StandaloneRecoveryModeFactory(conf, serializer) {
def createPersistenceEngine(): PersistenceEngine = {
new ZooKeeperPersistenceEngine(conf, serializer)
}
def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
new ZooKeeperLeaderElectionAgent(master, conf)
}
}
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
extends PersistenceEngine
with Logging {
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
//建立zookeeper用戶端
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
//建立WORKING_DIR目錄
SparkCuratorUtil.mkdir(zk, WORKING_DIR)
}
建立ZooKeeperLeaderElectionAgent時會建立用于選舉的CuratorFramwork,LeaderLatch,LeaderLatchListener。其中的LeaderLatch用于選舉Leader,當某個LeaderLatch被選舉為Leader之後,就會調用對應的LeaderLatchListener的isLeader(),如下:
private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private var zk: CuratorFramework = _
private var leaderLatch: LeaderLatch = _
private var status = LeadershipStatus.NOT_LEADER
start()
private def start() {
logInfo("Starting ZooKeeper LeaderElection agent")
zk = SparkCuratorUtil.newClient(conf)
leaderLatch = new LeaderLatch(zk, WORKING_DIR)
leaderLatch.addListener(this)
leaderLatch.start()
}
2.選舉,調用LeaderLatch的start開始進行選舉
3.資料恢複:如果某個master被成功選舉為alive master,那麼會調用isLeader()。這個方法内部會向Master發送ElectedLeader消息,然後Master會從ZookeeperPersistenceEngin中也就是ZooKeeper中讀取storedApps,storedDrivers,storedWorkers并将他們恢複到記憶體緩存中去。
override def isLeader() {
synchronized {
// could have lost leadership by now.
if (!leaderLatch.hasLeadership) {
return
}
logInfo("We have gained leadership")
updateLeadershipStatus(true)
}
}
private def updateLeadershipStatus(isLeader: Boolean) {
if (isLeader && status == LeadershipStatus.NOT_LEADER) {
status = LeadershipStatus.LEADER
masterActor.electedLeader()
} else if (!isLeader && status == LeadershipStatus.LEADER) {
status = LeadershipStatus.NOT_LEADER
masterActor.revokedLeadership()
}
}
開始真正的資料恢複工作:
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
CompleteRecovery)
}
}
持久化資料存儲在ZooKeeper中的/spark/master_status目錄下。以app為例,當向ZooKeeperPersistenceEngine中寫入app時,假設這個appId是1,那麼就會建立一個/spark/master_status/app_1的持久化節點,節點資料内容就是序列化的app對象。
/spark/master_status
/app_appid
/worker_workerId
/driver_driverId