一.DAGScheduler
SparkContext在初始化時,建立了DAG排程和Task排程來負責RDD Action操作的排程執行。
DAGScheduler負責Spark的最進階别的任務排程,排程的粒度是Stage,它為每個Job的所有Stage計算一個有向無環圖,控制它們的并發,并找到一個最佳路徑來執行它們。具體的執行過程是将Stage下的Task任務集送出給TaskScheduler對象,由它來送出到叢集上去申請資源并最終完成執行。
DAGScheduler初始化時除了需要一個SparkContext對象外,最重要的是需要輸入一個TaskScheduler對象來負責Task的執行。源碼如下:

1.runJob過程
所有需要執行的RDD Action,都會調用SparkContext.runJob來送出任務,而SparkContext.runJob調用的是DAGScheduler.runJob。如下:
runJob調用submitJob送出任務,并等待任務結束。任務送出後的處理過程如下:
1.submitJob生成新的Job ID,發送消息JobSubmitted。
2.DAG收到JobSubmitted消息,調用handleJobSubmitted來處理。
3.handleJobSubmitted建立一個ResultStage,并使用submitStage來送出這個ResultStage。
上面的過程看起來沒執行完,實際上大的過程已經結束了,存在某種問題或陰謀在submitStage中。Spark的執行過程是懶加載的,這在這裡得到了完整的展現。任務送出時,不是按Job的先後順序送出的,而是倒序的。每個Job的最後一個操作是Action操作,DAG把這個最後的Action操作當作一個Stage,首先送出,然後逆向逐級遞歸填補缺少的上級Stage,進而生成一顆實作最後Action操作的最短的【都是必須的】有向無環圖,然後再從頭開始計算。submitStage方法的實作代碼如下:
可以看到,這是一個逆向遞歸的過程,先查找所有缺失的上級Stage并送出,待所有上級Stage都送出執行了,才輪到執行目前Stage對應的Task。查找上級Stage的過程,其實就是遞歸向上周遊所有RDD依賴清單并生成Stage的過程,代碼如下:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
}
周遊的過程是非遞歸的層序周遊【不是前序、中序或後序周遊】,使用了一個堆棧來協助周遊,而且保證了層序的順序與DAG中的依賴順序一緻。
2.Stage
值得注意的是,僅對依賴類型是ShuffleDependency的RDD操作建立Stage,其它的RDD操作并沒有建立Stage。RDD操作有兩類依賴:一類是窄依賴,一個RDD分區隻依賴上一個RDD的部分分區,而且這些分區都在相同的節點上;另外一類依賴是Shuffle依賴,一個RDD分區可能會依賴上一級RDD的全部分區,一個典型的例子是groupBy聚合操作。這兩類操作在計算上有明顯的差別,窄依賴都在同一個節點上進行計算,而Shuffle依賴垮越多個節點,甚至所有涉及的計算節點。是以,DAG在排程時,對于在相同節點上進行的Task計算,會合并為一個Stage。
二.TaskScheduler
相對DAGScheduler而言,TaskScheduler是低級别的排程接口,允許實作不同的Task排程器。目前,已經實作的Task排程器除了自帶的以外,還有YARN和Mesos排程器。每個TaskScheduler對象隻服務于一個SparkContext的Task排程。TaskScheduler從DAGScheduler的每個Stage接收一組Task,并負責将它們發送到叢集上,運作它們,如果出錯還會重試,最後傳回消息給DAGScheduler。
TaskScheduler的主要接口包括一個鈎子接口【也稱hook,表示定義好之後,不是使用者主動調用的】,被調用的時機是在初始化完成之後和排程啟動之前:
def postStartHook(){}
還有啟動和停止排程的指令:
def start(): Unit
def stop():Unit
此外,還有送出和撤銷Task集的指令:
def submitTasks(taskSet : TaskSet): Unit
def cancelTasks(stageId: Int, interruptThread: Boolean): Unit