
Spark ZooKeeper Data Recovery

The logical flow of Spark's ZooKeeper-based data recovery is as follows:

1. Initialization: create a <CuratorFramework, LeaderLatch, LeaderLatchListener> triple for leader election, and a separate CuratorFramework for data recovery.

2. Election: start the LeaderLatch; from then on Curator takes over the election work.

3. Recovery: once a Master is elected leader, the LeaderLatchListener's isLeader() method is invoked, and this method kicks off the logical data-recovery work. Concretely, it sends an ElectedLeader message to the Master; the Master then reads data from the ZooKeeperPersistenceEngine into its in-memory caches, and the ZooKeeperPersistenceEngine reads storedApps, storedDrivers, and storedWorkers from the /spark/master_status/ directory in ZooKeeper.

Below is a walkthrough of the source, for easy reference later.

1. Initialization: when the Master starts, it creates a ZooKeeperLeaderElectionAgent and a ZooKeeperPersistenceEngine; the former handles election, the latter handles data recovery.

The Master initialization source is as follows:

case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))      
private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }
}      
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  // create the ZooKeeper client
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  // create the WORKING_DIR directory
  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
}      
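The snippet above is truncated: it omits the read/write methods. The read path, which recovery relies on, looks roughly like the sketch below. This is reconstructed from the Spark 1.3-era source (it also needs scala.collection.JavaConverters._ and scala.reflect.ClassTag in scope), so treat names and signatures as approximate:

  // list the children of /spark/master_status and keep those whose name
  // starts with the given prefix ("app_", "driver_" or "worker_")
  override def read[T: ClassTag](prefix: String): Seq[T] = {
    val files = zk.getChildren.forPath(WORKING_DIR).asScala.filter(_.startsWith(prefix))
    files.map(deserializeFromFile[T]).flatten
  }

  private def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
    val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
    try {
      // turn the znode payload back into the original object
      Some(serialization.deserialize(fileData, m.runtimeClass.asInstanceOf[Class[T]]).get)
    } catch {
      case e: Exception =>
        logWarning("Exception while reading persisted file, deleting", e)
        zk.delete().forPath(WORKING_DIR + "/" + filename)
        None
    }
  }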

When the ZooKeeperLeaderElectionAgent is created, it builds the CuratorFramework, LeaderLatch, and LeaderLatchListener used for election. The LeaderLatch is what elects the leader; once a given LeaderLatch wins the election, the isLeader() method of its LeaderLatchListener is invoked, as shown below:

private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    leaderLatch.addListener(this)
    leaderLatch.start()
  }      

2. Election: calling the LeaderLatch's start() kicks off the election.
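For intuition, here is a minimal standalone sketch of LeaderLatch election semantics using plain Curator; the connection string and latch path are made up for the demo:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchDemo {
  def main(args: Array[String]): Unit = {
    // "localhost:2181" is an assumption for this demo
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    val latch = new LeaderLatch(client, "/demo/leader_election")
    latch.addListener(new LeaderLatchListener {
      // invoked asynchronously once this latch wins the election
      override def isLeader(): Unit = println("We have gained leadership")
      // invoked if leadership is later lost, e.g. on session expiry
      override def notLeader(): Unit = println("We have lost leadership")
    })
    latch.start() // join the election; Curator takes it from here

    Thread.sleep(10000) // keep the process alive long enough to observe callbacks
    latch.close()
    client.close()
  }
}

Run several copies of this against the same ZooKeeper and exactly one prints the gained-leadership line; kill it and another instance's isLeader() fires. This is the behavior Spark's agent builds on.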

3. Data recovery: if a master is successfully elected as the alive master, isLeader() is invoked. Internally, this method sends an ElectedLeader message to the Master; the Master then reads storedApps, storedDrivers, and storedWorkers from the ZooKeeperPersistenceEngine, i.e. from ZooKeeper, and restores them into its in-memory caches.

override def isLeader() {
    synchronized {
      // could have lost leadership by now.
      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }      
private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterActor.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterActor.revokedLeadership()
    }
  }      
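Note that updateLeadershipStatus also covers the losing side: masterActor.revokedLeadership() is called when leadership is lost. On the Master side, that message is handled roughly like this (a sketch from the same Spark era; the standalone Master simply shuts itself down):

case RevokedLeadership => {
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)
}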

The actual data-recovery work starts here:

case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
          CompleteRecovery)
      }
    }      
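readPersistedData() is defined in PersistenceEngine. In the Spark version this walkthrough matches, it is roughly the following sketch, where read() is the prefix-based lookup from ZooKeeperPersistenceEngine shown earlier:

  final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
  }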

The persisted data lives under the /spark/master_status directory in ZooKeeper. Taking an app as an example: when an app is written to the ZooKeeperPersistenceEngine, and supposing its appId is 1, a persistent node /spark/master_status/app_1 is created whose data is the serialized app object (see the write-path sketch after the layout below).

/spark/master_status
    /app_appid
    /worker_workerId
    /driver_driverId
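For completeness, the write path that produces these nodes looks roughly like this, again a sketch based on the Spark 1.3-era ZooKeeperPersistenceEngine (CreateMode comes from org.apache.zookeeper; PersistenceEngine turns e.g. addApplication(app) into persist("app_" + app.id, app)):

  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // serialize with Akka serialization and store the bytes as a persistent
  // znode, e.g. /spark/master_status/app_1
  private def serializeIntoFile(path: String, value: AnyRef): Unit = {
    val serialized = serialization.findSerializerFor(value).toBinary(value)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
  }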
