
Apache Spark Source Code Walkthrough, Part 19 -- Requesting and Releasing Resources in standalone cluster Mode

Feel free to repost this article, but please credit the original source. 徽滬一郎

Overview

This post walks through how resources (mainly CPU cores and memory) are requested and released over the lifetime of a Spark application running in standalone cluster deploy mode.

A standalone cluster deployment is made up of four components: Master, Worker, Executor and Driver, each of which runs in its own JVM process.

From the perspective of resource management:

  • Master: keeps track of the resources of the whole cluster, chiefly CPU cores and memory, but does not own any of these resources itself
  • Worker: the actual contributor of compute resources; it must report to the Master how many CPU cores and how much memory it owns, and it launches executors when instructed to by the Master
  • Executor: the workhorse that does the real computation; the Master decides how many cores and how much memory this process gets
  • Driver: the actual consumer of resources; the Driver submits one or more jobs, and each job, after being split into tasks, is distributed to the executors for actual execution

Part of this material was already touched on in the fault-tolerance analysis of standalone cluster mode; today the focus is on how resources, once allocated, are properly reclaimed in different scenarios.

Resource Reporting and Aggregation

In a standalone cluster the Master is, of course, the key player; it must be started before any worker or driver program.

Once the Master is up, the workers can be started. When a worker starts, it registers itself with the Master, and the registration message carries the worker node's CPU core count and memory size.

The call sequence is preStart -> registerWithMaster -> tryRegisterAllMasters.

Let's take a look at the code of tryRegisterAllMasters:

def tryRegisterAllMasters() {
    for (masterUrl <- masterUrls) {
      logInfo("Connecting to master " + masterUrl + "...")
      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }
           

The question is: where do the memory and cores arguments passed to the RegisterWorker message come from?

Note that the main function of Worker creates a WorkerArguments instance:

def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val args = new WorkerArguments(argStrings)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }
           

memory is obtained through inferDefaultMemory, and cores through inferDefaultCores:

def inferDefaultCores(): Int = {
    Runtime.getRuntime.availableProcessors()
  }

  def inferDefaultMemory(): Int = {
    val ibmVendor = System.getProperty("java.vendor").contains("IBM")
    var totalMb = 0
    try {
      val bean = ManagementFactory.getOperatingSystemMXBean()
      if (ibmVendor) {
        val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
      } else {
        val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
      }
    } catch {
      case e: Exception => {
        totalMb = 2*1024
        System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
      }
    }
    // Leave out 1 GB for the operating system, but don't return a negative memory size
    math.max(totalMb - 1024, 512)
  }
           

If the cores and memory of each worker have already been explicitly specified in the configuration, the configured values are used instead; the relevant settings are SPARK_WORKER_CORES and SPARK_WORKER_MEMORY.
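
The fallback logic in WorkerArguments looks roughly like the sketch below: the environment variables win when present, otherwise the inferred machine defaults shown above are used (a simplified sketch; the exact parsing of the memory string may differ between versions).

var cores = inferDefaultCores()
var memory = inferDefaultMemory()
// Explicit configuration overrides the inferred machine defaults
if (System.getenv("SPARK_WORKER_CORES") != null) {
  cores = System.getenv("SPARK_WORKER_CORES").toInt
}
if (System.getenv("SPARK_WORKER_MEMORY") != null) {
  // e.g. "4g" or "4096m" is converted into an MB count
  memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
}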

Upon receiving the RegisterWorker message, the Master creates a WorkerInfo for each worker based on the reported information.

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }
           

Resource Allocation

If a driver application has already registered by the time a worker comes up, executors need to be launched for those driver applications that have been waiting for resources.

WorkerInfo is used in the schedule function, whose processing logic is roughly the following:

  1. Check whether the free memory of each alive worker can satisfy the application's minimum memory requirement for one executor (memoryPerSlave); if so, the worker counts as having allocatable resources
  2. Depending on the placement policy: if the work is to be spread across all workers, one core is taken from a worker at a time, round robin, until either the allocatable resources are exhausted or the driver's demand is met
  3. If the policy is to use as few workers as possible, all allocatable cores on a worker are taken at once, until the driver's core demand is met
  4. Based on the result of step 2 or 3, add the corresponding executors on each worker; the handling function is addExecutor

For brevity, only the allocation path that packs an application onto as few workers as possible is listed here (this is the branch taken when spreadOutApps is false):

for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
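
The canUse check referenced above decides whether a worker can host another executor of an application. In the Master it amounts to roughly the following sketch: the worker must have enough free memory for one executor and must not already be running an executor of that application.

def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
    // enough free memory for one more executor, and no executor of this app yet
    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
  }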
           

launchExecutor is responsible for two things:

  1. Record the CPU cores and memory consumed by the newly added executor; the bookkeeping happens in worker.addExecutor
  2. Send the LaunchExecutor command to the worker

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }
           

Upon receiving the LaunchExecutor command, the worker does some bookkeeping of its own, subtracting the CPU cores and memory about to be consumed from its available resources, and then uses an ExecutorRunner to spawn the Executor. Note that the Executor runs in a separate process. The code is as follows:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
            self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, conf, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
        } catch {
          case e: Exception => {
            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            masterLock.synchronized {
              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
        }
      }
           

One thing to note about the allocation process: if multiple driver applications are waiting, resources are allocated FIFO, first come, first served.

Resource Reclamation

The resources reported by the workers are ultimately occupied by the tasks of the jobs submitted by driver applications. When an application ends, whether it exits normally or abnormally, the resources it occupied should be reclaimed, that is, returned to the pool of allocatable resources.

The question now becomes: how do the Master and the Executors learn that a driver application has exited?

There are two different cases: saying goodbye before leaving, and leaving without a word. Let's look at them one by one.

Saying goodbye before leaving means the driver application explicitly notifies the Master and the executors that its work is done and it is about to leave. The application does so by explicitly calling SparkContext.stop:

def stop() {
    postApplicationEnd()
    ui.stop()
    // Do this only if not stopped already - best case effort.
    // prevent NPE if stopped more than once.
    val dagSchedulerCopy = dagScheduler
    dagScheduler = null
    if (dagSchedulerCopy != null) {
      metadataCleaner.cancel()
      cleaner.foreach(_.stop())
      dagSchedulerCopy.stop()
      taskScheduler = null
      // TODO: Cache.stop()?
      env.stop()
      SparkEnv.set(null)
      ShuffleMapTask.clearCache()
      ResultTask.clearCache()
      listenerBus.stop()
      eventLogger.foreach(_.stop())
      logInfo("Successfully stopped SparkContext")
    } else {
      logInfo("SparkContext already stopped")
    }
  }
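
As a usage note, a minimal driver program that says goodbye explicitly could look like the sketch below (the application name and the RDD computation are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object GracefulExit {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("GracefulExit"))
    println(sc.parallelize(1 to 100).sum())
    // Say goodbye explicitly: executors are stopped and the Master can
    // return the cores and memory this application occupied to the pool
    sc.stop()
  }
}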
           

One of the main things an explicit SparkContext.stop does is stop the executors explicitly. The code that actually issues the StopExecutor command can be found in the stop function of CoarseGrainedSchedulerBackend:

override def stop() {
    stopExecutors()
    try {
      if (driverActor != null) {
        val future = driverActor.ask(StopDriver)(timeout)
        Await.ready(future, timeout)
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
    }
  }
           

So how does the Master learn that a driver application has exited? The credit goes to Akka's communication mechanism: whenever one of the two communicating parties goes away, the other receives a DisassociatedEvent, and it is in handling this message that the Master removes driver applications that have stopped.

case DisassociatedEvent(_, address, _) => {
      // The disconnected client could've been either a worker or an app; remove whichever it was
      logInfo(s"$address got disassociated, removing it.")
      addressToWorker.get(address).foreach(removeWorker)
      addressToApp.get(address).foreach(finishApplication)
      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
    }
           

In the leave-without-a-word case, how does the Executor learn that the application it serves has run its course? The same way the Master does: through the DisassociatedEvent. See the receive function of CoarseGrainedExecutorBackend:

case x: DisassociatedEvent =>
      logError(s"Driver $x disassociated! Shutting down.")
      System.exit(1)
           

Resource Reclamation in Abnormal Situations

Thanks to the heartbeat mechanism between Master and Worker, if a worker exits abnormally, the Master notices its death through the missed heartbeats and removes the resources that worker had reported.
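
On the Master side this is driven by a CheckForWorkerTimeOut message the Master periodically schedules for itself; the handling looks roughly like the sketch below (simplified; the special treatment of workers already marked DEAD is omitted):

def timeOutDeadWorkers() {
    val currentTime = System.currentTimeMillis()
    // Workers that have not sent a heartbeat within WORKER_TIMEOUT are removed,
    // which returns their reported cores and memory to the allocatable pool
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
    for (worker <- toRemove if worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT / 1000))
      removeWorker(worker)
    }
  }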

When an executor exits abnormally, the ExecutorRunner monitoring thread inside the worker notices immediately and reports it to the Master; the Master then reclaims the resources and asks the worker to launch a new executor.
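
The Master-side handling of that report is roughly the following sketch, simplified from the ExecutorStateChanged case (retry counting and application removal on repeated failures are left out):

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      for (app <- idToApp.get(appId); exec <- app.executors.get(execId)) {
        exec.state = state
        exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
        if (ExecutorState.isFinished(state)) {
          // give the cores and memory back to the worker and the application,
          // then run schedule() again so a replacement executor can be launched
          app.removeExecutor(exec)
          exec.worker.removeExecutor(exec)
          schedule()
        }
      }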