Spark作為分布式的大資料處理架構必然或涉及到大量的作業排程,如果能夠了解Spark中的排程對我們編寫或優化Spark程式都是有很大幫助的;
在Spark中存在轉換操作(Transformation Operation)與 行動操作(Action Operation)兩種;而轉換操作隻是會從一個RDD中生成另一個RDD且是lazy的,Spark中隻有行動操作(Action Operation)才會觸發作業的送出,進而引發作業排程;在一個計算任務中可能會多次調用 轉換操作這些操作生成的RDD可能存在着依賴關系,而由于轉換都是lazy是以當行動操作(Action Operation )觸發時才會有真正的RDD生成,這一系列的RDD中就存在着依賴關系形成一個DAG(Directed Acyclc Graph),在Spark中DAGScheuler是基于DAG的頂層排程子產品;
相關名詞
Application:使用Spark編寫的應用程式,通常需要送出一個或多個作業;
Job:在觸發RDD Action操作時産生的計算作業
Task:一個分區資料集中最小處理單元也就是真正執行作業的地方
TaskSet:由多個Task所組成沒有Shuffle依賴關系的任務集
Stage:一個任務集對應的排程階段 ,每個Job會被拆分成諾幹個Stage

1.1 作業排程關系圖
RDD Action作業送出流程
這裡根據Spark源碼跟蹤觸發Action操作時觸發的Job送出流程,Count()是RDD中的一個Action操作是以調用Count時會觸發Job送出;
在RDD源碼count()調用SparkContext的runJob,在runJob方法中根據partitions(分區)大小建立Arrays存放傳回結果;
RDD.scala
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
SparkContext.scala
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
在SparkContext中将調用DAGScheduler的runJob方法送出作業,DAGScheduler主要任務是計算作業與任務依賴關系,處理調用邏輯;DAGScheduler提供了submitJob與runJob方法用于 送出作業,runJob方法會一直等待作業完成,submitJob則傳回JobWaiter對象可以用于判斷作業執行結果;
在runJob方法中将調用submitJob,在submitJob中把送出操作放入到事件循環隊列(DAGSchedulerEventProcessLoop)中;
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
......
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
......
}
在事件循環隊列中将調用eventprocessLoop的onReceive方法;
Stage拆分
送出作業時DAGScheduler會從RDD依賴鍊尾部開始,周遊整個依賴鍊劃分排程階段;劃分階段以ShuffleDependency為依據,當沒有ShuffleDependency時整個Job 隻會有一個Stage;在事件循環隊列中将會調用DAGScheduler的handleJobSubmitted方法,此方法會拆分Stage、送出Stage;
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
......
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitWaitingStages()
}
排程階段送出
在送出Stage時會先調用getMissingParentStages擷取父階段Stage,疊代該階段所依賴的父排程階段如果存在則先送出該父階段的Stage 當不存在父Stage或父Stage執行完成時會對目前Stage進行送出;
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
}
......
}
參考資料:
http://spark.apache.org/docs/latest/
文章首發位址:Solinx
http://www.solinx.co/archives/579