天天看點

Spark Scheduler子產品源碼分析之DAGScheduler一、DAGScheduler的建構二、Job的送出三、Stage的劃分四、Stage的送出

  本文主要結合Spark-1.6.0的源碼,對Spark中任務排程子產品的執行過程進行分析。Spark Application在遇到Action操作時才會真正的送出任務并進行計算。這時Spark會根據Action操作之前一系列Transform操作的關聯關系,生成一個DAG,在後續的操作中,對DAG進行Stage劃分,生成Task并最終運作。整個過程如下圖所示,DAGScheduler用于對Application進行分析,然後根據各RDD之間的依賴關系劃分Stage,根據這些劃分好的Stage,對應每個Stage會生成一組Task,将Task Set送出到TaskScheduler後,會由TaskScheduler啟動Executor進行任務的計算。

  

Spark Scheduler子產品源碼分析之DAGScheduler一、DAGScheduler的建構二、Job的送出三、Stage的劃分四、Stage的送出

  在任務排程子產品中最重要的三個類是:

1. org.apache.spark.scheduler.DAGScheduler

2. org.apache.spark.scheduler.SchedulerBackend

3. org.apache.spark.scheduler.TaskScheduler

這裡面SchedulerBackend主要起到的作用是為Task配置設定計算資源。

  由于TaskScheduler與SchedulerBackend結合比較緊密,并且從生成來看都是在同一個方法生成,是以接下來分成兩篇部落格對這三個主要的類進行分析,本文分析DAGScheduler的執行過程。有關SchedulerBackend和TaskScheduler的分析,可以通路Spark Scheduler子產品源碼分析之TaskScheduler和SchedulerBackend。

一、DAGScheduler的建構

  Spark在構造SparkContext時就會生成DAGScheduler的執行個體。

val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched//生成schedulerBackend
    _taskScheduler = ts//生成taskScheduler
    _dagScheduler = new DAGScheduler(this)//生成dagScheduler,傳入目前sparkContext對象。
           

  在生成_dagScheduler之前,已經生成了_schedulerBackend和_taskScheduler對象。這兩個對象會在接下來第二和第三部分中介紹。之是以taskScheduler對象在dagScheduler對象構造之前先生成,是由于在生成DAGScheduler的構造方法中會從傳入的SparkContext中擷取到taskScheduler對象

def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  看一下DAGScheduler對象的主構造方法,

class DAGScheduler(
    private[scheduler] val sc: SparkContext, // 獲得目前SparkContext對象
    private[scheduler] val taskScheduler: TaskScheduler,  // 獲得目前saprkContext内置的taskScheduler
    listenerBus: LiveListenerBus,     // 異步處理事件的對象,從sc中擷取
    mapOutputTracker: MapOutputTrackerMaster, //運作在Driver端管理shuffle map task的輸出,從sc中擷取
    blockManagerMaster: BlockManagerMaster, //運作在driver端,管理整個Job的Block資訊,從sc中擷取
    env: SparkEnv, // 從sc中擷取
    clock: Clock = new SystemClock())
           

  其中有關LiveListenerBus會在Spark-1.6.0之Application運作資訊記錄器JobProgressListener中有具體介紹。MapOutputTrackerMaster,BlockManagerMaster後續也會寫部落格進行分析。

DAGScheduler的資料結構

  在DAGScheduler的源代碼中,定義了很多變量,在剛構造出來時,僅僅隻是初始化這些變量,具體使用是在後面Job送出的過程中了。

private[scheduler] val nextJobId = new AtomicInteger() // 生成JobId
  private[scheduler] def numTotalJobs: Int = nextJobId.get() // 總的Job數
  private val nextStageId = new AtomicInteger() // 下一個StageId

  // 記錄某個job對應的包含的所有stage
  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] 
  // 記錄StageId對應的Stage
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage] 
  // 記錄每一個shuffle對應的ShuffleMapStage,key為shuffleId
  private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage] 
  // 記錄處于Active狀态的job,key為jobId, value為ActiveJob類型對象
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]  

  // 等待運作的Stage,一般這些是在等待Parent Stage運作完成才能開始
  private[scheduler] val waitingStages = new HashSet[Stage]

  // 處于Running狀态的Stage
  private[scheduler] val runningStages = new HashSet[Stage]

  // 失敗原因為fetch failures的Stage,并等待重新送出
  private[scheduler] val failedStages = new HashSet[Stage]
  // active狀态的Job清單
  private[scheduler] val activeJobs = new HashSet[ActiveJob]
  // 處理Scheduler事件的對象
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
           

  DAGScheduler構造完成,并初始化一個eventProcessLoop執行個體後,會調用其

eventProcessLoop.start()

方法,啟動一個多線程,然後把各種event都送出到eventProcessLoop中。這個eventProcessLoop比較重要,在後面也會提到。

二、Job的送出

  一個Job實際上是從RDD調用一個Action操作開始的,該Action操作最終會進入到

org.apache.spark.SparkContext.runJob()

方法中,在SparkContext中有多個重載的runJob方法,最終入口是下面這個:

// SparkContext.runJob
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
           

  這裡調用

dagScheduler.runJob()

方法後,正式進入之前構造的DAGScheduler對象中。在這個方法中,後續一系列的過程以此為:

1. DAGScheduler#runJob

  執行過程中各變量的内容如下圖所示

  

Spark Scheduler子產品源碼分析之DAGScheduler一、DAGScheduler的建構二、Job的送出三、Stage的劃分四、Stage的送出

  調用DAGScheduler.submitJob方法後會得到一個JobWaiter執行個體來監聽Job的執行情況。針對Job的Succeeded狀态和Failed狀态,在接下來代碼中都有不同的處理方式。

  

2. DAGScheduler#submitJob

  進入submitJob方法,首先會去檢查rdd的分區資訊,在確定rdd分區資訊正确的情況下,給目前job生成一個jobId,nexJobId在剛構造出來時是從0開始編号的,在同一個SparkContext中,jobId會逐漸順延。然後構造出一個JobWaiter對象傳回給上一級調用函數。通過上面提到的eventProcessLoop送出該任務,最終會調用到

DAGScheduler.handleJobSubmitted

來處理這次送出的Job。handleJobSubmitted在下面的Stage劃分部分會有提到。

  

Spark Scheduler子產品源碼分析之DAGScheduler一、DAGScheduler的建構二、Job的送出三、Stage的劃分四、Stage的送出

  

3. DAGSchedulerEventProcessLoop#post

  在前面的方法中,調用post方法傳入的是一個JobSubmitted執行個體。DAGSchedulerEventProcessLoop類繼承自EventLoop類,其中的post方法也是在EventLoop中定義的。在EventLoop中維持了一個LinkedBlockingDeque類型的事件隊列,将該Job送出事件存入該隊列後,事件線程會從隊列中取出事件并進行處理。

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // 事件隊列
  def post(event: E): Unit = {
    eventQueue.put(event) // 将JobSubmitted,Job送出事件存入該隊列中
  }
           

4、EventLoop#run

  該方法從eventQueue隊列中順序取出event,調用onReceive方法處理事件

val event = eventQueue.take()
try {
   onReceive(event)
}
           

5、DAGSchedulerEventProcessLoop#onReceive

  在onReceive方法中,進一步調用doOnReceive方法

override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }
           

6、DAGSchedulerEventProcessLoop#doOnReceive

  在該方法中,根據事件類别分别比對不同的方法進一步處理。本次傳入的是JobSubmitted方法,那麼進一步調用的方法是DAGScheduler.handleJobSubmitted。這部分的邏輯,以及還可以處理的其他事件,都在下面的源代碼中。

  

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 處理Job送出事件
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// 處理Map Stage送出事件
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
// 處理Stage取消事件
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
// 處理Job取消事件
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
// 處理Job組取消事件
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
// 處理是以Job取消事件
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
// 處理Executor配置設定事件
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
// 處理Executor丢失事件
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
// 處理完成事件
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
// 處理task集失敗事件
    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
// 處理重新送出失敗Stage事件
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
           

7、DAGScheduler#handleJobSubmitted

  當Job送出後,JobSubmitted事件會被eventProcessLoop捕獲到,然後進入本方法中。開始處理Job,并執行Stage的劃分。這一部分會銜接下一節,是以這個方法的源碼以及Stage如何劃分會在下一節中較長的描述。

  

三、Stage的劃分

  Stage的劃分過程中,會涉及到寬依賴和窄依賴的概念,寬依賴是Stage的分界線,連續的窄依賴都屬于同一Stage。

  

Spark Scheduler子產品源碼分析之DAGScheduler一、DAGScheduler的建構二、Job的送出三、Stage的劃分四、Stage的送出

  比如上圖中,在RDD G處調用了Action操作,在劃分Stage時,會從G開始逆向分析,G依賴于B和F,其中對B是窄依賴,對F是寬依賴,是以F和G不能算在同一個Stage中,即在F和G之間會有一個Stage分界線。上圖中還有一處寬依賴在A和B之間,是以這裡還會分出一個Stage。最終形成了3個Stage,由于Stage1和Stage2是互相獨立的,是以可以并發執行,等Stage1和Stage2準備就緒後,Stage3才能開始執行。

  Stage有兩個類型,最後的Stage為ResultStage類型,除此之外的Stage都是ShuffleMapStage類型。

1、DAGScheduler#handleJobSubmitted

  這個方法的具體代碼如下所示,前面提到了Stage的劃分是從最後一個Stage開始逆推的,每遇到一個寬依賴處,就分裂成另外一個Stage,依此類推直到Stage劃分完畢為止。并且,隻有最後一個Stage的類型是ResultStage類型。

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // Stage劃分過程是從最後一個Stage開始往前執行的,最後一個Stage的類型是ResultStage
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    //為該Job生成一個ActiveJob對象,并準備計算這個finalStage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job // 該job進入active狀态
    activeJobs += job
    finalStage.setActiveJob(job) 
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post( // 向LiveListenerBus發送Job送出事件
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)   //送出目前Stage

    submitWaitingStages()
  }
           

2、DAGScheduler#newResultStage

  在這個方法中,會根據最後調用Action的那個RDD,以及方法調用過程callSite,生成的jobId,partitions等資訊生成最後那個Stage。

private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    // 擷取目前Stage的parent Stage,這個方法是劃分Stage的核心實作
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)// 建立目前最後的ResultStage
    stageIdToStage(id) = stage // 将ResultStage與stageId相關聯
    updateJobIdStageIdMaps(jobId, stage) // 更新該job中包含的stage
    stage
  }
           

3、DAGScheduler#getParentStagesAndId

  這個方法主要是為目前的RDD向前探索,找到寬依賴處劃分出parentStage,并為目前RDD所屬Stage生成一個stageId。在這個方法中,getParentStages的調用鍊最終遞歸調用到了這個方法,是以,最後一個Stage的stageId最大,越往前的stageId就越小,stageId小的Stage先執行。

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId) // 傳入rdd和jobId,生成parentStage
    // 生成目前stage的stageId。同一Application中Stage初始編号為
    val id = nextStageId.getAndIncrement() 
    (parentStages, id)
  }
           

4、DAGScheduler#getParentStages

  從目前rdd開始往前探索父rdd,在每一個寬依賴處生成一個parentStage,而窄依賴的rdd,繼續壓入棧中,等待下一輪分析窄依賴父rdd的父rdd,一直找到寬依賴生成新的stage,或者直到第一個rdd為止。同時,使用一個HashSet來儲存通路過的rdd,後面分析時遇到重複依賴時也能保證每個rdd隻被分析了一次。一個Job中,除了最後一個Stage是ResultStage類型之外,他的Stage都是ShuffleMapStage結構。

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage] // 存儲目前stage的所有parent stage
    val visited = new HashSet[RDD[_]] // 存儲通路過的rdd
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]] // 一個棧,儲存未通路過的rdd,先進後出
    def visit(r: RDD[_]) {
      if (!visited(r)) { // 如果棧中彈出的rdd被未通路過
        visited += r // 首先将其标記為已通路
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) { // 讀取當然rdd的依賴
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => // 如果是寬依賴,則擷取依賴rdd所在的ShuffleMapStage
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              // 如果是窄依賴,将依賴的rdd也壓入棧中,下次循環時會探索該rdd的依賴情況,直到找到款依賴劃分新的stage為止
              waitingForVisit.push(dep.rdd) 
          }
        }
      }
    }
    waitingForVisit.push(rdd) // 将目前rdd壓入棧中
    while (waitingForVisit.nonEmpty) { // 如果棧中有未被通路的rdd
      visit(waitingForVisit.pop()) // 
    }
    parents.toList
  }
           

5、DAGScheduler.newOrUsedShuffleStage

  這裡會為目前Shuffle生成一個ShuffleMapStage,并且會與MapOutputTracker打交道,記錄本次Shuffle的一些資訊。

private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length // 根據目前rdd的paritions個數,計算出目前Stage的task個數。
    // 為目前rdd生成ShuffleMapStage
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) 
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { 
      // 如果目前shuffle已經在MapOutputTracker中注冊過
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      ( until locs.length).foreach { i => // 更新Shuffle的Shuffle Write路徑
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else { // 如果目前Shuffle沒有在MapOutputTracker中注冊過
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) // 注冊
    }
    stage
  }
           

6、DAGScheduler#getShuffleMapStage

  為目前寬依賴的Map端生成一個新的ShuffleMapStage類型的Stage。同時也為目前Shuffle的父Shuffle生成一個Stage。通過

DAGScheduler.getAncestorShuffleDependencies

擷取目前Shuffle的父Shuffle,這個方法的邏輯和上面的

DAGScheduler.getParentStages

擷取目前Stage的父Stage類似。

private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match { // 從Shuffle和Stage映射中取出目前Shuffle對應的Stage
      case Some(stage) => stage // 如果該shuffle已經生成過stage,則直接傳回
      case None => // 否則為目前shuffle生成新的stage
        // We are going to register ancestor shuffle dependencies
        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => 
          // 為目前shuffle的父shuffle都生成一個ShuffleMapStage
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
        }
        // Then register current shuffleDep
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) // 為目前shuffle生成一個ShuffleMapStage
        shuffleToMapStage(shuffleDep.shuffleId) = stage // 更新Shuffle和Stage的映射關系
        stage
    }
  }
           

7、DAGScheduler#newShuffleMapStage

  這個和2類似,不同的是2是生成最終的ResultStage,而這裡是生成ShuffleMapStage,不過這兩者都會調用方法3,最終形成了一個遞歸調用。

private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    // 擷取目前rdd的父Stage和stageId
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId) 
    // 生成新的ShuffleMapStage
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages, 
      firstJobId, callSite, shuffleDep)

    stageIdToStage(id) = stage // 将ShuffleMapStage與stageId相關聯
    updateJobIdStageIdMaps(firstJobId, stage) // 更新該job中包含的stage
    stage
  }
           

四、Stage的送出

  這部分主要梳理Stage生成後如何送出,任務的送出和生成入口在前面DAGScheduler#handleJobSubmitted方法中。

1、DAGScheduler#handleJobSubmitted

  這個方法的代碼可以到第三節中檢視。生成了finalStage後,就會為該Job生成一個ActiveJob對象了,并準備計算這個finalStage。

  ActiveJob對象中的資訊比較少,可以看其類定義

private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  /**
   * 該Job需要計算的partitions個數
   */
  val numPartitions = finalStage match {
    case r: ResultStage => r.partitions.length
    case m: ShuffleMapStage => m.rdd.partitions.length
  }

  /** 一個Boolean類型的數組,初始值為false
   * 數組長度為partitions個數,哪個partition被計算了,則對應的
   * 值标記為true
   */
  val finished = Array.fill[Boolean](numPartitions)(false)
  // 處理完成的partition個數
  var numFinished = 
}
           

  在

DAGScheduler.handleJobSubmitted

方法的最後,調用了

DAGScheduler.submitStage

方法,在送出finalSate的前面,會通過listenerBus的post方法,把Job開始的事件送出到Listener中。

2、DAGScheduler#submitStage

  送出Job的送出,是從最後那個Stage開始的。如果目前stage已經被送出過,處于waiting或者waiting狀态,或者目前stage已經處于failed狀态則不作任何處理,否則繼續送出該stage。

  在送出時,需要目前Stage需要滿足依賴關系,其前置的Parent Stage都運作完成後才能輪得到目前Stage運作。如果還有Parent Stage未運作完成,則優先送出Parent Stage。通過調用方法

DAGScheduler.getMissingParentStages

方法擷取未執行的Parent Stage。

  如果目前Stage滿足上述兩個條件後,調用

DAGScheduler.submitMissingTasks

方法,送出目前Stage。

/** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage) // 擷取目前送出Stage所屬的Job
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      // 首先判斷目前stage的狀态,如果目前Stage不是處于waiting, running以及failed狀态
      // 則送出該stage
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { 
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {    //如果所有的parent stage都以及完成,那麼就會送出該stage所包含的task
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)  //過程見下面的方法描述
        } else {    //否則遞歸的去送出未完成的parent stage
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage  //目前stage進入等待隊列
        }
      }
    } else {    //如果jobId沒被定義,即無效的stage則直接停止
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
           

2.1 DAGScheduler#getMissingParentStage

  這個方法用于擷取stage未執行的Parent Stage。在上面方法中,擷取到Parent Stage後,遞歸調用上面那個方法按照StageId小的先送出的原則,這個方法的邏輯和DAGScheduler#getParentStages方法類似,這裡不再分析了。總之就是根據目前Stage,遞歸調用其中的visit方法,依次對每一個Stage追溯其未運作的Parent Stage。

  

3、DAGScheduler.submitMissingTasks

  當Stege的Parent Stage都運作完畢,才能調用這個方法真正的送出目前Stage中包含的Task。這個方法涉及到了Task,會在Spark Scheduler子產品源碼分析之TaskScheduler和SchedulerBackend中進一步分析。

  到這裡,本文主要分析了Scheduler子產品中DAGScheduler的作用,構成,以及Stage劃分和Stage最終的送出過程,仔細觀察這一部分的主要代碼中,在多處都會看到

listenerBus.post

方法的調用,針對不同的Stage事件,會将這個事件送出到LiveListenerBus中,将Stage事件相關過程進行記錄,并使得Spark其他部分能夠及時擷取到Stage的最新狀态。這一部分可以參考Spark-1.6.0之Application運作資訊記錄器JobProgressListener。

繼續閱讀