<b>3.6 Creating the Task Scheduler (TaskScheduler)</b>
TaskScheduler is another important component of SparkContext. It is responsible for submitting tasks and for requesting the cluster manager to schedule them; it can also be regarded as the client side of task scheduling. The code that creates the TaskScheduler is as follows.
private[spark] var (schedulerBackend, taskScheduler) =
  SparkContext.createTaskScheduler(this, master)
The createTaskScheduler method matches the deployment mode against the master setting, creates a TaskSchedulerImpl, and produces a different SchedulerBackend for each mode. To make Spark's initialization flow easier to follow, this chapter uses local mode as the example; the remaining modes are covered in detail in Chapter 7. The code that matches local mode on master is as follows.
master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)
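For context, the sketch below (application-side usage, not part of Spark's source) shows how a program ends up on this code path: passing "local" as the master string when constructing SparkContext makes createTaskScheduler take the branch above and back the scheduler with a single-core LocalBackend. The object and application names are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver program: the master string "local" selects the
// single-threaded local deployment mode matched by the case above.
object LocalModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("local-mode-demo")
    val sc = new SparkContext(conf)         // createTaskScheduler(this, "local") runs here
    println(sc.parallelize(1 to 10).sum())  // tasks are scheduled through LocalBackend
    sc.stop()
  }
}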
<b>3.6.1 Creating TaskSchedulerImpl</b>
The construction of TaskSchedulerImpl proceeds as follows:
1) Read configuration from SparkConf, including the number of CPUs allocated to each task and the scheduling mode (either FAIR or FIFO; the default is FIFO, and it can be changed through the spark.scheduler.mode property). A configuration sketch follows this list.
2) Create a TaskResultGetter, which uses a thread pool to process the task execution results sent back by the Executors on the Workers. The pool is created with Executors.newFixedThreadPool, holds 4 threads by default, names its threads with the task-result-getter prefix, and uses Executors.defaultThreadFactory as the default thread factory.
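As a hedged illustration of step 1 (application-side usage, not Spark's internal code), the settings read during construction could be supplied like this; spark.task.cpus controls the CPUs allocated per task, and spark.scheduler.mode selects FIFO or FAIR scheduling.
import org.apache.spark.SparkConf

// Minimal configuration sketch; the application name is hypothetical.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("scheduler-config-demo")
  .set("spark.task.cpus", "1")          // CPUs allocated to each task
  .set("spark.scheduler.mode", "FAIR")  // default is FIFO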
The implementation of TaskSchedulerImpl is shown in Listing 3-29.
Listing 3-29 Implementation of TaskSchedulerImpl
var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
  case e: java.util.NoSuchElementException =>
    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
}
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
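The try/catch above exists because SchedulingMode is a Scala Enumeration, and Enumeration.withName throws java.util.NoSuchElementException when no value matches the given name. The following standalone sketch uses a simplified stand-in enumeration (not Spark's actual SchedulingMode definition) to show that behavior.
// Simplified stand-in for org.apache.spark.scheduler.SchedulingMode.
object DemoSchedulingMode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}

DemoSchedulingMode.withName("FIFO")     // returns the FIFO value
// DemoSchedulingMode.withName("LIFO")  // would throw java.util.NoSuchElementException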
TaskSchedulerImpl supports two scheduling modes, FAIR and FIFO. The actual scheduling of tasks is ultimately carried out by a concrete implementation of the SchedulerBackend interface. To keep the analysis simple, we first look at LocalBackend, the SchedulerBackend implementation used in local mode. LocalBackend relies on a LocalActor and the ActorSystem for message-based communication. The implementation of LocalBackend is shown in Listing 3-30.
Listing 3-30 Implementation of LocalBackend
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
  extends SchedulerBackend with ExecutorBackend {

  private val appId = "local-" + System.currentTimeMillis
  var localActor: ActorRef = null

  override def start() {
    localActor = SparkEnv.get.actorSystem.actorOf(
      Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor")
  }

  override def stop() {
    localActor ! StopExecutor
  }

  override def reviveOffers() {
    localActor ! ReviveOffers
  }

  override def defaultParallelism() =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)

  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
    localActor ! KillTask(taskId, interruptThread)
  }

  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
    localActor ! StatusUpdate(taskId, state, serializedData)
  }

  override def applicationId(): String = appId
}
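LocalBackend's job is essentially to forward each call as an asynchronous Akka message to LocalActor using the fire-and-forget `!` operator. The self-contained sketch below uses hypothetical actor and message names (not Spark's) to show the same actorOf/Props/`!` pattern on the Akka 2.x API that this version of Spark depends on.
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical message, standing in for messages such as ReviveOffers.
case object DemoReviveOffers

// Hypothetical actor, standing in for LocalActor.
class DemoLocalActor extends Actor {
  def receive = {
    case DemoReviveOffers =>
      println("making resource offers to the scheduler")
  }
}

object AkkaSendDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    // Same construction pattern as LocalBackend.start().
    val actor: ActorRef = system.actorOf(Props(new DemoLocalActor), "DemoLocalActor")
    actor ! DemoReviveOffers   // fire-and-forget, just like localActor ! ReviveOffers
    Thread.sleep(500)          // give the asynchronous message time to be processed
    system.shutdown()          // Akka 2.3-era shutdown (terminate() in later versions)
  }
}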
<b>3.6.2 Initializing TaskSchedulerImpl</b>
After TaskSchedulerImpl and LocalBackend have been created, the initialize method is called on TaskSchedulerImpl. Taking the default FIFO scheduling as an example, TaskSchedulerImpl is initialized as follows:
1) Give TaskSchedulerImpl a reference to the LocalBackend.
2) Create the root Pool, which caches the scheduling queue, the scheduling algorithm, the collection of TaskSetManagers, and other information.
3) Create a FIFOSchedulableBuilder, which is used to operate on the scheduling queue held by the Pool.
The implementation of the initialize method is shown in Listing 3-31.
Listing 3-31 Initialization of TaskSchedulerImpl
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
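Under FIFO scheduling, buildPools has nothing to add to the root Pool; the FairSchedulableBuilder branch, by contrast, builds pools from an XML allocation file. The hedged sketch below (application-side usage with a hypothetical file path and pool name, not Spark's internal code) shows how the FAIR branch is typically exercised: select FAIR mode, optionally point spark.scheduler.allocation.file at a pool definition file, and assign jobs to a pool through the spark.scheduler.pool local property.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("fair-pools-demo")
  .set("spark.scheduler.mode", "FAIR")
  // Hypothetical path; buildPools() reads pool definitions from this XML file.
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)
// Jobs submitted from this thread go into the (hypothetical) "production" pool.
sc.setLocalProperty("spark.scheduler.pool", "production")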