3.8 Starting the TaskScheduler

Section 3.6 described the creation of the task scheduler, TaskScheduler. For the TaskScheduler to do any work, it must be started, with the following code.

taskScheduler.start()

When the TaskScheduler starts, it actually invokes the backend's start method.

override def start() {
  backend.start()
}

Taking LocalBackend as an example: when LocalBackend starts, it registers LocalActor with the ActorSystem, as shown in Listing 3-30.
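Listing 3-30 is not reproduced in this section, so as a reminder of its shape, here is a minimal sketch of that registration, reconstructed from the Spark 1.x sources under discussion (details such as the actor name may differ by release):

override def start() {
  // Register LocalActor with the driver's ActorSystem; LocalActor in turn
  // constructs the local Executor (see Listing 3-36 below).
  localActor = SparkEnv.get.actorSystem.actorOf(
    Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor")
}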

3.8.1 Creating LocalActor

Creating LocalActor mainly amounts to constructing the local Executor, as shown in Listing 3-36.

Listing 3-36 Implementation of LocalActor

private[spark] class LocalActor(
    scheduler: TaskSchedulerImpl,
    executorBackend: LocalBackend,
    private val totalCores: Int)
  extends Actor with ActorLogReceive with Logging {

  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()

  private var freeCores = totalCores

  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  private val localExecutorHostname = "localhost"

  // Construct the local Executor that actually runs tasks
  val executor = new Executor(
    localExecutorId, localExecutorHostname, scheduler.conf.getAll,
    totalCores, isLocal = true)

  override def receiveWithLogging = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)

    case StopExecutor =>
      executor.stop()
  }
  // reviveOffers() and the rest of the class are omitted here
}

The construction of the Executor, shown in Listing 3-37, mainly involves the following steps.

1) Create and register the ExecutorSource. What is the ExecutorSource for? Section 3.8.2 describes it in detail.

2) Obtain the SparkEnv. In non-local mode, when the CoarseGrainedExecutorBackend on a Worker registers the Executor with the driver, a SparkEnv must be created. The property spark.executor.port (default 0, meaning a randomly assigned port) configures the port of the ActorSystem inside the Executor.

3) Create and register the ExecutorActor. The ExecutorActor is responsible for receiving messages sent to the Executor.

4) Create the URLClassLoader. Why is this class loader needed? In non-local mode, a Driver or a Worker may host multiple Executors; each Executor sets up its own URLClassLoader for loading the classes in the jars uploaded with tasks, which effectively isolates each task's class-loading environment.

5) Create the thread pool in which the Executor executes tasks.

6) Start the Executor's heartbeat thread, which sends heartbeats to the Driver.

Construction also sets the Akka message frame size (10 485 760 bytes), the byte limit on the total result size (1 073 741 824 bytes), the list of running tasks, and the serializer's default class loader (the class loader created in step 4).

Listing 3-37 Construction of the Executor

val executorSource = new ExecutorSource(this, executorId)

private val env = {
  if (!isLocal) {
    val port = conf.getInt("spark.executor.port", 0)
    val _env = SparkEnv.createExecutorEnv(
      conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
    SparkEnv.set(_env)
    _env.metricsSystem.registerSource(executorSource)
    _env.blockManager.initialize(conf.getAppId)
    _env
  } else {
    SparkEnv.get
  }
}

private val executorActor = env.actorSystem.actorOf(
  Props(new ExecutorActor(executorId)), "ExecutorActor")

private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

env.serializer.setDefaultClassLoader(urlClassLoader)

private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
private val maxResultSize = Utils.getMaxResultSize(conf)

val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

startDriverHeartbeater()

3.8.2 Creating and Registering the ExecutorSource

The ExecutorSource feeds the metrics (measurement) system. Its gauges are registered through MetricRegistry's register method and include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops, and so on. The implementation of ExecutorSource appears in Listing 3-38; for the concrete implementation of the Metric interface, see Appendix D.

Listing 3-38 Implementation of ExecutorSource

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {

  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

  private def registerFileSystemStat[T](
      scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  override val metricRegistry = new MetricRegistry()

  override val sourceName = "executor"

  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getActiveCount()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getPoolSize()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })

  // Gauge for file system stats of this executor
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}

After the ExecutorSource is created, the MetricsSystem's registerSource method registers it with the MetricsSystem. registerSource in turn uses MetricRegistry's register method to register the source with the MetricRegistry, as shown in Listing 3-39. For details on MetricRegistry, see Appendix D.

Listing 3-39 Source registration in MetricsSystem

def registerSource(source: Source) {
  sources += source
  try {
    val regName = buildRegistryName(source)
    registry.register(regName, source.metricRegistry)
  } catch {
    case e: IllegalArgumentException => logInfo("Metrics already registered", e)
  }
}

3.8.3 Constructing and Registering the ExecutorActor

The ExecutorActor is simple: when it receives a message sent by the SparkUI, it replies with the stack traces of all threads. The relevant code is as follows.

case TriggerThreadDump =>
  sender ! Utils.getThreadDump()
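In fact the whole actor is only a handful of lines. The sketch below reconstructs it from the Spark 1.x sources (minor details may differ by release); its receive loop handles exactly one message type, TriggerThreadDump:

private[spark] class ExecutorActor(executorId: String)
  extends Actor with ActorLogReceive with Logging {

  override def receiveWithLogging = {
    case TriggerThreadDump =>
      // Reply with the stack traces of every thread in this executor's JVM
      sender ! Utils.getThreadDump()
  }
}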

3.8.4 Creating Spark's Own ClassLoader

First the parent of the class loader to be created, currentLoader, is obtained; then an array of URLs is generated from currentJars. The property spark.files.userClassPathFirst specifies whether classes should be loaded from the user's classpath first. Finally either a ChildExecutorURLClassLoader or an ExecutorURLClassLoader is created, as shown in Listing 3-40.

Listing 3-40 Creating Spark's own ClassLoader

private def createClassLoader(): MutableURLClassLoader = {
  val currentLoader = Utils.getContextOrSparkClassLoader

  val urls = currentJars.keySet.map { uri =>
    new File(uri.split("/").last).toURI.toURL
  }.toArray
  val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
  userClassPathFirst match {
    case true => new ChildExecutorURLClassLoader(urls, currentLoader)
    case false => new ExecutorURLClassLoader(urls, currentLoader)
  }
}

The implementation of Utils.getContextOrSparkClassLoader is given in Appendix A. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader are in fact built on URLClassLoader, as shown in Listing 3-41.

Listing 3-41 Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends MutableURLClassLoader {

  private object userClassLoader extends URLClassLoader(urls, null) {
    override def addURL(url: URL) {
      super.addURL(url)
    }
    override def findClass(name: String): Class[_] = {
      super.findClass(name)
    }
  }

  private val parentClassLoader = new ParentClassLoader(parent)

  override def findClass(name: String): Class[_] = {
    try {
      userClassLoader.findClass(name)
    } catch {
      case e: ClassNotFoundException => {
        parentClassLoader.loadClass(name)
      }
    }
  }

  def addURL(url: URL) {
    userClassLoader.addURL(url)
  }

  def getURLs() = {
    userClassLoader.getURLs()
  }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) with MutableURLClassLoader
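The design point worth noting above is the null parent passed to userClassLoader: a class loader with no parent never delegates upward automatically, so lookups hit the user's jars first, and ChildExecutorURLClassLoader falls back to the real parent only on a miss. The following self-contained sketch (illustrative names, not from the Spark sources) shows this "parent-last" pattern in isolation:

import java.net.{URL, URLClassLoader}

// Parent-last loading: classes from `urls` win over classes visible to
// realParent; realParent is consulted only when the local lookup misses.
class ParentLastLoader(urls: Array[URL], realParent: ClassLoader)
  extends URLClassLoader(urls, null) {   // null parent disables automatic delegation

  override def loadClass(name: String): Class[_] =
    try super.loadClass(name)            // search the given URLs (and bootstrap) first
    catch { case _: ClassNotFoundException => realParent.loadClass(name) }
}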

If REPL interaction is required, addReplClassLoaderIfNeeded is called as well to create a class loader for REPL-generated classes, as shown in Listing 3-42.

Listing 3-42 Implementation of addReplClassLoaderIfNeeded

private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
  val classUri = conf.get("spark.repl.class.uri", null)
  if (classUri != null) {
    logInfo("Using REPL class URI: " + classUri)
    val userClassPathFirst: java.lang.Boolean =
      conf.getBoolean("spark.files.userClassPathFirst", false)
    try {
      val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
        .asInstanceOf[Class[_ <: ClassLoader]]
      val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
        classOf[ClassLoader], classOf[Boolean])
      constructor.newInstance(conf, classUri, parent, userClassPathFirst)
    } catch {
      case _: ClassNotFoundException =>
        logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
        System.exit(1)
        null
    }
  } else {
    parent
  }
}

3.8.5 Starting the Executor's Heartbeat Thread

The Executor's heartbeat is started by startDriverHeartbeater, shown in Listing 3-43. The heartbeat interval is configured by the property spark.executor.heartbeatInterval and defaults to 10 000 ms; in addition, the lookup timeout is 30 seconds, the retry count is 3, and the retry interval is 3000 ms. ActorSystem.actorSelection(url) is used to find the matching actor reference, where the url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. Finally a thread is created that first sleeps a random 10 000 to 20 000 ms (so that heartbeats from different Executors do not end up synchronized) and then wakes every 10 000 ms. On each iteration the thread collects the latest task metrics from runningTasks, wraps them together with the executorId and blockManagerId into a Heartbeat message, and sends the message to the HeartbeatReceiver.
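Since this is an ordinary Spark property, the interval can be tuned when building the configuration. A minimal sketch (the value is illustrative; in this Spark version the property is interpreted in milliseconds):

import org.apache.spark.SparkConf

// Illustrative only: raise the executor heartbeat interval to 20 seconds.
val conf = new SparkConf()
  .setAppName("HeartbeatTuningExample")   // hypothetical application name
  .set("spark.executor.heartbeatInterval", "20000")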

Listing 3-43 Starting the Executor's heartbeat thread

def startDriverHeartbeater() {
  val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
  val timeout = AkkaUtils.lookupTimeout(conf)
  val retryAttempts = AkkaUtils.numRetries(conf)
  val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
  val heartbeatReceiverRef =
    AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

  val t = new Thread() {
    override def run() {
      // Sleep a random interval so the heartbeats don't end up in sync
      Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

      while (!isStopped) {
        val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
        val curGCTime = gcTime

        for (taskRunner <- runningTasks.values()) {
          if (!taskRunner.attemptedTask.isEmpty) {
            Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
              metrics.updateShuffleReadMetrics
              metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
              if (isLocal) {
                // Make a deep copy so later changes are not visible to listeners
                val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                tasksMetrics += ((taskRunner.taskId, copiedMetrics))
              } else {
                // It will be copied by serialization
                tasksMetrics += ((taskRunner.taskId, metrics))
              }
            }
          }
        }

        val message = Heartbeat(executorId, tasksMetrics.toArray,
          env.blockManager.blockManagerId)
        try {
          val response = AkkaUtils.askWithReply[HeartbeatResponse](message,
            heartbeatReceiverRef, retryAttempts, retryIntervalMs, timeout)
          if (response.reregisterBlockManager) {
            logWarning("Told to re-register on heartbeat")
            env.blockManager.reregister()
          }
        } catch {
          case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
        }

        Thread.sleep(interval)
      }
    }
  }
  t.setDaemon(true)
  t.setName("Driver Heartbeater")
  t.start()
}

What is this heartbeat thread for? It serves two purposes:

updating the metrics of the tasks currently being processed;

notifying the BlockManagerMaster that the BlockManager on this Executor is still alive.

The implementation of the heartbeat thread is analyzed in detail below; readers may skip ahead if they prefer.

After TaskSchedulerImpl is initialized, the heartbeat receiver HeartbeatReceiver is created. The HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver application and hands the tasks, task metrics, heartbeats, and so on to TaskSchedulerImpl and DAGScheduler for further processing. The heartbeat receiver is created with the following code.

private val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

Upon receiving a heartbeat message, the HeartbeatReceiver calls the TaskScheduler's executorHeartbeatReceived method, as follows.

case Heartbeat(executorId, taskMetrics, blockManagerId) =>
  val response = HeartbeatResponse(
    !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
  sender ! response

executorHeartbeatReceived is implemented as follows.

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
  taskMetrics.flatMap { case (id, metrics) =>
    taskIdToTaskSetId.get(id)
      .flatMap(activeTaskSets.get)
      .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
  }
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)

This code iterates over taskMetrics and locates the TaskSetManager via taskIdToTaskSetId and activeTaskSets. It then packs the taskId, taskSetManager.stageId, taskSetManager.taskSet.attempt, and the TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally it calls the DAGScheduler's executorHeartbeatReceived method, whose implementation is as follows.

listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
Await.result(
  blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
  timeout.duration).asInstanceOf[Boolean]

dagscheduler将executorid、metricswithstageids封裝為sparklistenerexecutormetricsupdate事件,并post到listenerbus中,此事件用于更新stage的各種測量資料。最後給blockmanagermaster持有的blockmanagermasteractor發送blockmanagerheartbeat消息。blockmanagermasteractor在收到消息後會比對執行heartbeatreceived方法(參見4.3.1節)。heartbeatreceived最終更新blockmanagermaster對blockmanger的最後可見時間(即更新block-managerid對應的blockmanagerinfo的_lastseenms,見代碼清單3-44)。

Listing 3-44 Heartbeat handling in BlockManagerMasterActor

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    blockManagerId.isDriver && !isLocal
  } else {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    true
  }
}

The heartbeat communication of the Executor in local mode can be represented by Figure 3-3.

Figure 3-3 The heartbeat communication process of the Executor

In non-local mode, the Executor sends heartbeats through the same process; the main difference is that the Executor runs in a different process from the Driver, and possibly even on a different node.

Next, the block manager BlockManager is initialized, with the following code.

env.blockManager.initialize(applicationId)

The detailed initialization process is covered in Chapter 4.