天天看點

Spark作業排程階段分析

Spark作為分布式的大資料處理架構必然或涉及到大量的作業排程,如果能夠了解Spark中的排程對我們編寫或優化Spark程式都是有很大幫助的;

  在Spark中存在轉換操作(Transformation Operation)與 行動操作(Action Operation)兩種;而轉換操作隻是會從一個RDD中生成另一個RDD且是lazy的,Spark中隻有行動操作(Action Operation)才會觸發作業的送出,進而引發作業排程;在一個計算任務中可能會多次調用 轉換操作這些操作生成的RDD可能存在着依賴關系,而由于轉換都是lazy是以當行動操作(Action Operation )觸發時才會有真正的RDD生成,這一系列的RDD中就存在着依賴關系形成一個DAG(Directed Acyclc Graph),在Spark中DAGScheuler是基于DAG的頂層排程子產品;

相關名詞

  Application:使用Spark編寫的應用程式,通常需要送出一個或多個作業;

  Job:在觸發RDD Action操作時産生的計算作業

  Task:一個分區資料集中最小處理單元也就是真正執行作業的地方

  TaskSet:由多個Task所組成沒有Shuffle依賴關系的任務集

  Stage:一個任務集對應的排程階段 ,每個Job會被拆分成諾幹個Stage

    

Spark作業排程階段分析

          1.1 作業排程關系圖

RDD Action作業送出流程

  這裡根據Spark源碼跟蹤觸發Action操作時觸發的Job送出流程,Count()是RDD中的一個Action操作是以調用Count時會觸發Job送出;

  在RDD源碼count()調用SparkContext的runJob,在runJob方法中根據partitions(分區)大小建立Arrays存放傳回結果;

RDD.scala

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.scala

def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit = {

  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
           

  在SparkContext中将調用DAGScheduler的runJob方法送出作業,DAGScheduler主要任務是計算作業與任務依賴關系,處理調用邏輯;DAGScheduler提供了submitJob與runJob方法用于 送出作業,runJob方法會一直等待作業完成,submitJob則傳回JobWaiter對象可以用于判斷作業執行結果;

  在runJob方法中将調用submitJob,在submitJob中把送出操作放入到事件循環隊列(DAGSchedulerEventProcessLoop)中;

def submitJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): JobWaiter[U] = {
      ......  
      eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
      ......
  }  
           

  在事件循環隊列中将調用eventprocessLoop的onReceive方法;

Stage拆分

  送出作業時DAGScheduler會從RDD依賴鍊尾部開始,周遊整個依賴鍊劃分排程階段;劃分階段以ShuffleDependency為依據,當沒有ShuffleDependency時整個Job 隻會有一個Stage;在事件循環隊列中将會調用DAGScheduler的handleJobSubmitted方法,此方法會拆分Stage、送出Stage;

private[scheduler] def handleJobSubmitted(jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties) {
var finalStage: ResultStage = null
......
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)

submitWaitingStages()
}
           

排程階段送出

  在送出Stage時會先調用getMissingParentStages擷取父階段Stage,疊代該階段所依賴的父排程階段如果存在則先送出該父階段的Stage 當不存在父Stage或父Stage執行完成時會對目前Stage進行送出;

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  }
  ......
}
           

參考資料:

http://spark.apache.org/docs/latest/

文章首發位址:Solinx

http://www.solinx.co/archives/579