<b>3.8 Starting the TaskScheduler</b>
Section 3.6 covered the creation of the task scheduler TaskScheduler. For the TaskScheduler to do its work, it must be started:
taskScheduler.start()
When the TaskScheduler starts, it actually invokes the backend's start method:
override def start() {
    backend.start()
}
Taking LocalBackend as an example: when LocalBackend starts, it registers a LocalActor with the ActorSystem, as shown in Listing 3-30.
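For reference, LocalBackend's start does little more than create this actor. The sketch below follows the Spark 1.x source; the actor name is quoted from memory of that source and may differ across versions.

    override def start() {
        // Create the LocalActor and register it with the ActorSystem
        // held by the driver's SparkEnv.
        localActor = SparkEnv.get.actorSystem.actorOf(
            Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor")
    }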
3.8.1 Creating the LocalActor
Creating the LocalActor mainly involves constructing the local Executor, as shown in Listing 3-36.
Listing 3-36 Implementation of LocalActor
private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,
    private val totalCores: Int) extends Actor with ActorLogReceive with Logging {

    import context.dispatcher    // to use Akka's scheduler.scheduleOnce()

    private var freeCores = totalCores

    private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
    private val localExecutorHostname = "localhost"

    val executor = new Executor(
        localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

    override def receiveWithLogging = {
        case ReviveOffers =>
            reviveOffers()
        case StatusUpdate(taskId, state, serializedData) =>
            scheduler.statusUpdate(taskId, state, serializedData)
            if (TaskState.isFinished(state)) {
                freeCores += scheduler.CPUS_PER_TASK
                reviveOffers()
            }
        case KillTask(taskId, interruptThread) =>
            executor.killTask(taskId, interruptThread)
        case StopExecutor =>
            executor.stop()
    }
}
The construction of the Executor, shown in Listing 3-37, mainly consists of the following steps.
1) Create and register the ExecutorSource. What is the ExecutorSource for? It is described in detail in Section 3.8.2.
2) Obtain the SparkEnv. In non-local mode, a new SparkEnv must be created when the CoarseGrainedExecutorBackend on a Worker registers the Executor with the CoarseGrainedSchedulerBackend on the Driver. The property spark.executor.port (default 0, meaning a randomly chosen port) configures the port of the Executor's ActorSystem.
3) Create and register the ExecutorActor. The ExecutorActor is responsible for receiving messages sent to the Executor.
4) Create the URLClassLoader. Why is this ClassLoader needed? In non-local mode, the Driver or a Worker may host multiple Executors, and each Executor sets up its own URLClassLoader to load the classes in the jars uploaded with its tasks, effectively isolating each task's class-loading environment.
5) Create the thread pool in which the Executor runs tasks.
6) Start the Executor's heartbeat thread, which sends heartbeats to the Driver.
In addition, the construction covers the Akka message frame size (10 485 760 bytes), the limit on the total size of results (1 073 741 824 bytes), the list of running tasks, and setting the serializer's default ClassLoader to the newly created ClassLoader; a configuration sketch covering these limits follows Listing 3-37.
Listing 3-37 Construction of the Executor
val executorSource = new ExecutorSource(this, executorId)

private val env = {
    if (!isLocal) {
        val port = conf.getInt("spark.executor.port", 0)
        val _env = SparkEnv.createExecutorEnv(
            conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
        SparkEnv.set(_env)
        _env.metricsSystem.registerSource(executorSource)
        _env.blockManager.initialize(conf.getAppId)
        _env
    } else {
        SparkEnv.get
    }
}

private val executorActor = env.actorSystem.actorOf(
    Props(new ExecutorActor(executorId)), "ExecutorActor")

private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
env.serializer.setDefaultClassLoader(urlClassLoader)

private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
private val maxResultSize = Utils.getMaxResultSize(conf)

val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

startDriverHeartbeater()
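The limits mentioned above map onto ordinary configuration properties. A minimal sketch, assuming the Spark 1.x property names; the values shown are the defaults described in the text.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
        .set("spark.executor.port", "0")                  // 0 = pick a random ActorSystem port
        .set("spark.akka.frameSize", "10")                // Akka frame size in MB (10 485 760 bytes)
        .set("spark.driver.maxResultSize", "1g")          // total result size limit (1 073 741 824 bytes)
        .set("spark.executor.heartbeatInterval", "10000") // heartbeat interval in ms (see Section 3.8.5)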
3.8.2 Creating and Registering the ExecutorSource
The ExecutorSource is used for metering the system. Metrics are registered through the MetricRegistry's register method; they include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops, and so on. The implementation of ExecutorSource is shown in Listing 3-38. For the concrete implementation of the Metric interface, see Appendix D.
Listing 3-38 Implementation of ExecutorSource
private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {

    private def fileStats(scheme: String): Option[FileSystem.Statistics] =
        FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

    private def registerFileSystemStat[T](
            scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
        metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
            override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
        })
    }

    override val metricRegistry = new MetricRegistry()
    override val sourceName = "executor"

    metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getActiveCount()
    })
    metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
        override def getValue: Long = executor.threadPool.getCompletedTaskCount()
    })
    metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getPoolSize()
    })
    metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getMaximumPoolSize()
    })

    // Gauges for the file system stats of this executor
    for (scheme <- Array("hdfs", "file")) {
        registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
        registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
        registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
        registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
        registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
    }
}
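The same pattern extends to custom metric sources. Below is a minimal sketch; MySource and its gauge are hypothetical, while Gauge and MetricRegistry are the real com.codahale.metrics classes used above.

    import com.codahale.metrics.{Gauge, MetricRegistry}
    import org.apache.spark.metrics.source.Source

    // Hypothetical source exposing a single gauge named mySource.queue.size.
    private[spark] class MySource extends Source {
        override val metricRegistry = new MetricRegistry()
        override val sourceName = "mySource"

        metricRegistry.register(MetricRegistry.name("queue", "size"), new Gauge[Int] {
            override def getValue: Int = 42    // replace with a real measurement
        })
    }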
After the ExecutorSource is created, MetricsSystem's registerSource method is called to register the ExecutorSource with the MetricsSystem. registerSource uses MetricRegistry's register method to register the Source with the MetricRegistry, as shown in Listing 3-39. For details on MetricRegistry, see Appendix D.
Listing 3-39 MetricsSystem's Source registration
def registerSource(source: Source) {
    sources += source
    try {
        val regName = buildRegistryName(source)
        registry.register(regName, source.metricRegistry)
    } catch {
        case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
}
3.8.3 Building and Registering the ExecutorActor
The ExecutorActor is very simple: when it receives a message sent by the SparkUI, it replies with the stack traces of all threads. The implementation is as follows.
case TriggerThreadDump =>
    sender ! Utils.getThreadDump()
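Under the hood, a thread dump comes from the JVM's management interface; the sketch below shows roughly what Utils.getThreadDump amounts to (output formatting simplified).

    import java.lang.management.ManagementFactory

    // Collect the stack traces of all live threads via the JMX ThreadMXBean.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val threadInfos = threadMXBean.dumpAllThreads(true, true) // include locked monitors/synchronizers
    threadInfos.foreach(info => println(info.getThreadName + ": " + info.getThreadState))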
3.8.4 Creating Spark's Own ClassLoader
First obtain currentLoader, the parent of the ClassLoader about to be created, then build an array of URLs from currentJars. The property spark.files.userClassPathFirst controls whether classes are loaded from the user's classpath first. Finally, either a ChildExecutorURLClassLoader or an ExecutorURLClassLoader is created, as shown in Listing 3-40.
Listing 3-40 Creating Spark's own ClassLoader
private def createClassLoader(): MutableURLClassLoader = {
    val currentLoader = Utils.getContextOrSparkClassLoader

    val urls = currentJars.keySet.map { uri =>
        new File(uri.split("/").last).toURI.toURL
    }.toArray
    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
    userClassPathFirst match {
        case true => new ChildExecutorURLClassLoader(urls, currentLoader)
        case false => new ExecutorURLClassLoader(urls, currentLoader)
    }
}
The implementation of Utils.getContextOrSparkClassLoader is given in Appendix A. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader are in essence built on URLClassLoader, as shown in Listing 3-41.
Listing 3-41 Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
    extends MutableURLClassLoader {

    private object userClassLoader extends URLClassLoader(urls, null) {
        override def addURL(url: URL) {
            super.addURL(url)
        }
        override def findClass(name: String): Class[_] = {
            super.findClass(name)
        }
    }

    private val parentClassLoader = new ParentClassLoader(parent)

    override def findClass(name: String): Class[_] = {
        try {
            userClassLoader.findClass(name)
        } catch {
            case e: ClassNotFoundException =>
                parentClassLoader.loadClass(name)
        }
    }

    def addURL(url: URL) {
        userClassLoader.addURL(url)
    }

    def getURLs() = {
        userClassLoader.getURLs()
    }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
    extends URLClassLoader(urls, parent) with MutableURLClassLoader
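The essence of ChildExecutorURLClassLoader is parent-last delegation: the user's jars are consulted before the parent loader. A minimal standalone sketch of the same pattern (the class name ParentLastLoader is illustrative):

    import java.net.{URL, URLClassLoader}

    // Parent-last delegation: resolve from the given URLs first, fall back to the parent.
    class ParentLastLoader(urls: Array[URL], parent: ClassLoader)
        extends URLClassLoader(urls, null) {    // null parent disables the default parent-first lookup
        override def findClass(name: String): Class[_] = {
            try {
                super.findClass(name)    // user jars first
            } catch {
                case _: ClassNotFoundException => parent.loadClass(name)    // then the parent
            }
        }
    }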
If REPL interaction is needed, addReplClassLoaderIfNeeded is also called to create the replClassLoader, as shown in Listing 3-42.
Listing 3-42 Implementation of addReplClassLoaderIfNeeded
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
    val classUri = conf.get("spark.repl.class.uri", null)
    if (classUri != null) {
        logInfo("Using REPL class URI: " + classUri)
        val userClassPathFirst: java.lang.Boolean =
            conf.getBoolean("spark.files.userClassPathFirst", false)
        try {
            val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
                .asInstanceOf[Class[_ <: ClassLoader]]
            val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
                classOf[ClassLoader], classOf[Boolean])
            constructor.newInstance(conf, classUri, parent, userClassPathFirst)
        } catch {
            case _: ClassNotFoundException =>
                logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
                System.exit(1)
                null
        }
    } else {
        parent
    }
}
3.8.5 Starting the Executor's Heartbeat Thread
The Executor's heartbeat is started by startDriverHeartbeater, shown in Listing 3-43. The heartbeat interval is configured by the property spark.executor.heartbeatInterval and defaults to 10 000 ms. The lookup timeout is 30 seconds, the number of retries is 3, and the retry interval is 3000 ms; actorSystem.actorSelection(url) is used to look up the matching actor reference, where url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. Finally, a thread is created that first sleeps a random 10 000 to 20 000 ms (so that heartbeats from different Executors do not synchronize) and then runs every 10 000 ms. On each iteration the thread fetches the latest task metrics from runningTasks, wraps them together with the executorId and BlockManagerId into a Heartbeat message, and sends the Heartbeat message to the HeartbeatReceiver.
Listing 3-43 Starting the Executor's heartbeat thread
def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef =
        AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

    val t = new Thread() {
        override def run() {
            // Sleep a random interval so the heartbeats don't end up in sync
            Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

            while (!isStopped) {
                val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
                val curGCTime = gcTime

                for (taskRunner <- runningTasks.values()) {
                    if (!taskRunner.attemptedTask.isEmpty) {
                        Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                            metrics.updateShuffleReadMetrics
                            metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
                            if (isLocal) {
                                val copiedMetrics =
                                    Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                                tasksMetrics += ((taskRunner.taskId, copiedMetrics))
                            } else {
                                // It will be copied by serialization
                                tasksMetrics += ((taskRunner.taskId, metrics))
                            }
                        }
                    }
                }

                val message = Heartbeat(executorId, tasksMetrics.toArray,
                    env.blockManager.blockManagerId)
                try {
                    val response = AkkaUtils.askWithReply[HeartbeatResponse](message,
                        heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout)
                    if (response.reregisterBlockManager) {
                        logWarning("Told to re-register on heartbeat")
                        env.blockManager.reregister()
                    }
                } catch {
                    case NonFatal(t) =>
                        logWarning("Issue communicating with driver in heartbeater", t)
                }

                Thread.sleep(interval)
            }
        }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
}
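As an aside, the driver lookup that AkkaUtils.makeDriverRef performs boils down to Akka's actorSelection on the URL quoted earlier. A minimal sketch, assuming driverHost, driverPort and actorSystem are in scope:

    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Resolve the driver-side HeartbeatReceiver actor from its Akka URL.
    val driverUrl = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver"
    val heartbeatRef = Await.result(
        actorSystem.actorSelection(driverUrl).resolveOne(30.seconds), 30.seconds)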
What is the purpose of this heartbeat thread? It serves two ends:
updating the metrics of the tasks currently being processed;
notifying the BlockManagerMaster that the BlockManager on this Executor is still alive.
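For reference, the shape of the Heartbeat message sent above can be reconstructed from the fields used in Listing 3-43; the sketch below follows the Spark 1.x definitions but should be treated as illustrative.

    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.storage.BlockManagerId

    // Heartbeat carries (taskId, metrics) pairs plus the sender's identities.
    private[spark] case class Heartbeat(
        executorId: String,
        taskMetrics: Array[(Long, TaskMetrics)],
        blockManagerId: BlockManagerId)

    // reregisterBlockManager = true tells the Executor to re-register its BlockManager.
    private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)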
The remainder of this section analyzes the heartbeat implementation in detail; readers may skip it if they wish.
After TaskSchedulerImpl is initialized, the heartbeat receiver HeartbeatReceiver is created. The HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver application and hands the tasks, task metrics, heartbeats, and so on to TaskSchedulerImpl and DAGScheduler for further processing. The heartbeat receiver is created as follows.
private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
Upon receiving a heartbeat message, the HeartbeatReceiver calls the TaskScheduler's executorHeartbeatReceived method:
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
    val response = HeartbeatResponse(
        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
    sender ! response
executorHeartbeatReceived is implemented as follows.
val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
    taskMetrics.flatMap { case (id, metrics) =>
        taskIdToTaskSetId.get(id)
            .flatMap(activeTaskSets.get)
            .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
    }
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
This code iterates over taskMetrics and locates each TaskSetManager via taskIdToTaskSetId and activeTaskSets. It then packs the taskId, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, and TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally, it calls the DAGScheduler's executorHeartbeatReceived method, implemented as follows.
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
Await.result(
    blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
    timeout.duration).asInstanceOf[Boolean]
The DAGScheduler wraps execId and metricsWithStageIds into a SparkListenerExecutorMetricsUpdate event and posts it to the listenerBus; this event is used to update each Stage's metrics. Finally, a BlockManagerHeartbeat message is sent to the BlockManagerMasterActor held by the BlockManagerMaster. On receipt, the BlockManagerMasterActor matches the message and executes its heartbeatReceived method (see Section 4.3.1). heartbeatReceived ultimately refreshes the BlockManagerMaster's last-seen time for the BlockManager, i.e., it updates the _lastSeenMs of the BlockManagerInfo corresponding to the BlockManagerId, as shown in Listing 3-44.
Listing 3-44 Heartbeat handling in BlockManagerMasterActor
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
        blockManagerId.isDriver && !isLocal
    } else {
        blockManagerInfo(blockManagerId).updateLastSeenMs()
        true
    }
}
The Executor's heartbeat communication in local mode is illustrated in Figure 3-3.
Figure 3-3 The Executor's heartbeat communication process
In non-local mode the Executor sends heartbeats in the same way; the main difference is that the Executor runs in a separate process from the Driver, possibly on a different node.
Next, the block manager BlockManager is initialized:
env.blockManager.initialize(applicationId)
The detailed initialization process is covered in Chapter 4.