天天看点

spark的task调度器(一)SPARK的调度器

SPARK的调度器

spark的调度器在SparkContext初始化时,通过生成TaskSchedulerImpl实例时根据配置生成,可配置的调度器包含FIFO/FAIR两种调度器.

如果是FAIR调度器时,这个调度器的子调度器可以是FAIR或者FIFO,

具体采用的是先进先出调度还是公平调度通过进行spark.scheduler.mode配置,默认是先进先出调用.

在TaskSchedulerImpl实例生成时,

private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")

val schedulingMode: SchedulingMode = try {

  SchedulingMode.withName(schedulingModeConf.toUpperCase)

} catch {

  case e: java.util.NoSuchElementException =>

    throw new SparkException(s"Unrecognized spark.scheduler.mode: 

       $schedulingModeConf")

}

def initialize(backend: SchedulerBackend) {

  this.backend = backend

初始的rootPool的minShare与weight都为0,如果是FIFO时,rootPool就是调度器使用的队列本身,如果是FAIR时,rootPool中一定包含有子的Pool实例.

  // temporarily set rootPool name to empty

  rootPool = new Pool("", schedulingMode, 0, 0)

  schedulableBuilder = {

    schedulingMode match {

      case SchedulingMode.FIFO =>

        new FIFOSchedulableBuilder(rootPool)

      case SchedulingMode.FAIR =>

        new FairSchedulableBuilder(rootPool, conf)

    }

  }

  schedulableBuilder.buildPools()

}

在对一个taskSet(也就是一个stage执行submit时),通过调用TaskSchedulerImpl中的submitTasks时,会调用对应的调用器的addTaskSetManager函数.

上面生成的Pool实例中,针对不同的调度器,生成的调度算法:

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {

  schedulingMode match {

    case SchedulingMode.FAIR =>

      new FairSchedulingAlgorithm()

    case SchedulingMode.FIFO =>

      new FIFOSchedulingAlgorithm()

  }

}

后面的文章中会根据fifo与fair两种不同的调度算法进行分析.