天天看點

Spark源碼分析之Job觸發原理一 Job的執行流程二 Job觸發流程源碼

Spark源碼分析之Job觸發原理一 Job的執行流程二 Job觸發流程源碼

一 Job的執行流程

1.1 從資料源加載資料,資料源可以是本地資料檔案和HDFS檔案,也可以你是記憶體裡的資料結構或者HBase等,建立初始的RDD

1.2 對RDD進行一系列的transformation操作,每一個transformation可能産生一個或者多個RDD

1.3 對最後的final RDD進行action操作,觸發job操作,将最後每一個分區計算後得到結果

1.4 對每一個分區的結果傳回到Driver端,進行最後的計算。比如count實際上包含了action和sum兩個步驟的計算。RDD可以被cache到記憶體,也可以checkpoint到磁盤。

二 Job觸發流程源碼

2.1 調用action操作,運作job

我們以count這個action操作為例子,它首先會調用SparkContext的runJob方法

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

2.2runJob

在一個RDD的所有分區上運作job,并且傳回結果

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {
  // 判斷SparkContext是否停止或者關閉
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  // 清除閉包
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // 委托給DAGScheduler的runJob方法,送出Job
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  // RDD執行checkpoint操作
  rdd.doCheckpoint()
}      

2.3submitJob

def submitJob[T, U](rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // 判斷任務處理的分區是否存在
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  // 擷取jobId,如果作業隻包含0個任務,則立即傳回JobWaiter
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // 建立JobWaiter對象
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // 建立JobSubmitted對象,放入隊列eventProcessLoop
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}      

繼續閱讀