
6. Spark Source Code Analysis: Worker Instantiation Flow

The previous post covered the Worker startup flow: 5. Spark Source Code Analysis: Worker Startup Flow.

This post walks through how a Worker is instantiated.

Worker instantiation

The real instantiation happens when new Worker(...) is invoked while setting up the RPC endpoint.

private[deploy] class Worker(
    // Constructor parameters the Worker needs; mostly self-explanatory
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    // endpointName=Worker
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {

  // Host and port from the RPC environment's address
  private val host = rpcEnv.address.host
  private val port = rpcEnv.address.port

  // Validate host and port
  Utils.checkHost(host)
  assert (port > 0)

  // A scheduled executor used to send messages at the specified time.
  // Scheduler for sending messages at fixed times
  private val forwordMessageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")

  // A separated thread to clean up the workDir and the directories of finished applications.
  // Used to provide the implicit parameter of `Future` methods.
  // Separate thread that cleans up workDir and the directories of finished applications
  private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))

  // For worker and executor IDs
  // Timestamp used as part of worker and executor IDs
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
  // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
  // With the default spark.worker.timeout of 60s, this is a heartbeat every 15 seconds
  private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4

  // Model retries to connect to the master, after Hadoop's model.
  // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
  // Afterwards, the next 10 attempts are between 30 and 90 seconds.
  // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
  // the same time.
  // Number of short-interval attempts before switching to the longer interval
  private val INITIAL_REGISTRATION_RETRIES = 6
  // 16 attempts in total (6 + 10)
  private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
  // Lower bound of the fuzz multiplier (0.5); a multiplier, not a number of seconds
  private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
  private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
    val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
    // nextDouble returns a value in [0, 1), so the multiplier falls in [0.5, 1.5)
    randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
  }
  // Interval for the first 6 attempts: round(10 * multiplier) = 5-15 seconds
  private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
    REGISTRATION_RETRY_FUZZ_MULTIPLIER))
  // Interval for the next 10 attempts: round(60 * multiplier) = 30-90 seconds
  private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
    * REGISTRATION_RETRY_FUZZ_MULTIPLIER))
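  // Worked example (hypothetical draw): if nextDouble returns 0.3, the multiplier
  // is 0.8, so the short retry interval is round(10 * 0.8) = 8 seconds and the
  // prolonged one is round(60 * 0.8) = 48 seconds, within the ranges above.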

  // Cleanup is disabled by default; the work runs on cleanupThreadExecutor above
  private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
  // How often worker will clean up old app folders
  // Default: every 30 minutes
  private val CLEANUP_INTERVAL_MILLIS =
    conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
  // TTL for app folders/data;  after TTL expires it will be cleaned up
  // Default retention: 7 days
  private val APP_DATA_RETENTION_SECONDS =
    conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

  // Whether running in test mode
  private val testing: Boolean = sys.props.contains("spark.testing")
  private var master: Option[RpcEndpointRef] = None

  /**
   * Whether to use the master address in `masterRpcAddresses` if possible. If it's disabled, Worker
   * will just use the address received from Master.
   */
  // Whether to prefer the configured master address over the one received from the master (host:port); disabled by default
  private val preferConfiguredMasterAddress =
    conf.getBoolean("spark.worker.preferConfiguredMasterAddress", false)
  /**
   * The master address to connect in case of failure. When the connection is broken, worker will
   * use this address to connect. This is usually just one of `masterRpcAddresses`. However, when
   * a master is restarted or takes over leadership, it will be an address sent from master, which
   * may not be in `masterRpcAddresses`.
   */
  // Master address to reconnect to after the connection breaks
  // Usually one of masterRpcAddresses, but after a master restart or failover the new address may not be in that list
  private var masterAddressToConnect: Option[RpcAddress] = None
  private var activeMasterUrl: String = ""
  private[worker] var activeMasterWebUiUrl : String = ""
  private var workerWebUiUrl: String = ""
  // Worker URI built from the RPC address and endpoint name
  private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
  private var registered = false
  private var connected = false
  // Worker ID: a timestamp plus the host and port
  private val workerId = generateWorkerId()
  // Spark home directory
  private val sparkHome =
    if (testing) {
      assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
      new File(sys.props("spark.test.home"))
    } else {
      new File(sys.env.get("SPARK_HOME").getOrElse("."))
    }

  var workDir: File = null
  // Executors that have finished
  val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
  // Running drivers
  val drivers = new HashMap[String, DriverRunner]
  // Running executors
  val executors = new HashMap[String, ExecutorRunner]
  // Drivers that have finished
  val finishedDrivers = new LinkedHashMap[String, DriverRunner]
  // Directories used by each application
  val appDirectories = new HashMap[String, Seq[String]]
  // Applications that have finished
  val finishedApps = new HashSet[String]

  // Number of finished executors retained in the web UI, default 1000
  val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
    WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
  // Number of finished drivers retained in the web UI, default 1000
  val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
    WorkerWebUI.DEFAULT_RETAINED_DRIVERS)

  // The shuffle service is not actually started unless configured.
  // External shuffle service; only started if enabled in the config (off by default)
  private val shuffleService = new ExternalShuffleService(conf, securityMgr)

  private val publicAddress = {
    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else host
  }
  private var webUi: WorkerWebUI = null

  private var connectionAttemptCount = 0

  // Metrics system registration; the metrics system is covered in a later post
  private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
  private val workerSource = new WorkerSource(this)

  // Whether the reverse proxy is enabled, off by default
  val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)

  // Futures for the in-flight registration attempts to the masters
  private var registerMasterFutures: Array[JFuture[_]] = null
  // Scheduled timer for registration retries
  private var registrationRetryTimer: Option[JScheduledFuture[_]] = None

  // A thread pool for registering with masters. Because registering with a master is a blocking
  // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
  // time so that we can register with all masters.
  // Thread pool used to register with all masters concurrently
  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
    "worker-register-master-threadpool",
    masterRpcAddresses.length // Make sure we can register with all masters at the same time
  )

  // CPU and memory usage
  var coresUsed = 0
  var memoryUsed = 0

  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed
}
           

All of the fields above are initialized when the Worker is instantiated.
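
For context, this is roughly how startRpcEnvAndEndpoint() from the previous post creates the instance (an abbreviated sketch; some parameters and details are trimmed):

def startRpcEnvAndEndpoint(
    host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
    masterUrls: Array[String], workDir: String,
    workerNumber: Option[Int] = None, conf: SparkConf = new SparkConf): RpcEnv = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME + workerNumber.map(_.toString).getOrElse(""),
    host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  // The Worker is constructed here; registering it with the RpcEnv is what
  // later triggers its onStart()
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}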

Worker extends ThreadSafeRpcEndpoint, and ThreadSafeRpcEndpoint in turn extends RpcEndpoint:

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
           

An RpcEndpoint lives inside an RpcEnv, and the RpcEnv itself is created through the RpcEnvFactory factory pattern:

private[spark] trait RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv
}
           

Worker overrides RpcEndpoint's onStart, receive, and onStop methods, so onStart() runs as soon as the Worker endpoint is registered with the RpcEnv.
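
For reference, the relevant hooks on RpcEndpoint look roughly like this (abbreviated from the Spark source; bodies elided):

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  // Reference to this endpoint; only valid once the endpoint has been registered
  final def self: RpcEndpointRef = { /* ... */ }

  // Handles messages sent with RpcEndpointRef.send
  def receive: PartialFunction[Any, Unit] = { /* ... */ }

  // Invoked before the endpoint starts handling any messages
  def onStart(): Unit = {}

  // Invoked when the endpoint is stopping
  def onStop(): Unit = {}
}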

onStart()

override def onStart() {
    // Assert that the worker has not registered yet
    // (registered was initialized to false above)
    assert(!registered)
    // Log worker startup info
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)

    // Create the worker's work directory
    createWorkDir()
    // Start the external shuffle service (only if enabled)
    startExternalShuffleService()
    // Initialize the web UI
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    // Bind the web UI to the HTTP server
    webUi.bind()

    // Record the worker's web UI URL
    workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
    // Register this worker with the master(s)
    registerWithMaster()

    // Register the worker metrics source
    metricsSystem.registerSource(workerSource)
    // Start the metrics system
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }
           

createWorkDir() 

private def createWorkDir() {
    // If workDirPath isn't set, default to the "work" directory under Spark home
    workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
    try {
      // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
      // So attempting to create and then check if directory was created or not.
      // Create the directory, then verify it exists and is a directory; exit on failure
      workDir.mkdirs()
      if ( !workDir.exists() || !workDir.isDirectory) {
        logError("Failed to create work directory " + workDir)
        System.exit(1)
      }
      // Sanity check: the path must be a directory
      assert (workDir.isDirectory)
    } catch {
      // On any exception, log the error and exit
      case e: Exception =>
        logError("Failed to create work directory " + workDir, e)
        System.exit(1)
    }
  }
           

startExternalShuffleService()

The external shuffle service coordinates resource sharing across Workers in the cluster; it will be analyzed in a later post. Here the worker only checks whether it is enabled and starts it.

private def startExternalShuffleService() {
    try {
      shuffleService.startIfEnabled()
    } catch {
      case e: Exception =>
        logError("Failed to start external shuffle service", e)
        System.exit(1)
    }
  }
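
For reference, startIfEnabled() is roughly this simple check (abbreviated from ExternalShuffleService; the enabled flag comes from spark.shuffle.service.enabled, false by default):

/** Start the external shuffle service if the user has configured it. */
def startIfEnabled(): Unit = {
  if (enabled) {
    start()
  }
}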
           

Continuing down onStart(), webUi = new WorkerWebUI(this, workDir, webUiPort) initializes the WorkerWebUI.

WorkerWebUI sets up the web UI's pages, handlers, and related settings.

private[worker]
class WorkerWebUI(
    val worker: Worker,
    val workDir: File,
    requestedPort: Int)
  extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"),
    requestedPort, worker.conf, name = "WorkerUI")
  with Logging {

  // RPC ask timeout, derived from the worker's SparkConf
  private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)

  // Initialize the web UI components
  initialize()

  /** Initialize all components of the server. */
  def initialize() {
    // LogPage renders the log page
    val logPage = new LogPage(this)
    // Attach the log page to the UI
    attachPage(logPage)
    // Attach the worker overview page
    attachPage(new WorkerPage(this))
    // Serve static files from the static resource directory
    attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
    // Register the fixed /log endpoint
    attachHandler(createServletHandler("/log",
      (request: HttpServletRequest) => logPage.renderLog(request),
      worker.securityMgr,
      worker.conf))
  }
}

private[worker] object WorkerWebUI {
  // Location of the static resource directory
  val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
  // Default number of drivers retained in the web UI
  val DEFAULT_RETAINED_DRIVERS = 1000
  // Default number of executors retained in the web UI
  val DEFAULT_RETAINED_EXECUTORS = 1000
}
           

Then webUi.bind() binds the web UI to the HTTP server.

/** Bind to the HTTP server behind this web interface. */
  def bind(): Unit = {
    // Fail if the server has already been bound
    assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
    try {
      // Resolve the host (SPARK_LOCAL_IP if set, otherwise 0.0.0.0) and start the Jetty server
      val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
      serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
      logInfo(s"Bound $className to $host, and started at $webUrl")
    } catch {
      case e: Exception =>
        // Binding failed: log the error and exit
        logError(s"Failed to bind $className", e)
        System.exit(1)
    }
  }
           

Moving on, registerWithMaster() kicks off registration; this method is also invoked repeatedly for retries via reregisterWithMaster().

// Register with the master(s)
  private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      // No retry timer yet: try to register with all masters
      case None =>
        registered = false
        // Register with every master through the registerMasterThreadPool
        registerMasterFutures = tryRegisterAllMasters()
        // Fresh registration, so reset the attempt counter
        connectionAttemptCount = 0
        // Schedule periodic re-registration attempts
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              // Send ourselves a ReregisterWithMaster message to retry registration
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          // Both the initial delay and the period use the short interval (5-15 seconds);
          // reregisterWithMaster() switches to the 30-90 second interval after
          // INITIAL_REGISTRATION_RETRIES attempts
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      // A retry timer already exists: wait for the scheduled attempt instead
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
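
For completeness, here is an abbreviated sketch of reregisterWithMaster() (trimmed from the Spark source; the re-sent registration calls and some bookkeeping are elided), showing how the timer switches from the short interval to the prolonged one:

private def reregisterWithMaster(): Unit = {
    Utils.tryOrExit {
      connectionAttemptCount += 1
      if (registered) {
        // Already registered: cancel any scheduled retries
        cancelLastRegistrationRetry()
      } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
        logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
        // ... re-send the registration message to the master(s) ...
        // After INITIAL_REGISTRATION_RETRIES attempts, re-schedule the timer with
        // the prolonged 30-90 second interval
        if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
          registrationRetryTimer.foreach(_.cancel(true))
          registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
            new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(ReregisterWithMaster)
              }
            },
            PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
            PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
            TimeUnit.SECONDS))
        }
      } else {
        logError("All masters are unresponsive! Giving up.")
        System.exit(1)
      }
    }
  }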
           

tryRegisterAllMasters() makes the initial registration attempt:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    // Register with every master address configured in the RPC environment
    masterRpcAddresses.map { masterAddress =>
      // Submit a task to the thread pool for each master
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            // Obtain a reference to this master's RPC endpoint
            val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            // Send the registration message to this master
            sendRegisterMessageToMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }
           

sendRegisterMessageToMaster(masterEndpoint)

It calls send() on RpcEndpointRef to deliver the Worker's registration info to the Master; this is where the registration message is actually sent.

private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.send(RegisterWorker(
      workerId,
      host,
      port,
      self,
      cores,
      memory,
      workerWebUiUrl,
      masterEndpoint.address))
  }
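
For reference, RegisterWorker itself is a simple case class in DeployMessages (abbreviated from the Spark source):

case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef,
    cores: Int,
    memory: Int,
    workerWebUiUrl: String,
    masterAddress: RpcAddress)
  extends DeployMessage {
  Utils.checkHost(host)
  assert (port > 0)
}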
           

The self parameter is simply this endpoint's own RpcEndpointRef:

final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }
           

send()

private[spark] abstract class RpcEndpointRef(conf: SparkConf) {
    ...

 /**
   * Sends a one-way asynchronous message. Fire-and-forget semantics.
   */
  def send(message: Any): Unit
}
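
send() is fire-and-forget; when a reply is needed, RpcEndpointRef also provides ask variants. A quick comparison (SomeRequest is a placeholder message, not from this file):

// One-way message, no reply expected; this is what the registration path uses
masterEndpoint.send(RegisterWorker(...))

// Asynchronous request: returns a Future completed with the remote reply
val futureReply: Future[Boolean] = masterEndpoint.ask[Boolean](SomeRequest)

// Blocking variant using the default RPC timeout
val reply: Boolean = masterEndpoint.askSync[Boolean](SomeRequest)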
           

From here we can jump to the Master's receive() to see how the message is handled.

override def receive: PartialFunction[Any, Unit] = {
    // Leader-election messages, etc.
    case ElectedLeader =>
      ...

    case CompleteRecovery => completeRecovery()

    // Leadership has been revoked
    case RevokedLeadership =>
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)

    // Matches the RegisterWorker message the Worker sent
    case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
      // Log the registration attempt
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))

      // Possible states: STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY
      // If this master is in STANDBY state, reply through workerRef (the worker's
      // RpcEndpointRef) that this master is not active
      if (state == RecoveryState.STANDBY) {
        workerRef.send(MasterInStandby)
        // Not standby: if the ID is already in idToWorker (the master's worker map),
        // reject the duplicate
      } else if (idToWorker.contains(id)) {
        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
        // Normal registration path
      } else {
        // Build a WorkerInfo instance from the incoming parameters
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        // If registration succeeds, persist the worker, confirm, and schedule
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
          schedule()
        } else {
          // Registration failed: report the duplicate address back to the worker
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
    }
           

If registration succeeds, the master replies through workerRef.send() with a serializable RegisteredWorker message:

private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler endpoint nodes. */
private[deploy] object DeployMessages {
  case class RegisteredWorker(
      master: RpcEndpointRef,
      masterWebUiUrl: String,
      masterAddress: RpcAddress) extends DeployMessage with RegisterWorkerResponse

  ...
}
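
Back on the Worker side, RegisteredWorker lands in handleRegisterResponse(). An abbreviated sketch (trimmed from the Spark source; cleanup scheduling and state reporting elided) shows the key effects: registered flips to true, the active master is recorded, and the heartbeat loop starts at HEARTBEAT_MILLIS:

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        registered = true
        // Record the active master and cancel any pending registration retries
        changeMaster(masterRef, masterWebUiUrl, masterAddress)
        // Start sending heartbeats every HEARTBEAT_MILLIS (15 seconds by default)
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        // ... workDir cleanup scheduling elided ...

      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }

      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }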
           

schedule() performs resource scheduling; the scheduling components will be analyzed in a later post.

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  // Returns immediately if this master is not ALIVE
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    // Shuffle the alive workers, then place each waiting driver round-robin
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }
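
To make the round-robin placement concrete, here is a minimal, self-contained simulation of the driver loop above (hypothetical resource numbers, not Spark code):

object RoundRobinDemo extends App {
  // Hypothetical workers and drivers; fields mirror coresFree / memoryFree above
  case class WorkerSlot(id: Int, var coresFree: Int, var memoryFree: Int)
  case class DriverReq(id: Int, cores: Int, mem: Int)

  val aliveWorkers = Seq(WorkerSlot(1, 2, 2048), WorkerSlot(2, 4, 4096), WorkerSlot(3, 8, 8192))
  var curPos = 0
  for (driver <- Seq(DriverReq(1, 4, 4096), DriverReq(2, 2, 2048))) {
    var numVisited = 0
    var launched = false
    // Scan each alive worker at most once, starting after the last assignment
    while (numVisited < aliveWorkers.size && !launched) {
      val w = aliveWorkers(curPos)
      numVisited += 1
      if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
        w.coresFree -= driver.cores
        w.memoryFree -= driver.mem
        println(s"driver ${driver.id} -> worker ${w.id}")
        launched = true
      }
      curPos = (curPos + 1) % aliveWorkers.size
    }
    if (!launched) println(s"driver ${driver.id} stays in waitingDrivers")
  }
  // Output: driver 1 -> worker 2, then driver 2 -> worker 3
}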
           

With the registration phase done, we return to onStart(); the metricsSystem class will be covered later, and onStart() is essentially complete.

Returning to startRpcEnvAndEndpoint() in org.apache.spark.deploy.worker.Worker, the Worker's instantiation is now complete.

Worker has a few other methods; we'll look at them when they are invoked later.
