The previous article, "Spark 源碼解析 : DAGScheduler中的DAG劃分與送出" (Spark Source Code Analysis: DAG Splitting and Submission in DAGScheduler), introduced DAGScheduler's stage-splitting algorithm.

This article continues the analysis: how a stage is wrapped into a TaskSet, and how that TaskSet is submitted to the cluster's Executors for execution.

In DAGScheduler's submitStage method, once the stages have been split and the stage dependency graph built, a stage with no missing parent stages is submitted by calling DAGScheduler's submitMissingTasks method, which submits the tasks that stage contains. Let's start by analyzing DAGScheduler's submitMissingTasks method.
1. Compute each task's preferred locations:
```scala
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
```
The core of this step is the getPreferredLocs method, which derives each task's preferred locations from the RDD's location information, so that tasks achieve good data locality. The details are skipped here and will be analyzed in a later article.
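As a rough guide, getPreferredLocs delegates to a recursive helper (getPreferredLocsInternal) whose lookup order can be sketched as follows. This is a paraphrase of the Spark 1.x source, not the exact code: names such as getCacheLocs, TaskLocation, and NarrowDependency are Spark internals, and the visited-set bookkeeping that prevents re-walking the lineage graph is omitted here.

```scala
// Paraphrased lookup order of DAGScheduler#getPreferredLocsInternal
// (visited-set bookkeeping omitted for brevity):
def preferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  // 1. If the partition is cached, prefer the executors holding the cached blocks.
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) return cached
  // 2. Otherwise ask the RDD itself, e.g. a HadoopRDD returns HDFS block locations.
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) return rddPrefs.map(TaskLocation(_))
  // 3. Otherwise walk up narrow dependencies and reuse the first parent
  //    partition that has a preference (e.g. for map-like transformations).
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = preferredLocs(n.rdd, inPart)
        if (locs.nonEmpty) return locs
      }
    case _ =>
  }
  Nil
}
```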
2. Serialize the task binary and broadcast it. When an Executor runs a task, it first deserializes this task binary.
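For reference, this is the corresponding excerpt from the Spark 1.x source, abridged: the catch block that aborts the stage on a serialization failure is omitted.

```scala
var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep);
  // for ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
    case stage: ResultStage =>
      closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
  }
  // Broadcasting the bytes lets each executor fetch the task binary once,
  // instead of shipping it with every single task.
  taskBinary = sc.broadcast(taskBinaryBytes)
}
```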
3. Create one task for each partition of the stage, according to the stage type, and wrap the tasks into a TaskSet. There are two stage types: a ShuffleMapStage produces ShuffleMapTasks, and a ResultStage produces ResultTasks.
```scala
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }

    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
}
```
4. Call TaskScheduler's submitTasks method to submit the TaskSet:
- logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
- stage.pendingPartitions ++= tasks.map(_.partitionId)
- logDebug("New pending partitions: " + stage.pendingPartitions)
- taskScheduler.submitTasks(new TaskSet(
- tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
- stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
submitTasks is implemented in TaskSchedulerImpl, the implementation class of TaskScheduler.

4.1 TaskSchedulerImpl's submitTasks method first creates a TaskSetManager:
```scala
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
  taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
```
A TaskSetManager manages a single TaskSet within TaskSchedulerImpl: it tracks each task, retries a failed task until the maximum number of attempts is reached, and handles locality-aware scheduling for this TaskSet via delay scheduling (the locality wait times are configurable; see the sketch after the class signature below).
```scala
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,          // the TaskSchedulerImpl this manager is bound to
    val taskSet: TaskSet,
    val maxTaskFailures: Int,          // maximum number of retries for a failed task
    clock: Clock = new SystemClock())
  extends Schedulable with Logging
```
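As an aside on the delay scheduling mentioned above: the waits before a TaskSetManager falls back to a less local level are controlled by standard Spark configuration keys. A minimal sketch (these keys are documented Spark configs, not part of the code path shown here):

```scala
import org.apache.spark.SparkConf

// How long delay scheduling waits at each locality level before
// falling back to the next, less local one (all default to 3s):
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait shared by the levels below
  .set("spark.locality.wait.process", "3s")  // give up PROCESS_LOCAL after this wait
  .set("spark.locality.wait.node", "3s")     // give up NODE_LOCAL after this wait
  .set("spark.locality.wait.rack", "3s")     // give up RACK_LOCAL after this wait
```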
4.2 Add the TaskSetManager to the schedulableBuilder:
```scala
// Add the TaskSetManager to the rootPool scheduling pool; the
// schedulableBuilder decides the scheduling order.
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
```
schedulableBuilder is of type SchedulableBuilder, a trait with two implementations, FIFOSchedulableBuilder and FairSchedulableBuilder; FIFO is the default:
```scala
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
```
The schedulableBuilder itself is created in TaskSchedulerImpl's initialize method, which SparkContext calls after it has created the SchedulerBackend and the TaskScheduler:
```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
```
schedulableBuilder is an important member of TaskSchedulerImpl: its scheduling policy determines the order in which TaskSetManagers are scheduled.
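For example, switching to fair scheduling is a configuration change. A minimal sketch using standard Spark APIs (the app and pool names here are purely illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Switch the scheduling mode from the default FIFO to FAIR.
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")   // hypothetical app name
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Jobs submitted from this thread go into the named pool
// ("production" is an illustrative pool name).
sc.setLocalProperty("spark.scheduler.pool", "production")
```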
4.3 Next, SchedulerBackend's reviveOffers method is called to schedule the tasks, that is, to decide which Executor each task runs on.

CoarseGrainedSchedulerBackend's reviveOffers sends a ReviveOffers message to the driverEndpoint:
```scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```
When the driverEndpoint receives the ReviveOffers message, it calls makeOffers:
```scala
// Make fake resource offers on all executors
private def makeOffers() {
  // Keep only the executors that are still alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // Wrap each executor into a WorkerOffer
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq

  launchTasks(scheduler.resourceOffers(workOffers))
}
```
Note: the executorDataMap used above is populated when the client registers the Application with the Master. By that point the Master has already allocated and launched the Executors for the Application; those Executors then register themselves with CoarseGrainedSchedulerBackend, and their registration information is stored in the executorDataMap data structure.
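For reference, a WorkerOffer is nothing more than a value object describing the free capacity of one executor; in the Spark source it is a plain case class:

```scala
// Represents free resources available on one executor.
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)
```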
With the compute resources prepared, TaskSchedulerImpl now assigns tasks to Executors based on them. Let's look at TaskSchedulerImpl's resourceOffers method:
```scala
// Randomly shuffle the offers to avoid always placing tasks on the same executors
val shuffledOffers = Random.shuffle(offers)

// Build a two-dimensional structure: for each executor, the tasks to assign to it
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray

// Sort the TaskSetManagers according to the SchedulableBuilder's scheduling algorithm
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}

// Double loop: for each TaskSet in scheduling order, try to launch tasks
// level by level in order of data locality:
// PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}

if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
```
Now let's look at the resourceOfferSingleTaskSet method. For the current locality level, it calls TaskSetManager's resourceOffer method to assign tasks on each executor:
```scala
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Only offer this executor if it has at least CPUS_PER_TASK free CPUs
    // (CPUS_PER_TASK defaults to 1)
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Ask the TaskSetManager for a task to run on this executor at the
        // given locality level; resourceOffer returns an Option
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Don't offer resources for this task, but don't throw an error to
          // allow other task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
```
Once the tasks have been assigned their resources, the DriverEndpoint calls launchTasks to start them on the Executors. How a task starts and runs on an Executor will be analyzed in a later article; stay tuned.
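For completeness, here is an abridged sketch of launchTasks based on the Spark 1.x source; the error path for tasks whose serialized size exceeds the RPC frame limit is omitted, and details vary by version.

```scala
// Abridged from CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks;
// the oversized-serialized-task error path is left out.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the TaskDescription chosen by resourceOffers
    val serializedTask = ser.serialize(task)
    val executorData = executorDataMap(task.executorId)
    // Account for the cores this task occupies on the chosen executor
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // Send the task to the executor's endpoint, which starts it in a thread pool
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}
```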
To summarize the call chain:

```
TaskSchedulerImpl#submitTasks
  |- CoarseGrainedSchedulerBackend#reviveOffers
     |- CoarseGrainedSchedulerBackend$DriverEndpoint#makeOffers
        |- TaskSchedulerImpl#resourceOffers(offers)   // assign tasks to the offers
        |  |- TaskSchedulerImpl#resourceOfferSingleTaskSet
        |- CoarseGrainedSchedulerBackend$DriverEndpoint#launchTasks
```
Original article. When reposting, please credit: 聽風居士's blog (http://blog.csdn.net/zhouzx2010)