<b>3.7 Creating and Starting DAGScheduler</b>
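DAGScheduler does the preparatory work before tasks are formally handed to TaskSchedulerImpl for submission: creating jobs, dividing the RDDs in the DAG into stages, submitting stages, and so on. The code that creates DAGScheduler is shown below.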
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
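DAGScheduler's data structures mainly track the mapping between job IDs and stage IDs, the stages themselves, the active jobs, and the cached locations of RDD partitions; see Listing 3-32.
Listing 3-32 Data structures maintained by DAGScheduler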
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
// Epoch at which each executor (keyed by executor ID) was recorded as failed
private val failedEpoch = new HashMap[String, Long]
private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
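To make the bookkeeping concrete, here is a minimal sketch of how two of these maps could be kept in step with each other. It is not Spark's code: `MiniStage`, `MiniRegistry`, `newJobId`, `newStage`, and `removeJob` are hypothetical names invented for illustration, and the sketch assumes each stage belongs to exactly one job, which is simpler than the real scheduler.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{HashMap, HashSet}

// Hypothetical, stripped-down stand-in for Spark's Stage class.
final case class MiniStage(id: Int, jobId: Int)

// Illustrative registry (an assumption for this sketch, not part of Spark).
class MiniRegistry {
  private val nextJobId = new AtomicInteger(0)
  private val nextStageId = new AtomicInteger(0)
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  val stageIdToStage = new HashMap[Int, MiniStage]

  // Hand out a fresh job ID, starting from 0.
  def newJobId(): Int = nextJobId.getAndIncrement()

  // Create a stage for the given job and index it in both maps.
  def newStage(jobId: Int): MiniStage = {
    val stage = MiniStage(nextStageId.getAndIncrement(), jobId)
    stageIdToStage(stage.id) = stage
    jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) += stage.id
    stage
  }

  // Drop a finished job together with all of the stages registered for it.
  def removeJob(jobId: Int): Unit = {
    jobIdToStageIds.remove(jobId).foreach { ids =>
      ids.foreach(id => stageIdToStage.remove(id))
    }
  }
}
```

The two atomic counters mirror `nextJobId` and `nextStageId` from the listing: IDs are handed out monotonically, so a stage can always be looked up by ID even when several callers request IDs concurrently. The maps themselves are not thread-safe in this sketch.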
When DAGScheduler is constructed, it calls the initializeEventProcessActor method to create DAGSchedulerEventProcessActor; see Listing 3-33.
Listing 3-33 Initialization of DAGSchedulerEventProcessActor
private[scheduler] var eventProcessActor: ActorRef = _
private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures eventProcessActor is
  // not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}
initializeEventProcessActor()
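The key idea in the listing above is "ask, then block until the reply arrives or a timeout elapses". The sketch below shows that pattern with only the Scala standard library, swapping Akka's `?` operator for a plain `Future`. `WorkerRef`, `BlockingInit`, `requestWorker`, and `initWorker` are hypothetical names for illustration; the 30-second default simply echoes the timeout in the listing.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the ActorRef that the supervisor would send back.
final case class WorkerRef(name: String)

object BlockingInit {
  // Stand-in for the supervisor: creates the worker asynchronously and
  // completes the returned Future with a reference to it.
  def requestWorker(name: String): Future[WorkerRef] = Future {
    WorkerRef(name)
  }

  // Blocks the calling thread until the reply arrives. Await.result throws a
  // java.util.concurrent.TimeoutException if nothing arrives within `timeout`.
  def initWorker(name: String, timeout: FiniteDuration = 30.seconds): WorkerRef =
    Await.result(requestWorker(name), timeout)
}
```

Blocking here is a deliberate trade-off: the constructor does not return until the reference is set, so later code never observes a null reference.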
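DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. Listing 3-34 shows that DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem invokes DAGSchedulerEventProcessActor's preStart, and the TaskScheduler thereby obtains a reference to the DAGScheduler; see Listing 3-35. Listing 3-35 also shows the message types that DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent. DAGSchedulerEventProcessActor handles each of these messages differently. For this chapter, this level of understanding is enough; later chapters analyze the details as they become relevant.
Listing 3-34 Supervision strategy of DAGSchedulerActorSupervisor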
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}
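As a rough illustration of what "one-for-one" means, the sketch below models only the decision logic in plain Scala, without Akka: when a child fails, the directive chosen for that failure is applied to that child alone, and its siblings keep running. `MiniSupervisor`, `Directive`, `Resume`, and the local `Stop` object are hypothetical names for this sketch, and the model leaves out restarts and escalation, which Akka also supports.

```scala
import scala.collection.mutable

// Simplified directives; the real Akka set is larger.
sealed trait Directive
case object Resume extends Directive
case object Stop extends Directive

// Hypothetical supervisor model; it mimics the decision logic only.
class MiniSupervisor(decider: Throwable => Directive) {
  private val running = mutable.LinkedHashSet[String]()

  def start(child: String): Unit = running += child
  def isRunning(child: String): Boolean = running.contains(child)

  // One-for-one: the directive chosen for a failure is applied only to the
  // child that failed; its siblings are left untouched.
  def childFailed(child: String, cause: Throwable): Directive = {
    val directive = decider(cause)
    if (directive == Stop) running -= child
    directive
  }
}
```

In Listing 3-34 the supervisor has a single child, so the distinction matters little in practice; the strategy's decider is mainly a hook for cancelling all jobs and stopping the SparkContext before the failed actor is stopped.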
Listing 3-35 Implementation of DAGSchedulerEventProcessActor
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
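The dispatch style of the listing above, one case per message type with each case delegating to a handler on the scheduler, can be sketched without Akka as follows. This is a single-threaded toy under stated assumptions: `MiniEvent`, `MiniScheduler`, and `MiniEventLoop` are hypothetical, the event classes borrow Spark's names but carry only a job ID, and a plain queue stands in for the actor's mailbox.

```scala
import scala.collection.mutable

// Simplified events; the real Spark events carry many more fields.
sealed trait MiniEvent
final case class JobSubmitted(jobId: Int) extends MiniEvent
final case class JobCancelled(jobId: Int) extends MiniEvent
case object AllJobsCancelled extends MiniEvent

// Hypothetical scheduler that only tracks which job IDs are active.
class MiniScheduler {
  val activeJobs = mutable.LinkedHashSet[Int]()
  def handleJobSubmitted(jobId: Int): Unit = activeJobs += jobId
  def handleJobCancellation(jobId: Int): Unit = activeJobs -= jobId
  def doCancelAllJobs(): Unit = activeJobs.clear()
}

// Mimics an actor: events are queued, then handled one at a time in order.
class MiniEventLoop(scheduler: MiniScheduler) {
  private val mailbox = mutable.Queue[MiniEvent]()

  def post(event: MiniEvent): Unit = mailbox.enqueue(event)

  private def receive(event: MiniEvent): Unit = event match {
    case JobSubmitted(jobId) => scheduler.handleJobSubmitted(jobId)
    case JobCancelled(jobId) => scheduler.handleJobCancellation(jobId)
    case AllJobsCancelled    => scheduler.doCancelAllJobs()
  }

  // Drain the mailbox, dispatching each event to its handler.
  def processAll(): Unit = while (mailbox.nonEmpty) receive(mailbox.dequeue())
}
```

Because events are handled strictly one at a time, the handlers can mutate scheduler state without locks. The same property is what lets the actor-based design update the maps in Listing 3-32 safely.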