<b>3.8 Starting the TaskScheduler</b>
Section 3.6 described how the task scheduler TaskScheduler is created. For the TaskScheduler to do any work, it must be started, as follows.

taskScheduler.start()
When the TaskScheduler starts, it actually invokes the start method of its backend.

override def start() {
  backend.start()
}
Taking LocalBackend as an example: when LocalBackend starts, it registers a LocalActor with the ActorSystem, as shown in Listing 3-30.
3.8.1 Creating LocalActor
The creation of LocalActor mainly consists of constructing the local Executor, as shown in Listing 3-36.
Listing 3-36 Implementation of LocalActor
private[spark] class LocalActor(
    scheduler: TaskSchedulerImpl,
    executorBackend: LocalBackend,
    private val totalCores: Int)
  extends Actor with ActorLogReceive with Logging {

  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()

  private var freeCores = totalCores

  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  private val localExecutorHostname = "localhost"

  val executor = new Executor(
    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

  override def receiveWithLogging = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)

    case StopExecutor =>
      executor.stop()
  }
}
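The core of LocalActor's message handling is bookkeeping of free CPU cores: launching a task consumes CPUS_PER_TASK cores, and a finished StatusUpdate returns them and triggers a fresh round of offers. A minimal Java sketch of that accounting (class and method names are illustrative, not Spark's):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of LocalActor's free-core bookkeeping (illustrative names).
class CoreAccountant {
    static final int CPUS_PER_TASK = 1;   // mirrors the default of spark.task.cpus
    private int freeCores;
    final List<Long> launched = new ArrayList<>();

    CoreAccountant(int totalCores) { this.freeCores = totalCores; }

    // reviveOffers: keep launching tasks while cores remain (one fake task per core here).
    void reviveOffers(long nextTaskId) {
        while (freeCores >= CPUS_PER_TASK) {
            freeCores -= CPUS_PER_TASK;
            launched.add(nextTaskId++);
        }
    }

    // A finished StatusUpdate returns the task's cores to the pool.
    void taskFinished(long taskId) {
        freeCores += CPUS_PER_TASK;
    }

    int freeCores() { return freeCores; }
}
```

In the real actor, taskFinished is immediately followed by another reviveOffers(), so returned cores are re-offered without delay.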
The construction of Executor, shown in Listing 3-37, mainly involves the following steps.
1) Create and register the ExecutorSource. What is ExecutorSource for? It is described in detail in Section 3.8.2.
2) Obtain the SparkEnv. In non-local mode, the SparkEnv must be created when the CoarseGrainedExecutorBackend on a Worker registers the Executor with the Driver. The property spark.executor.port (default 0, meaning a random port) configures the port of the ActorSystem inside the Executor.
3) Create and register the ExecutorActor, which is responsible for receiving messages sent to the Executor.
4) Create the URLClassLoader. Why is this class loader needed? In non-local mode, the Driver or a Worker may host several Executors; each Executor sets up its own URLClassLoader to load classes from the jars uploaded with tasks, which effectively isolates the class-loading environment of each task.
5) Create the thread pool in which the Executor runs tasks.
6) Start the Executor's heartbeat thread, which sends heartbeats to the Driver.
Construction also sets the maximum frame size of Akka messages (10 485 760 bytes), the limit on the total size of results (1 073 741 824 bytes), the map of running tasks, and makes the newly created class loader the serializer's default class loader.
Listing 3-37 Construction of Executor
val executorSource = new ExecutorSource(this, executorId)

private val env = {
  if (!isLocal) {
    val port = conf.getInt("spark.executor.port", 0)
    val _env = SparkEnv.createExecutorEnv(
      conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
    SparkEnv.set(_env)
    _env.metricsSystem.registerSource(executorSource)
    _env.blockManager.initialize(conf.getAppId)
    _env
  } else {
    SparkEnv.get
  }
}

private val executorActor = env.actorSystem.actorOf(
  Props(new ExecutorActor(executorId)), "ExecutorActor")

private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
env.serializer.setDefaultClassLoader(urlClassLoader)

private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
private val maxResultSize = Utils.getMaxResultSize(conf)

val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

startDriverHeartbeater()
3.8.2 Creating and Registering ExecutorSource
ExecutorSource is used to meter the system. Metrics are registered through MetricRegistry's register method and include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops, and so on. The implementation of ExecutorSource is shown in Listing 3-38; for the concrete implementation of the Metric interface, see Appendix D.
Listing 3-38 Implementation of ExecutorSource
private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {

  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

  private def registerFileSystemStat[T](
      scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  override val metricRegistry = new MetricRegistry()

  override val sourceName = "executor"

  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getActiveCount()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getPoolSize()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })

  // Gauge for file system stats of this executor
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}
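The gauge pattern in Listing 3-38 — registering a name bound to a callback that is evaluated only when the metric is read — can be reduced to the following Java miniature. This is a hypothetical stand-in for the Codahale MetricRegistry, not its real API; like the real registry, it rejects a duplicate name with IllegalArgumentException, which is why source registration (Listing 3-39) is wrapped in try/catch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Miniature stand-in for a registry of lazy gauges (illustrative, not the Codahale API).
class MiniRegistry {
    private final Map<String, Supplier<Object>> gauges = new HashMap<>();

    // Mirrors MetricRegistry.name(...): joins name parts with dots.
    static String name(String... parts) { return String.join(".", parts); }

    void register(String name, Supplier<Object> gauge) {
        if (gauges.containsKey(name)) {
            throw new IllegalArgumentException("A metric named " + name + " already exists");
        }
        gauges.put(name, gauge);
    }

    // The callback runs at read time, so the reported value is always current.
    Object getValue(String name) { return gauges.get(name).get(); }
}
```

Because a gauge stores a supplier rather than a value, reading threadpool.activeTasks after the pool's load changes reflects the new count without any push from the Executor.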
After the ExecutorSource has been created, the registerSource method of MetricsSystem is called to register it with the MetricsSystem. registerSource uses MetricRegistry's register method to register the Source with the MetricRegistry, as shown in Listing 3-39. For details on MetricRegistry, see Appendix D.
Listing 3-39 Implementation of Source registration in MetricsSystem
def registerSource(source: Source) {
  sources += source
  try {
    val regName = buildRegistryName(source)
    registry.register(regName, source.metricRegistry)
  } catch {
    case e: IllegalArgumentException => logInfo("Metrics already registered", e)
  }
}
3.8.3 Constructing and Registering ExecutorActor
ExecutorActor is very simple: when it receives a message sent by the SparkUI, it replies with the stack traces of all threads. It is implemented as follows.

case TriggerThreadDump =>
  sender ! Utils.getThreadDump()
3.8.4 Creating Spark's Own ClassLoader
First obtain currentLoader, the parent of the class loader about to be created, and generate an array of URLs from currentJars. The property spark.files.userClassPathFirst specifies whether classes should be loaded from the user's classpath first. Finally, either an ExecutorURLClassLoader or a ChildExecutorURLClassLoader is created, as shown in Listing 3-40.
Listing 3-40 Creation of Spark's own ClassLoader
private def createClassLoader(): MutableURLClassLoader = {
  val currentLoader = Utils.getContextOrSparkClassLoader

  val urls = currentJars.keySet.map { uri =>
    new File(uri.split("/").last).toURI.toURL
  }.toArray
  val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
  userClassPathFirst match {
    case true => new ChildExecutorURLClassLoader(urls, currentLoader)
    case false => new ExecutorURLClassLoader(urls, currentLoader)
  }
}
The implementation of Utils.getContextOrSparkClassLoader is shown in Appendix A. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader in fact extend URLClassLoader, as shown in Listing 3-41.
Listing 3-41 Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends MutableURLClassLoader {

  private object userClassLoader extends URLClassLoader(urls, null) {
    override def addURL(url: URL) {
      super.addURL(url)
    }
    override def findClass(name: String): Class[_] = {
      super.findClass(name)
    }
  }

  private val parentClassLoader = new ParentClassLoader(parent)

  override def findClass(name: String): Class[_] = {
    try {
      userClassLoader.findClass(name)
    } catch {
      case e: ClassNotFoundException => {
        parentClassLoader.loadClass(name)
      }
    }
  }

  def addURL(url: URL) {
    userClassLoader.addURL(url)
  }

  def getURLs() = {
    userClassLoader.getURLs()
  }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) with MutableURLClassLoader
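The delegation order is the whole point of ChildExecutorURLClassLoader: its internal userClassLoader has a null parent, so it can only see the task's jars, and lookup falls back to the parent loader only on ClassNotFoundException. Abstracted away from real class loading, that lookup behaves like the following Java sketch (names and the map-based "loaders" are purely illustrative):

```java
import java.util.Map;

// Abstract model of child-first lookup (illustrative, not a real ClassLoader).
class ChildFirstResolver {
    private final Map<String, String> userClasses;   // classes from the task's jars
    private final Map<String, String> parentClasses; // classes visible to the parent loader

    ChildFirstResolver(Map<String, String> user, Map<String, String> parent) {
        this.userClasses = user;
        this.parentClasses = parent;
    }

    String resolve(String name) throws ClassNotFoundException {
        // Child first: the user's version shadows the parent's version.
        if (userClasses.containsKey(name)) return userClasses.get(name);
        // Fall back to the parent only when the user's jars miss.
        if (parentClasses.containsKey(name)) return parentClasses.get(name);
        throw new ClassNotFoundException(name);
    }
}
```

ExecutorURLClassLoader, by contrast, keeps the standard parent-first delegation of URLClassLoader: the user's jars are consulted only when the parent cannot find the class.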
If REPL interaction is required, addReplClassLoaderIfNeeded is also called to create a replClassLoader, as shown in Listing 3-42.
Listing 3-42 Implementation of addReplClassLoaderIfNeeded
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
  val classUri = conf.get("spark.repl.class.uri", null)
  if (classUri != null) {
    logInfo("Using REPL class URI: " + classUri)
    val userClassPathFirst: java.lang.Boolean =
      conf.getBoolean("spark.files.userClassPathFirst", false)
    try {
      val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
        .asInstanceOf[Class[_ <: ClassLoader]]
      val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
        classOf[ClassLoader], classOf[Boolean])
      constructor.newInstance(conf, classUri, parent, userClassPathFirst)
    } catch {
      case _: ClassNotFoundException =>
        logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
        System.exit(1)
        null
    }
  } else {
    parent
  }
}
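Listing 3-42 instantiates ExecutorClassLoader purely through reflection so that the core has no compile-time dependency on the REPL module: Class.forName, getConstructor, newInstance, with ClassNotFoundException serving as the "module absent" signal. The same pattern works for any JVM class; the Java sketch below demonstrates it against java.lang.StringBuilder (the factory class itself is hypothetical):

```java
import java.lang.reflect.Constructor;

// Instantiate a class by name without a compile-time dependency on it (illustrative helper).
class ReflectiveFactory {
    static Object create(String className, String arg) {
        try {
            Class<?> klass = Class.forName(className);
            // Look up the (String) constructor and invoke it reflectively.
            Constructor<?> ctor = klass.getConstructor(String.class);
            return ctor.newInstance(arg);
        } catch (ClassNotFoundException e) {
            // The optional module is not on the classpath; fall back gracefully.
            return null;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Note that Spark chooses to exit with an error when the REPL class is missing, whereas this sketch merely returns null; the mechanism of detecting the missing class is the same.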
3.8.5 Starting the Executor's Heartbeat Thread
The Executor's heartbeat is started by startDriverHeartbeater, shown in Listing 3-43. The interval of the heartbeat thread is configured with the property spark.executor.heartbeatInterval and defaults to 10 000 ms. In addition, the lookup timeout is 30 seconds, the number of retries is 3, and the retry interval is 3000 ms; the method actorSystem.actorSelection(url) is used to find the matching actor reference, where url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. Finally, a thread is created whose first sleep is a random value between 10 000 and 20 000 ms (so that heartbeats from different Executors do not fall in sync) and which thereafter sleeps 10 000 ms per iteration. This thread obtains the latest task metrics from runningTasks, wraps them together with the executorId and blockManagerId into a Heartbeat message, and sends the Heartbeat message to the HeartbeatReceiver.
Listing 3-43 Starting the Executor's heartbeat thread
def startDriverHeartbeater() {
  val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
  val timeout = AkkaUtils.lookupTimeout(conf)
  val retryAttempts = AkkaUtils.numRetries(conf)
  val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
  val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

  val t = new Thread() {
    override def run() {
      // Sleep a random interval so the heartbeats don't end up in sync
      Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

      while (!isStopped) {
        val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
        val curGCTime = gcTime

        for (taskRunner <- runningTasks.values()) {
          if (!taskRunner.attemptedTask.isEmpty) {
            Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
              metrics.updateShuffleReadMetrics
              metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
              if (isLocal) {
                val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                tasksMetrics += ((taskRunner.taskId, copiedMetrics))
              } else {
                // It will be copied by serialization
                tasksMetrics += ((taskRunner.taskId, metrics))
              }
            }
          }
        }

        val message = Heartbeat(executorId, tasksMetrics.toArray,
          env.blockManager.blockManagerId)
        try {
          val response = AkkaUtils.askWithReply[HeartbeatResponse](message,
            heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout)
          if (response.reregisterBlockManager) {
            logWarning("Told to re-register on heartbeat")
            env.blockManager.reregister()
          }
        } catch {
          case NonFatal(t) =>
            logWarning("Issue communicating with driver in heartbeater", t)
        }

        Thread.sleep(interval)
      }
    }
  }
  t.setDaemon(true)
  t.setName("Driver Heartbeater")
  t.start()
}
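Stripped of Akka and the metrics plumbing, the heartbeater in Listing 3-43 is a daemon thread with a randomized initial delay and a loop that reports, tolerates send failures, and sleeps. A Java sketch of that skeleton (the interval and the reporter callback are illustrative stand-ins for the real Heartbeat send):

```java
// Skeleton of a driver heartbeater: randomized start, periodic report, failure-tolerant.
class Heartbeater {
    private volatile boolean stopped = false;
    final java.util.concurrent.atomic.AtomicInteger beats =
        new java.util.concurrent.atomic.AtomicInteger();
    private final long intervalMs;
    private final Runnable reporter;
    private final Thread t;

    Heartbeater(long intervalMs, Runnable reporter) {
        this.intervalMs = intervalMs;
        this.reporter = reporter;
        this.t = new Thread(this::run);
        t.setDaemon(true);              // do not keep the JVM alive on its own
        t.setName("Driver Heartbeater");
    }

    private void run() {
        try {
            // Sleep a random extra interval so heartbeats don't end up in sync.
            Thread.sleep(intervalMs + (long) (Math.random() * intervalMs));
            while (!stopped) {
                try {
                    reporter.run();     // send the heartbeat; may throw
                    beats.incrementAndGet();
                } catch (RuntimeException e) {
                    // A lost heartbeat is not fatal; log and keep beating.
                }
                Thread.sleep(intervalMs);
            }
        } catch (InterruptedException ignored) { }
    }

    void start() { t.start(); }

    void stop() throws InterruptedException {
        stopped = true;
        t.interrupt();
        t.join();
    }
}
```

As in Spark, a failed send is swallowed rather than allowed to kill the thread: missing one heartbeat is recoverable, while a dead heartbeater would eventually make the driver consider the executor lost.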
What is this heartbeat thread for? It serves two purposes:
updating the metrics of the tasks currently being processed;
notifying the BlockManagerMaster that the BlockManager on this Executor is still alive.
The implementation of the heartbeat thread is analyzed in detail below; readers may decide for themselves whether to read it.
After TaskSchedulerImpl has been initialized, the heartbeat receiver HeartbeatReceiver is created. HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver Application and passes the tasks, task metrics, heartbeats and so on to TaskSchedulerImpl and DAGScheduler for further processing. The heartbeat receiver is created as follows.

private val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
Upon receiving a Heartbeat message, HeartbeatReceiver calls the TaskScheduler's executorHeartbeatReceived method, as follows.

case Heartbeat(executorId, taskMetrics, blockManagerId) =>
  val response = HeartbeatResponse(
    !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
  sender ! response
executorHeartbeatReceived is implemented as follows.

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
  taskMetrics.flatMap { case (id, metrics) =>
    taskIdToTaskSetId.get(id)
      .flatMap(activeTaskSets.get)
      .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
  }
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
This code iterates over taskMetrics and locates each TaskSetManager by way of taskIdToTaskSetId and activeTaskSets. It then packs the taskId, taskSetManager.stageId, taskSetManager.taskSet.attempt and the TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally it calls DAGScheduler's executorHeartbeatReceived method, which is implemented as follows.
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
Await.result(
  blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
  timeout.duration).asInstanceOf[Boolean]
dagscheduler将executorid、metricswithstageids封裝為sparklistenerexecutormetricsupdate事件,并post到listenerbus中,此事件用于更新stage的各種測量資料。最後給blockmanagermaster持有的blockmanagermasteractor發送blockmanagerheartbeat消息。blockmanagermasteractor在收到消息後會比對執行heartbeatreceived方法(參見4.3.1節)。heartbeatreceived最終更新blockmanagermaster對blockmanger的最後可見時間(即更新block-managerid對應的blockmanagerinfo的_lastseenms,見代碼清單3-44)。
Listing 3-44 Heartbeat handling in BlockManagerMasterActor
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    blockManagerId.isDriver && !isLocal
  } else {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    true
  }
}
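The contract of heartbeatReceived in Listing 3-44 is worth spelling out: a known BlockManager has its _lastSeenMs refreshed and the method returns true; an unknown one returns false (except for the driver's own BlockManager in non-local mode), and that false eventually becomes HeartbeatResponse(reregisterBlockManager = true), telling the Executor to re-register. The bookkeeping reduces to this Java sketch (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Last-seen bookkeeping behind heartbeatReceived (illustrative stand-in).
class BlockManagerTracker {
    private final Map<String, Long> lastSeenMs = new HashMap<>();

    void register(String blockManagerId, long nowMs) {
        lastSeenMs.put(blockManagerId, nowMs);
    }

    // true  -> heartbeat accepted, last-seen timestamp refreshed
    // false -> unknown manager, caller should ask the executor to re-register
    boolean heartbeatReceived(String blockManagerId, long nowMs) {
        if (!lastSeenMs.containsKey(blockManagerId)) {
            return false;
        }
        lastSeenMs.put(blockManagerId, nowMs);
        return true;
    }

    Long lastSeen(String blockManagerId) { return lastSeenMs.get(blockManagerId); }
}
```

A master-side expiry check can then declare any BlockManager dead whose last-seen timestamp is older than the timeout, which is exactly what the periodic expireDeadHosts-style cleanup relies on.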
The heartbeat communication of the Executor in local mode can be represented by Figure 3-3.

Figure 3-3 Heartbeat communication of the Executor

In non-local mode the Executor sends heartbeats in the same way; the main difference is that the Executor process and the Driver are not in the same process, and may not even be on the same node.

Next, the block manager BlockManager is initialized, as follows.

env.blockManager.initialize(applicationId)

For the details of the initialization process, see Chapter 4.