
3.7 Creating and Starting DAGScheduler

DAGScheduler is mainly responsible for preparatory work before tasks are formally handed to TaskSchedulerImpl for submission. This includes creating Jobs, dividing the RDDs of the DAG into different Stages, submitting Stages, and so on. The code that creates the DAGScheduler is as follows.

@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
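For orientation, here is an illustrative sketch (our own demo, not book or Spark source; the object name DAGSchedulerEntryDemo is ours) of how a user program reaches this DAGScheduler instance: any action, such as count(), calls SparkContext.runJob, which in Spark 1.x delegates to dagScheduler.runJob, the entry point for the job and stage preparation described above.

import org.apache.spark.{SparkConf, SparkContext}

object DAGSchedulerEntryDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
  val rdd = sc.parallelize(1 to 100, 4)
  // count() is an action: it calls SparkContext.runJob, which delegates to
  // dagScheduler.runJob for job creation and stage division.
  rdd.count()
  sc.stop()
}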

The data structures of DAGScheduler mainly maintain the relationship between job IDs and stage IDs, the Stages and ActiveJobs themselves, and the locations of cached RDD partitions; see Listing 3-32.

Listing 3-32 Data structures maintained by DAGScheduler

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private val failedEpoch = new HashMap[String, Long]

private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
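As a toy illustration (a standalone snippet of ours, not Spark source), the following shows how nextJobId and jobIdToStageIds cooperate: each submitted job draws a fresh id from the atomic counter and records the ids of the stages it produces.

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{HashMap, HashSet}

object BookkeepingDemo extends App {
  val nextJobId = new AtomicInteger(0)
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

  // A new job draws a fresh id and registers its stages, e.g. one shuffle
  // map stage (id 0) and one result stage (id 1).
  val jobId = nextJobId.getAndIncrement()
  jobIdToStageIds(jobId) = HashSet(0, 1)
  println(s"job $jobId -> stages ${jobIdToStageIds(jobId)}")
}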

When DAGScheduler is constructed, the initializeEventProcessActor method is called to create a DAGSchedulerEventProcessActor; see Listing 3-33.

Listing 3-33 Initialization of DAGSchedulerEventProcessActor

private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures
  // eventProcessActor is not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()
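The blocking creation above uses Akka's ask pattern ("?"): the supervisor is sent a Props message and replies with the ActorRef of the child it created. A minimal, self-contained sketch of that pattern (classic pre-2.4 Akka, as used by Spark 1.x; all names here are ours) looks like this:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Replies to a Props message with the ActorRef of a freshly created child,
// mirroring the supervisor's receive in Listing 3-34 below.
class SupervisorDemo extends Actor {
  def receive = {
    case p: Props => sender ! context.actorOf(p)
  }
}

class WorkerDemo extends Actor {
  def receive = { case msg => println(s"worker got: $msg") }
}

object AskPatternDemo extends App {
  val system = ActorSystem("demo")
  val supervisor = system.actorOf(Props[SupervisorDemo], "supervisor")

  implicit val timeout = Timeout(30.seconds)
  // Block until the child exists, just as initializeEventProcessActor does.
  val worker =
    Await.result(supervisor ? Props[WorkerDemo], timeout.duration).asInstanceOf[ActorRef]
  worker ! "hello"
  system.shutdown()
}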

Here DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As Listing 3-34 shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor has created DAGSchedulerEventProcessActor and registered it with the ActorSystem, the ActorSystem invokes the actor's preStart, at which point the TaskScheduler obtains a reference to the DAGScheduler; see Listing 3-35. Listing 3-35 also shows the message types that DAGSchedulerEventProcessActor can handle, such as JobSubmitted, BeginEvent, and CompletionEvent; upon receiving each of these messages it takes a different action. For this chapter, this level of understanding is sufficient; later chapters analyze these handlers in detail as they come into play.

Listing 3-34 Supervision strategy of DAGSchedulerActorSupervisor

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}

Listing 3-35 Implementation of DAGSchedulerEventProcessActor

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // let the TaskScheduler hold a reference to this DAGScheduler
    // before any job is submitted
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanupAfterSchedulerStop()
  }
}
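The shape of this event loop, a family of event case classes dispatched by a single receive to per-event handler methods, reduces to the following toy sketch (the event and handler names here are hypothetical, not Spark's):

object EventLoopShapeDemo extends App {
  sealed trait SchedulerEvent
  case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent
  case class StageCancelledEvent(stageId: Int) extends SchedulerEvent

  // A single entry point fans each event out to a dedicated handler,
  // just as receive does in Listing 3-35.
  def handle(event: SchedulerEvent): Unit = event match {
    case JobSubmittedEvent(jobId)     => println(s"handleJobSubmitted($jobId)")
    case StageCancelledEvent(stageId) => println(s"handleStageCancellation($stageId)")
  }

  handle(JobSubmittedEvent(0))
  handle(StageCancelledEvent(1))
}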
