

3.8 Starting the TaskScheduler

Section 3.6 described how the task scheduler TaskScheduler is created. For the TaskScheduler to do any work, it must be started, as follows.

taskScheduler.start()

When the TaskScheduler starts, it actually invokes the start method of the backend:

override def start() {
    backend.start()
}

Taking LocalBackend as an example: when the LocalBackend starts, it registers a LocalActor with the ActorSystem, as shown in Listing 3-30.
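Listing 3-30 is not reproduced in this section; roughly (a sketch based on the Spark 1.x actor-based implementation analyzed in this chapter, so the actor name may differ), LocalBackend.start() boils down to:

// Sketch of LocalBackend.start(): register the LocalActor, which in turn
// constructs the local Executor (see Listing 3-36).
override def start() {
    localActor = SparkEnv.get.actorSystem.actorOf(
        Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor")
}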

3.8.1 Creating the LocalActor

Creating the LocalActor mainly consists of constructing the local Executor, see Listing 3-36.

Listing 3-36 Implementation of LocalActor

private[spark] class LocalActor(
    scheduler: TaskSchedulerImpl,
    executorBackend: LocalBackend,
    private val totalCores: Int)
    extends Actor with ActorLogReceive with Logging {

    import context.dispatcher   // to use Akka's scheduler.scheduleOnce()

    private var freeCores = totalCores

    private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
    private val localExecutorHostname = "localhost"

    val executor = new Executor(
        localExecutorId, localExecutorHostname, scheduler.conf.getAll,
        totalCores, isLocal = true)

    override def receiveWithLogging = {
        case ReviveOffers =>
            reviveOffers()

        case StatusUpdate(taskId, state, serializedData) =>
            scheduler.statusUpdate(taskId, state, serializedData)
            if (TaskState.isFinished(state)) {
                freeCores += scheduler.CPUS_PER_TASK
                reviveOffers()
            }

        case KillTask(taskId, interruptThread) =>
            executor.killTask(taskId, interruptThread)

        case StopExecutor =>
            executor.stop()
    }

The construction of the Executor, shown in Listing 3-37, mainly involves the following steps.

1) Create and register the ExecutorSource. What does the ExecutorSource do? It is covered in detail in Section 3.8.2.

2) Obtain the SparkEnv. In non-local mode, when the CoarseGrainedExecutorBackend on a Worker registers the Executor with the Driver, a new SparkEnv must be created. The property spark.executor.port (default 0, meaning a randomly chosen port) configures the port of the ActorSystem inside the Executor (see the configuration sketch after this list).

3) Create and register the ExecutorActor. The ExecutorActor is responsible for receiving messages sent to the Executor.

4) Create the URLClassLoader. Why is this class loader needed? In non-local mode, a Driver or Worker can host multiple Executors, and each Executor sets up its own URLClassLoader to load classes from the jars uploaded with tasks, effectively isolating each task's class-loading environment.

5) Create the thread pool in which the Executor executes tasks.

6) Start the Executor's heartbeat thread, which sends heartbeats to the Driver.

In addition, this code sets the Akka message frame size (10 485 760 bytes, i.e. 10 MB), the limit on the total size of results (1 073 741 824 bytes, i.e. 1 GB), the list of running tasks, and the serializer's default class loader (the class loader just created).
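Both of the knobs mentioned in steps 2 and 6 are plain SparkConf properties. A minimal configuration sketch (the application name and the values are hypothetical; only the property keys come from the text above):

import org.apache.spark.SparkConf

// Hypothetical values, for illustration only.
val conf = new SparkConf()
    .setAppName("executor-config-demo")
    .set("spark.executor.port", "5555")               // default 0: random port
    .set("spark.executor.heartbeatInterval", "5000")  // default 10000 ms, see 3.8.5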

Listing 3-37 Construction of the Executor

val executorSource = new ExecutorSource(this, executorId)

private val env = {
    if (!isLocal) {
        val port = conf.getInt("spark.executor.port", 0)
        val _env = SparkEnv.createExecutorEnv(
            conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
        SparkEnv.set(_env)
        _env.metricsSystem.registerSource(executorSource)
        _env.blockManager.initialize(conf.getAppId)
        _env
    } else {
        SparkEnv.get
    }
}

private val executorActor = env.actorSystem.actorOf(
    Props(new ExecutorActor(executorId)), "ExecutorActor")

private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
env.serializer.setDefaultClassLoader(urlClassLoader)

private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
private val maxResultSize = Utils.getMaxResultSize(conf)

val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

startDriverHeartbeater()

3.8.2 Creating and Registering the ExecutorSource

The ExecutorSource feeds the metrics system. Gauges are registered through MetricRegistry's register method; they include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops, and so on. The implementation of ExecutorSource is shown in Listing 3-38. For the concrete implementation of the Metric interface, see Appendix D.

Listing 3-38 Implementation of ExecutorSource

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {

    private def fileStats(scheme: String): Option[FileSystem.Statistics] =
        FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

    private def registerFileSystemStat[T](
            scheme: String, name: String, f: FileSystem.Statistics => T,
            defaultValue: T) = {
        metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
            override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
        })
    }

    override val metricRegistry = new MetricRegistry()
    override val sourceName = "executor"

    metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getActiveCount()
    })

    metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
        override def getValue: Long = executor.threadPool.getCompletedTaskCount()
    })

    metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getPoolSize()
    })

    metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getMaximumPoolSize()
    })

    // Gauge for file system stats of this executor
    for (scheme <- Array("hdfs", "file")) {
        registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
        registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
        registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
        registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
        registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
    }
}

After the ExecutorSource has been created, MetricsSystem's registerSource method is called to register it with the MetricsSystem. registerSource in turn uses MetricRegistry's register method to register the Source with the MetricRegistry, see Listing 3-39. For more on MetricRegistry, see Appendix D.

Listing 3-39 MetricsSystem's source registration

def registerSource(source: Source) {
    sources += source
    try {
        val regName = buildRegistryName(source)
        registry.register(regName, source.metricRegistry)
    } catch {
        case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
}

3.8.3 Creating and Registering the ExecutorActor

The ExecutorActor is very simple: when it receives the message sent by the SparkUI, it replies with the stack traces of all threads, as follows.

case TriggerThreadDump =>
    sender ! Utils.getThreadDump()
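Utils.getThreadDump itself is not shown here; conceptually it boils down to the JVM's ThreadMXBean, roughly as in this simplified sketch (the real helper returns Spark's ThreadStackTrace objects rather than strings):

import java.lang.management.ManagementFactory

// Simplified stand-in for Utils.getThreadDump(): report every live thread's state.
def threadDump(): Seq[String] = {
    ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).map { info =>
        s"${info.getThreadName} (id=${info.getThreadId}): ${info.getThreadState}"
    }.toSeq
}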

3.8.4 Creating Spark's Own ClassLoader

createClassLoader first obtains currentLoader, the parent of the class loader about to be created, and then builds an array of URLs from currentJars. The property spark.files.userClassPathFirst specifies whether classes should be loaded from the user's classpath first. Finally, either an ExecutorURLClassLoader or a ChildExecutorURLClassLoader is created, see Listing 3-40.

Listing 3-40 Creating Spark's own ClassLoader

private def createClassLoader(): MutableURLClassLoader = {
    val currentLoader = Utils.getContextOrSparkClassLoader

    val urls = currentJars.keySet.map { uri =>
        new File(uri.split("/").last).toURI.toURL
    }.toArray
    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
    userClassPathFirst match {
        case true => new ChildExecutorURLClassLoader(urls, currentLoader)
        case false => new ExecutorURLClassLoader(urls, currentLoader)
    }
}
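Which branch is taken is controlled purely by configuration. For example (a hypothetical setting, shown only for illustration):

// With this setting, createClassLoader() above takes the `case true` branch
// and returns a ChildExecutorURLClassLoader.
val conf = new SparkConf().set("spark.files.userClassPathFirst", "true")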

The implementation of Utils.getContextOrSparkClassLoader is given in Appendix A. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader are ultimately URLClassLoaders, see Listing 3-41.

Listing 3-41 Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
    extends MutableURLClassLoader {

    private object userClassLoader extends URLClassLoader(urls, null) {
        override def addURL(url: URL) {
            super.addURL(url)
        }
        override def findClass(name: String): Class[_] = {
            super.findClass(name)
        }
    }

    private val parentClassLoader = new ParentClassLoader(parent)

    override def findClass(name: String): Class[_] = {
        try {
            userClassLoader.findClass(name)
        } catch {
            case e: ClassNotFoundException => {
                parentClassLoader.loadClass(name)
            }
        }
    }

    def addURL(url: URL) {
        userClassLoader.addURL(url)
    }

    def getURLs() = {
        userClassLoader.getURLs()
    }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
    extends URLClassLoader(urls, parent) with MutableURLClassLoader {

    override def addURL(url: URL) {
        super.addURL(url)
    }
}
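As a usage sketch (the jar path and class name are hypothetical), the executor-side loader picks up a jar shipped with a task via addURL and can then resolve classes from it:

import java.io.File
import java.net.URL

// Hypothetical jar and class name, for illustration only.
val loader = new ExecutorURLClassLoader(Array.empty[URL], Utils.getContextOrSparkClassLoader)
loader.addURL(new File("userApp.jar").toURI.toURL)
val taskClass = loader.loadClass("com.example.MyTask") // now served from userApp.jar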

If REPL interaction is required, addReplClassLoaderIfNeeded is additionally called to create the replClassLoader, see Listing 3-42.

Listing 3-42 Implementation of addReplClassLoaderIfNeeded

private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
    val classUri = conf.get("spark.repl.class.uri", null)
    if (classUri != null) {
        logInfo("Using REPL class URI: " + classUri)
        val userClassPathFirst: java.lang.Boolean =
            conf.getBoolean("spark.files.userClassPathFirst", false)
        try {
            val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
                .asInstanceOf[Class[_ <: ClassLoader]]
            val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
                classOf[ClassLoader], classOf[Boolean])
            constructor.newInstance(conf, classUri, parent, userClassPathFirst)
        } catch {
            case _: ClassNotFoundException =>
                logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
                System.exit(1)
                null
        }
    } else {
        parent
    }
}

3.8.5 Starting the Executor's Heartbeat Thread

The Executor's heartbeat is started by startDriverHeartbeater, see Listing 3-43. The heartbeat interval is configured by the property spark.executor.heartbeatInterval and defaults to 10 000 ms. In addition, the lookup timeout is 30 seconds, with 3 retry attempts at 3000 ms intervals; ActorSystem.actorSelection(url) is used to find the matching actor reference, where the url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. The resulting thread first sleeps a random interval between 10 000 and 20 000 ms (so that heartbeats from different Executors do not end up in sync) and thereafter sleeps the configured interval between beats. On each iteration it collects the latest task metrics from runningTasks, packs them together with the executorId and the BlockManagerId into a Heartbeat message, and sends the Heartbeat to the HeartbeatReceiver.
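AkkaUtils.makeDriverRef essentially performs that actor lookup; a simplified sketch (property lookups shown without defaults; the URL format is the one quoted above):

// Simplified equivalent of AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem).
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver"
val heartbeatReceiverRef = env.actorSystem.actorSelection(url)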

Listing 3-43 Starting the Executor's heartbeat thread

def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef =
        AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

    val t = new Thread() {
        override def run() {
            // Sleep a random interval so the heartbeats don't end up in sync
            Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

            while (!isStopped) {
                val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
                val curGCTime = gcTime

                for (taskRunner <- runningTasks.values()) {
                    if (!taskRunner.attemptedTask.isEmpty) {
                        Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                            metrics.updateShuffleReadMetrics
                            metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
                            if (isLocal) {
                                val copiedMetrics =
                                    Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                                tasksMetrics += ((taskRunner.taskId, copiedMetrics))
                            } else {
                                // It will be copied by serialization
                                tasksMetrics += ((taskRunner.taskId, metrics))
                            }
                        }
                    }
                }

                val message = Heartbeat(executorId, tasksMetrics.toArray,
                    env.blockManager.blockManagerId)
                try {
                    val response = AkkaUtils.askWithReply[HeartbeatResponse](
                        message, heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout)
                    if (response.reregisterBlockManager) {
                        logWarning("Told to re-register on heartbeat")
                        env.blockManager.reregister()
                    }
                } catch {
                    case NonFatal(t) =>
                        logWarning("Issue communicating with driver in heartbeater", t)
                }

                Thread.sleep(interval)
            }
        }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
}

What is this heartbeat thread for? It serves two purposes:

updating the metrics of the tasks currently being processed;

notifying the BlockManagerMaster that the BlockManager on this Executor is still alive.

The implementation of the heartbeat thread is analyzed in detail below; readers may decide for themselves whether to read on.

After TaskSchedulerImpl is initialized, the heartbeat receiver HeartbeatReceiver is created. The HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver application and hands the tasks, task metrics, heartbeats, and so on over to TaskSchedulerImpl and DAGScheduler for further processing. The heartbeat receiver is created as follows.

private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

When it receives a Heartbeat message, the HeartbeatReceiver calls the TaskScheduler's executorHeartbeatReceived method:

override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
        val response = HeartbeatResponse(
            !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
        sender ! response
}

executorHeartbeatReceived is implemented as follows.

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
    taskMetrics.flatMap { case (id, metrics) =>
        taskIdToTaskSetId.get(id)
            .flatMap(activeTaskSets.get)
            .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
    }
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)

This code iterates over taskMetrics and, using taskIdToTaskSetId and activeTaskSets, locates the TaskSetManager. It then packs the taskId, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, and the TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally it calls DAGScheduler's executorHeartbeatReceived method, which is implemented as follows.

listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
Await.result(
    blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
    timeout.duration).asInstanceOf[Boolean]

The DAGScheduler wraps the execId and metricsWithStageIds into a SparkListenerExecutorMetricsUpdate event and posts it to the listenerBus; this event is used to update each stage's metrics. Finally, it sends a BlockManagerHeartbeat message to the BlockManagerMasterActor held by the BlockManagerMaster. On receiving the message, the BlockManagerMasterActor matches it and executes the heartbeatReceived method (see Section 4.3.1). heartbeatReceived ultimately refreshes the BlockManagerMaster's last-seen time for the BlockManager (that is, it updates the _lastSeenMs of the BlockManagerInfo corresponding to the BlockManagerId, see Listing 3-44).

Listing 3-44 BlockManagerMasterActor's heartbeat handling

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
        blockManagerId.isDriver && !isLocal
    } else {
        blockManagerInfo(blockManagerId).updateLastSeenMs()
        true
    }
}
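updateLastSeenMs itself is trivial; presumably it amounts to no more than recording the current wall-clock time (a sketch of what happens inside BlockManagerInfo):

// Refresh the last time this BlockManager was seen alive.
def updateLastSeenMs() {
    _lastSeenMs = System.currentTimeMillis()
}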

The Executor's heartbeat communication in local mode is illustrated in Figure 3-3.

[Figure 3-3 The Executor's heartbeat communication process]

In non-local mode the Executor sends heartbeats in the same way; the main difference is that the Executor runs in a different process from the Driver, possibly even on a different node.

Next, the block manager BlockManager is initialized, as follows.

env.blockManager.initialize(applicationId)

For the details of the initialization process, see Chapter 4.