天天看點

Spark排程管理【DAGScheduler,TaskScheduler】

一.DAGScheduler

  SparkContext在初始化時,建立了DAG排程和Task排程來負責RDD Action操作的排程執行。

  DAGScheduler負責Spark的最進階别的任務排程,排程的粒度是Stage,它為每個Job的所有Stage計算一個有向無環圖,控制它們的并發,并找到一個最佳路徑來執行它們。具體的執行過程是将Stage下的Task任務集送出給TaskScheduler對象,由它來送出到叢集上去申請資源并最終完成執行。

  DAGScheduler初始化時除了需要一個SparkContext對象外,最重要的是需要輸入一個TaskScheduler對象來負責Task的執行。源碼如下:

  

Spark排程管理【DAGScheduler,TaskScheduler】

  1.runJob過程

    所有需要執行的RDD Action,都會調用SparkContext.runJob來送出任務,而SparkContext.runJob調用的是DAGScheduler.runJob。如下:

    

Spark排程管理【DAGScheduler,TaskScheduler】

    runJob調用submitJob送出任務,并等待任務結束。任務送出後的處理過程如下:

    1.submitJob生成新的Job ID,發送消息JobSubmitted。

    2.DAG收到JobSubmitted消息,調用handleJobSubmitted來處理。

    3.handleJobSubmitted建立一個ResultStage,并使用submitStage來送出這個ResultStage。

    上面的過程看起來沒執行完,實際上大的過程已經結束了,存在某種問題或陰謀在submitStage中。Spark的執行過程是懶加載的,這在這裡得到了完整的展現。任務送出時,不是按Job的先後順序送出的,而是倒序的。每個Job的最後一個操作是Action操作,DAG把這個最後的Action操作當作一個Stage,首先送出,然後逆向逐級遞歸填補缺少的上級Stage,進而生成一顆實作最後Action操作的最短的【都是必須的】有向無環圖,然後再從頭開始計算。submitStage方法的實作代碼如下:

Spark排程管理【DAGScheduler,TaskScheduler】

    可以看到,這是一個逆向遞歸的過程,先查找所有缺失的上級Stage并送出,待所有上級Stage都送出執行了,才輪到執行目前Stage對應的Task。查找上級Stage的過程,其實就是遞歸向上周遊所有RDD依賴清單并生成Stage的過程,代碼如下:   

private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ArrayStack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }      

    周遊的過程是非遞歸的層序周遊【不是前序、中序或後序周遊】,使用了一個堆棧來協助周遊,而且保證了層序的順序與DAG中的依賴順序一緻。

  2.Stage

    值得注意的是,僅對依賴類型是ShuffleDependency的RDD操作建立Stage,其它的RDD操作并沒有建立Stage。RDD操作有兩類依賴:一類是窄依賴,一個RDD分區隻依賴上一個RDD的部分分區,而且這些分區都在相同的節點上;另外一類依賴是Shuffle依賴,一個RDD分區可能會依賴上一級RDD的全部分區,一個典型的例子是groupBy聚合操作。這兩類操作在計算上有明顯的差別,窄依賴都在同一個節點上進行計算,而Shuffle依賴垮越多個節點,甚至所有涉及的計算節點。是以,DAG在排程時,對于在相同節點上進行的Task計算,會合并為一個Stage。

二.TaskScheduler

  相對DAGScheduler而言,TaskScheduler是低級别的排程接口,允許實作不同的Task排程器。目前,已經實作的Task排程器除了自帶的以外,還有YARN和Mesos排程器。每個TaskScheduler對象隻服務于一個SparkContext的Task排程。TaskScheduler從DAGScheduler的每個Stage接收一組Task,并負責将它們發送到叢集上,運作它們,如果出錯還會重試,最後傳回消息給DAGScheduler。

  TaskScheduler的主要接口包括一個鈎子接口【也稱hook,表示定義好之後,不是使用者主動調用的】,被調用的時機是在初始化完成之後和排程啟動之前:

  def postStartHook(){}

  還有啟動和停止排程的指令:

  def start(): Unit

  def stop():Unit

  此外,還有送出和撤銷Task集的指令:

  def submitTasks(taskSet : TaskSet): Unit

  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

繼續閱讀