
深入理解Spark:核心思想與源碼分析 (Understanding Spark: Core Ideas and Source Code Analysis) — 3.6 Creating the Task Scheduler (TaskScheduler)

<b>3.6 Creating the Task Scheduler (TaskScheduler)</b>

TaskScheduler is another key component of SparkContext. It is responsible for submitting tasks and for requesting the cluster manager to schedule them; it can also be viewed as the client side of task scheduling. The code that creates the TaskScheduler is as follows.

private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

The createTaskScheduler method matches the deployment mode against the master configuration, creates a TaskSchedulerImpl, and produces a different SchedulerBackend for each mode. To make Spark's initialization flow easier to follow, this chapter uses local mode as the example; the remaining modes are covered in detail in Chapter 7. The code that matches master against local mode is as follows.

master match {
    case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES,
            isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
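To illustrate the kind of pattern matching createTaskScheduler performs on the master string, here is a minimal, self-contained sketch. It is not Spark's actual code — the regex and the returned descriptions are illustrative — but the shape (literal case, regex extractor, guarded fallback) mirrors how the real method dispatches on master.

```scala
object MasterMatcher {
  // Illustrative regex, similar in spirit to Spark's internal local-mode
  // patterns: matches "local[4]" or "local[*]".
  private val LocalN = """local\[([0-9]+|\*)\]""".r

  def describe(master: String): String = master match {
    case "local"         => "local mode, 1 worker thread"
    case LocalN(threads) => s"local mode, $threads worker thread(s)"
    case url if url.startsWith("spark://") => "standalone cluster mode"
    case other           => s"unrecognized master: $other"
  }
}

// MasterMatcher.describe("local")    == "local mode, 1 worker thread"
// MasterMatcher.describe("local[4]") == "local mode, 4 worker thread(s)"
```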

<b>3.6.1 Creating TaskSchedulerImpl</b>

TaskSchedulerImpl is constructed as follows:

1) Read configuration from SparkConf, including the number of CPUs allocated per task and the scheduling mode (the modes are FAIR and FIFO; the default is FIFO, and it can be changed via the spark.scheduler.mode property).

2) Create a TaskResultGetter, which processes the task execution results sent back by the Executors on the Workers. It does so through a thread pool created by Executors.newFixedThreadPool (4 threads by default, thread names prefixed with task-result-getter, and Executors.defaultThreadFactory as the default thread factory).
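The thread-naming scheme described in step 2 can be reproduced with a custom ThreadFactory. The following is a minimal sketch, not TaskResultGetter itself; the pool size and name prefix follow the defaults described above, and NamedDaemonFactory is an illustrative name.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Factory that names threads "task-result-getter-0", "task-result-getter-1", ...
class NamedDaemonFactory(prefix: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
    t.setDaemon(true) // daemon threads do not block JVM shutdown
    t
  }
}

val getTaskResultExecutor =
  Executors.newFixedThreadPool(4, new NamedDaemonFactory("task-result-getter"))
```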

The implementation of TaskSchedulerImpl is shown in Listing 3-29.

Listing 3-29 Implementation of TaskSchedulerImpl

var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
    case e: java.util.NoSuchElementException =>
        throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
}
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

TaskSchedulerImpl supports two scheduling modes, FAIR and FIFO. The actual scheduling of tasks ultimately falls to the concrete implementation of the SchedulerBackend trait. To keep the analysis simple, we first look at LocalBackend, the SchedulerBackend implementation used in local mode. LocalBackend relies on LocalActor and the ActorSystem for message passing. Its implementation is shown in Listing 3-30.

Listing 3-30 Implementation of LocalBackend

private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
    extends SchedulerBackend with ExecutorBackend {

    private val appId = "local-" + System.currentTimeMillis
    var localActor: ActorRef = null

    override def start() {
        localActor = SparkEnv.get.actorSystem.actorOf(
            Props(new LocalActor(scheduler, this, totalCores)),
            "LocalBackendActor")
    }

    override def stop() {
        localActor ! StopExecutor
    }

    override def reviveOffers() {
        localActor ! ReviveOffers
    }

    override def defaultParallelism() =
        scheduler.conf.getInt("spark.default.parallelism", totalCores)

    override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
        localActor ! KillTask(taskId, interruptThread)
    }

    override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
        localActor ! StatusUpdate(taskId, state, serializedData)
    }

    override def applicationId(): String = appId
}
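Notice that LocalBackend does no work of its own in these methods: each one simply forwards a message to LocalActor with the `!` (tell) operator. That pattern can be sketched without an Akka dependency as plain case classes plus a receive-style handler. The message names below mirror those in the listing, but the handler bodies are placeholders, not the real LocalActor logic.

```scala
import java.nio.ByteBuffer

// Messages exchanged between LocalBackend and LocalActor (simplified).
case object ReviveOffers
case object StopExecutor
case class KillTask(taskId: Long, interruptThread: Boolean)
case class StatusUpdate(taskId: Long, state: String, data: ByteBuffer)

// A receive-style handler, standing in for LocalActor's receive block.
def receive(msg: Any): String = msg match {
  case ReviveOffers            => "offer free cores to the scheduler"
  case StopExecutor            => "stop the local executor"
  case KillTask(id, interrupt) => s"kill task $id (interrupt=$interrupt)"
  case StatusUpdate(id, _, _)  => s"process status update for task $id"
}
```

Keeping the backend methods as thin message forwarders means all state lives inside the actor, so no explicit locking is needed.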

<b>3.6.2 Initializing TaskSchedulerImpl</b>

After TaskSchedulerImpl and LocalBackend have been created, the initialize method is called on TaskSchedulerImpl. Taking the default FIFO scheduling as the example, initialization proceeds as follows:

1) Give TaskSchedulerImpl a reference to the LocalBackend.

2) Create the Pool, which caches the scheduling queue, the scheduling algorithm, the set of TaskSetManagers, and related information.

3) Create the FIFOSchedulableBuilder, which is used to operate on the scheduling queue inside the Pool.
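Under FIFO, the Pool ultimately orders its TaskSetManagers by job priority, breaking ties by stage ID. The following is a hedged sketch of that ordering, simplified from Spark's FIFOSchedulingAlgorithm; the TaskSet case class and field names here are illustrative stand-ins for the real Schedulable entities.

```scala
// Illustrative stand-in for a schedulable entity in the pool.
case class TaskSet(priority: Int, stageId: Int)

// FIFO: the lower priority value (earlier job) runs first;
// ties are broken by the lower stage ID.
def fifoLessThan(a: TaskSet, b: TaskSet): Boolean =
  if (a.priority != b.priority) a.priority < b.priority
  else a.stageId < b.stageId

val queue   = Seq(TaskSet(2, 5), TaskSet(1, 3), TaskSet(1, 1))
val ordered = queue.sortWith(fifoLessThan)
// ordered == Seq(TaskSet(1, 1), TaskSet(1, 3), TaskSet(2, 5))
```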

The implementation of initialize is shown in Listing 3-31.

Listing 3-31 Initialization of TaskSchedulerImpl

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
        schedulingMode match {
            case SchedulingMode.FIFO =>
                new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
                new FairSchedulableBuilder(rootPool, conf)
        }
    }
    schedulableBuilder.buildPools()
}
