天天看點

Spark1.6-----源碼解讀之TaskScheduler

 TaskScheduler是SparkContext重要成員之一,負責任務的送出,并且請求叢集管理器對任務排程。他也可以看做任務排程的用戶端。

SparkContext 522行 建立TaskScheduler:

val (sched, ts) = SparkContext.createTaskScheduler(this, master)
           

SparkContext 2592行 為createTaskScheduler具體實作方法:

private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
           

它會根據不同的master  産生不同的行為本文以Local為例子。它會建立TaskSchedulerImpl  并且建立LocalBackend:

構造代碼TaskSchedulerImpl 102行:

var dagScheduler: DAGScheduler = null

  var backend: SchedulerBackend = null

  val mapOutputTracker = SparkEnv.get.mapOutputTracker

  var schedulableBuilder: SchedulableBuilder = null
  var rootPool: Pool = null
  // default scheduler is FIFO
  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
  val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case e: java.util.NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }

  // This is a var so that we can reset it for testing purposes.
  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
           

解析:(1)擷取配置資訊比如排程模式(FIFO,FAIR)

            (2)建立TaskResultGetter 作用是通過線程池對Worker上的Executor發送Task的執行結果進行處理。

TaskScheduleImpl的排程方式有兩種,但任務的最終排程都會落到ScheduleBackend的具體實作。

SparkContext 2603行 建立LoaclBackend:

val backend = new LocalBackend(sc.getConf, scheduler, 1)
           

LoaclBackend比較注意的方法 123行 :

override def start() {
    val rpcEnv = SparkEnv.get.rpcEnv
    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
    localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
    listenerBus.post(SparkListenerExecutorAdded(
      System.currentTimeMillis,
      executorEndpoint.localExecutorId,
      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
    launcherBackend.setAppId(appId)
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
           

解析:它會建立LocalEndpoint,可以看出LoaclBackend會同過LoaclEndpoint來進行消息的通信。

TaskSchedulerImpl和LoaclBackEnd建立好了便進行初始化。

SparkContext 2616行 調用初始化方法:

scheduler.initialize(backend)
           

調用TaskSchedulerImpl 126行:

def initialize(backend: SchedulerBackend) {
    //獲得LoaclBackend引用
    this.backend = backend
    // temporarily set rootPool name to empty建立緩存隊列
    rootPool = new Pool("", schedulingMode, 0, 0)
    //建立不同的排程政策來操作隊列
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
           

TaskScheduler建立完畢。

繼續閱讀