天天看點

Spark 源碼解析:徹底了解TaskScheduler的任務送出和task最佳位置算法 Spark 源碼解析 : DAGScheduler中的DAG劃分與送出

上篇文章 《  

Spark 源碼解析 : DAGScheduler中的DAG劃分與送出

》 介紹了DAGScheduler的Stage劃分算法。 原創文章,轉載請注明: 轉載自 聽風居士部落格( http://blog.csdn.net/zhouzx2010)   

本文繼續分析Stage被封裝成TaskSet,并将TaskSet送出到叢集的Executor執行的過程

在DAGScheduler的submitStage方法中,将Stage劃分完成,生成拓撲結構,當一個stage沒有父stage時候,會調用 DAGScheduler的 submitMissingTasks方法來送出該stage包含tasks。 首先來分析一下 DAGScheduler的 submitMissingTasks方法

1.擷取Task的最佳計算位置:

  1. val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  2. stage match {
  3. case s: ShuffleMapStage =>
  4. partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
  5. case s: ResultStage =>
  6. val job = s.activeJob.get
  7. partitionsToCompute.map { id =>
  8. val p = s.partitions(id)
  9. (id, getPreferredLocs(stage.rdd, p))
  10. }.toMap
  11. }
  12. }

核心是其中的getPreferredLocs方法,根據RDD的資料資訊得到task的最佳計算位置,進而擷取較好的資料本地性。其中的細節這裡先跳過,在以後的文章在做分析

2.序列化Task的Binary,并進行廣播。Executor端在執行task時會向反序列化Task。

3.根據stage的不同類型建立,為stage的每個分區建立建立task,并封裝成TaskSet。Stage分兩種類型ShuffleMapStage生成ShuffleMapTask,ResultStage生成ResultTask。

  1. val tasks: Seq[Task[_]] = try {
  2. stage match {
  3. case stage: ShuffleMapStage =>
  4. partitionsToCompute.map { id =>
  5. val locs = taskIdToLocations(id)
  6. val part = stage.rdd.partitions(id)
  7. new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
  8. taskBinary, part, locs, stage.internalAccumulators)
  9. }
  10. case stage: ResultStage =>
  11. val job = stage.activeJob.get
  12. partitionsToCompute.map { id =>
  13. val p: Int = stage.partitions(id)
  14. val part = stage.rdd.partitions(p)
  15. val locs = taskIdToLocations(id)
  16. new ResultTask(stage.id, stage.latestInfo.attemptId,
  17. taskBinary, part, locs, id, stage.internalAccumulators)
  18. }
  19. }

4.調用TaskScheduler的submitTasks,送出TaskSet

  1. logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  2. stage.pendingPartitions ++= tasks.map(_.partitionId)
  3. logDebug("New pending partitions: " + stage.pendingPartitions)
  4. taskScheduler.submitTasks(new TaskSet(
  5. tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  6. stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

submitTasks方法的實作在TaskScheduler的實作類TaskSchedulerImpl中。

4.1 TaskSchedulerImpl的submitTasks方法首先建立TaskSetManager。

  1. val manager = createTaskSetManager(taskSet, maxTaskFailures)
  2. val stage = taskSet.stageId
  3. val stageTaskSets =
  4. taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  5. stageTaskSets(taskSet.stageAttemptId) = manager

TaskSetManager負責管理 TaskSchedulerImpl中 一個單獨TaskSet,跟蹤每一個task,如果task失敗,負責重試task直到達到task重試次數的最多次數。并且通過延遲排程來執行task的位置感覺排程。

  1. private[spark] class TaskSetManager(
  2. sched: TaskSchedulerImpl,//綁定的TaskSchedulerImpl
  3. val taskSet: TaskSet,
  4. val maxTaskFailures: Int, //失敗最大重試次數
  5. clock: Clock = new SystemClock())
  6. extends Schedulable with Logging

4.2 将TaskSetManger加入schedulableBuilder

  1. schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) //将TaskSetManager加入rootPool排程池中,由schedulableBuilder決定排程順序

schedulableBuilder的類型是 SchedulerBuilder, SchedulerBuilder是一個trait,有兩個實作FIFO SchedulerBuilder和  Fair SchedulerBuilder,并且預設采用的是FIFO方式

  1. // default scheduler is FIFO
  2. private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")

而 schedulableBuilder的建立是在SparkContext建立SchedulerBackend和TaskScheduler後調用TaskSchedulerImpl的初始化方法進行建立的。

  1. def initialize(backend: SchedulerBackend) {
  2. this.backend = backend
  3. // temporarily set rootPool name to empty
  4. rootPool = new Pool("", schedulingMode, 0, 0)
  5. schedulableBuilder = {
  6. schedulingMode match {
  7. case SchedulingMode.FIFO =>
  8. new FIFOSchedulableBuilder(rootPool)
  9. case SchedulingMode.FAIR =>
  10. new FairSchedulableBuilder(rootPool, conf)
  11. }
  12. }
  13. schedulableBuilder.buildPools()
  14. }

schedulableBuilder是TaskScheduler中一個重要成員,他根據排程政策決定了TaskSetManager的排程順序。

4.3 接下來調用SchedulerBackend的riviveOffers方法對Task進行排程,決定task具體運作在哪個Executor中。

調用CoarseGrainedSchedulerBackend的riviveOffers方法,該方法給driverEndpoint發送ReviveOffer消息

  1. override def reviveOffers() {
  2. driverEndpoint.send(ReviveOffers)
  3. }

driverEndpoint收到 ReviveOffer消息後調用makeOffers方法

  1. // Make fake resource offers on all executors
  2. private def makeOffers() {
  3. //過濾出活躍狀态的Executor
  4. val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  5.      //将Executor封裝成WorkerOffer對象
  6. val workOffers = activeExecutors.map { case (id, executorData) =>
  7. new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  8. }.toSeq
  9. launchTasks(scheduler.resourceOffers(workOffers))
  10. }

注意:上面代碼中的executorDataMap,在客戶的向Master注冊Application的時候,Master已經為 Application配置設定并啟動好Executor,然後注冊給 CoarseGrainedSchedulerBackend,注冊資訊就是存儲在executorDataMap資料結構中。

準備好計算資源後,接下來TaskSchedulerImpl基于這些計算資源為task配置設定Executor。 我們看一下 TaskSchedulerImpl的resourceOffers方法:

  1. // 随機打亂offers
  2. val shuffledOffers = Random.shuffle(offers)
  3. // 建構一個二維數組,儲存每個Executor上将要配置設定的那些task
  4. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  5. val availableCpus = shuffledOffers.map(o => o.cores).toArray
  1.    // 根據SchedulerBuilder的排程算法,給TaskManager排好序
    1. val sortedTaskSets = rootPool.getSortedTaskSetQueue
  2. for (taskSet <- sortedTaskSets) {
  3. logDebug("parentName: %s, name: %s, runningTasks: %s".format(
  4. taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  5. if (newExecAvail) {
  6. taskSet.executorAdded()
  7. }
  8. }
  9. // 使用雙重循環,對每一個taskset 依照排程的順序,依次按照本地性級别順序嘗試啟動task
  10. // 資料本地性級别順序: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  11. var launchedTask = false
  12. for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  13. do {
  14. launchedTask = resourceOfferSingleTaskSet(
  15. taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  16. } while (launchedTask)
  17. }
  18. if (tasks.size > 0) {
  19. hasLaunchedTask = true
  20. }
  21. return tasks

下面看看  resourceOfferSingleTaskSet 方法:

用目前的資料本地性,調用TaskSetManager的resourceOffer方法,在目前executor上配置設定task      
  1. private def resourceOfferSingleTaskSet(
  2. taskSet: TaskSetManager,
  3. maxLocality: TaskLocality,
  4. shuffledOffers: Seq[WorkerOffer],
  5. availableCpus: Array[Int],
  6. tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  7. var launchedTask = false
  8. for (i <- 0 until shuffledOffers.size) {
  9. val execId = shuffledOffers(i).executorId
  10. val host = shuffledOffers(i).host
  11.        //如果executor 的cup數大于 每個task的cup數目(值為1)
  12. if (availableCpus(i) >= CPUS_PER_TASK) {
  13. try {
  14.        //
  15. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
  16. tasks(i) += task
  17. val tid = task.taskId
  18. taskIdToTaskSetManager(tid) = taskSet
  19. taskIdToExecutorId(tid) = execId
  20. executorIdToTaskCount(execId) += 1
  21. executorsByHost(host) += execId
  22. availableCpus(i) -= CPUS_PER_TASK
  23. assert(availableCpus(i) >= 0)
  24. launchedTask = true
  25. }
  26. }

為Task配置設定好資源之後,DriverEndpint調用launchTask方法将task在Executor上啟動運作。task在Executor上的啟動運作過程,在後面的文章中會繼續分析,敬請關注。

總結一下調用過程: TaskSchedulerImpl# submitTasks

CoarseGrainedSchedulerBackend# riviveOffers

CoarseGrainedSchedulerBackend$DriverEndpoint#makeOffers

  |-TaskSchedulerImpl#resourceOffers(offers) 為offers配置設定task      |-  TaskSchedulerImpl # resourceOfferSingleTaskSet

CoarseGrainedSchedulerBackend$DriverEndpoint#launchTask

原創文章,轉載請注明: 轉載自 聽風居士部落格( http://blog.csdn.net/zhouzx2010)   

繼續閱讀