
In-Depth Understanding of Spark: Core Ideas and Source Code Analysis. 3.4 SparkUI in Detail

<b>3.4 SparkUI in Detail</b>

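Every system needs monitoring, and a styled, well-laid-out web page that exposes rich monitoring data to a browser is a simple and efficient way to provide it. SparkUI is such a service; its architecture is shown in Figure 3-1.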

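In large distributed systems, an event-listening mechanism is the most common design. Why use one? Suppose SparkUI relied on direct Scala function calls. As the cluster grows, the number of calls keeps increasing, and it is eventually bounded by the number of threads available in the JVM that hosts the driver. Updates to the monitoring data are then delayed, and the data may not reach users in time. Function calls are mostly synchronous, so the calling thread blocks, and in a distributed environment a network problem can hold a thread for a long time. Replacing function calls with posted events makes the handling asynchronous: the current thread carries on with its own logic, and pooled threads can be reused, so the concurrency of the whole system rises considerably. Posted events are placed in a buffer, taken out by a dedicated dispatcher, and handed to the listeners registered for them, which then update the monitoring data.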
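The asynchronous pattern described above can be sketched in a few lines of Scala. This is a minimal illustration, not Spark's implementation: `MiniBus`, `Event`, `TaskFinished`, and `Shutdown` are hypothetical names, and the bus uses a plain blocking `take()` on a bounded queue. The point is that `post` returns immediately while a single daemon thread delivers events to the listeners.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical names throughout: none of these are Spark classes.
object AsyncBusSketch {
  sealed trait Event
  final case class TaskFinished(id: Int) extends Event
  case object Shutdown extends Event

  final class MiniBus(capacity: Int) {
    private val queue = new LinkedBlockingQueue[Event](capacity)
    private val listeners = ArrayBuffer.empty[Event => Unit]

    // A single daemon thread drains the queue and calls every listener.
    private val worker = new Thread("mini-bus") {
      setDaemon(true)
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match { // blocks until an event is available
            case Shutdown => running = false
            case event    => listeners.foreach(_.apply(event))
          }
        }
      }
    }

    // Listeners must be registered before start() in this sketch.
    def addListener(f: Event => Unit): Unit = listeners += f
    def start(): Unit = worker.start()
    // Non-blocking: returns false instead of waiting when the queue is full.
    def post(event: Event): Boolean = queue.offer(event)
    def stop(): Unit = { queue.put(Shutdown); worker.join() }
  }

  // Posts n events and returns how many the listener observed after shutdown.
  def demo(n: Int): Int = {
    val bus = new MiniBus(capacity = 100)
    val seen = new AtomicInteger(0)
    bus.addListener {
      case TaskFinished(_) => seen.incrementAndGet()
      case _               => ()
    }
    bus.start()
    (1 to n).foreach(i => bus.post(TaskFinished(i)))
    bus.stop() // Shutdown is queued behind the n events, so they are drained first
    seen.get()
  }
}
```

The producer never waits for a listener to finish; it only pays the cost of an `offer` on the queue.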

Figure 3-1 SparkUI architecture

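We first briefly introduce the components in Figure 3-1. DAGScheduler is the main source of the various SparkListenerEvent objects. It posts each SparkListenerEvent to the event queue of listenerBus, whose listener thread matches the event to the concrete SparkListener and updates the statistics that the SparkListener holds; the SparkUI pages finally display those statistics. Figure 3-1 also shows that Spark defines many implementations of SparkListener, including JobProgressListener, EnvironmentListener, StorageListener, and ExecutorsListener. Their class hierarchy is shown in Figure 3-2.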

Figure 3-2 Class hierarchy of SparkListener

<b>3.4.1 listenerBus in Detail</b>

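listenerBus is of type LiveListenerBus. LiveListenerBus implements the listener model: posted events trigger updates to the state held by the listeners, which is how the data shown in the UI gets refreshed. LiveListenerBus consists of the following parts:

Event blocking queue: of type LinkedBlockingQueue[SparkListenerEvent], with a fixed capacity of 10,000;

Listener array: of type ArrayBuffer[SparkListener], holding the registered SparkListener instances;

Event-dispatching thread: this Thread keeps pulling events from the LinkedBlockingQueue, iterates over the listeners, and calls the matching listener method. Every event stays in the LinkedBlockingQueue for a while and is removed once the thread has processed it. The name listener bus is therefore a good fit: each event gets off at its stop. The implementation is shown in Listing 3-15.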

Listing 3-15 Event handling in LiveListenerBus

private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false

// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)

private val listenerThread = new Thread("SparkListenerBus") {
  setDaemon(true)
  override def run(): Unit = Utils.logUncaughtExceptions {
    while (true) {
      eventLock.acquire()
      // Atomically remove and process this event
      LiveListenerBus.this.synchronized {
        val event = eventQueue.poll
        if (event == SparkListenerShutdown) {
          // Get out of the while loop and shutdown the daemon thread
          return
        }
        Option(event).foreach(postToAll)
      }
    }
  }
}

def start() {
  if (started) {
    throw new IllegalStateException("Listener bus already started!")
  }
  listenerThread.start()
  started = true
}

def post(event: SparkListenerEvent) {
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    logQueueFullErrorMessage()
  }
}

def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

def stop() {
  if (!started) {
    throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
  }
  post(SparkListenerShutdown)
  listenerThread.join()
}
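The post method relies on the non-blocking `offer` of a bounded LinkedBlockingQueue: when the queue is full, `offer` returns false and the event is not enqueued. The sketch below isolates that behavior with a tiny capacity; `BoundedQueueSketch` and `offerAll` are hypothetical names used only for illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue

object BoundedQueueSketch {
  // Offers `events` items to a queue of the given capacity without draining it,
  // and returns the (accepted, rejected) counts.
  def offerAll(capacity: Int, events: Int): (Int, Int) = {
    val queue = new LinkedBlockingQueue[String](capacity)
    val results = (1 to events).map(i => queue.offer(s"event-$i"))
    (results.count(identity), results.count(!_))
  }
}
```

With a capacity of 2 and three offers and no consumer, two events are accepted and the third is rejected. A producer that must never block accepts this trade-off: under sustained overload, events are lost rather than the caller being stalled.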

The postToAll method called in LiveListenerBus is actually defined in its parent, SparkListenerBus, as shown in Listing 3-16.

Listing 3-16 Listener invocation in SparkListenerBus

protected val sparkListeners = new ArrayBuffer[SparkListener]
  with mutable.SynchronizedBuffer[SparkListener]

def addListener(listener: SparkListener) {
  sparkListeners += listener
}

def postToAll(event: SparkListenerEvent) {
  event match {
    case stageSubmitted: SparkListenerStageSubmitted =&gt;
      foreachListener(_.onStageSubmitted(stageSubmitted))
    case stageCompleted: SparkListenerStageCompleted =&gt;
      foreachListener(_.onStageCompleted(stageCompleted))
    case jobStart: SparkListenerJobStart =&gt;
      foreachListener(_.onJobStart(jobStart))
    case jobEnd: SparkListenerJobEnd =&gt;
      foreachListener(_.onJobEnd(jobEnd))
    case taskStart: SparkListenerTaskStart =&gt;
      foreachListener(_.onTaskStart(taskStart))
    case taskGettingResult: SparkListenerTaskGettingResult =&gt;
      foreachListener(_.onTaskGettingResult(taskGettingResult))
    case taskEnd: SparkListenerTaskEnd =&gt;
      foreachListener(_.onTaskEnd(taskEnd))
    case environmentUpdate: SparkListenerEnvironmentUpdate =&gt;
      foreachListener(_.onEnvironmentUpdate(environmentUpdate))
    case blockManagerAdded: SparkListenerBlockManagerAdded =&gt;
      foreachListener(_.onBlockManagerAdded(blockManagerAdded))
    case blockManagerRemoved: SparkListenerBlockManagerRemoved =&gt;
      foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
    case unpersistRDD: SparkListenerUnpersistRDD =&gt;
      foreachListener(_.onUnpersistRDD(unpersistRDD))
    case applicationStart: SparkListenerApplicationStart =&gt;
      foreachListener(_.onApplicationStart(applicationStart))
    case applicationEnd: SparkListenerApplicationEnd =&gt;
      foreachListener(_.onApplicationEnd(applicationEnd))
    case metricsUpdate: SparkListenerExecutorMetricsUpdate =&gt;
      foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
    case SparkListenerShutdown =&gt;
  }
}

private def foreachListener(f: SparkListener =&gt; Unit): Unit = {
  sparkListeners.foreach { listener =&gt;
    try {
      f(listener)
    } catch {
      case e: Exception =&gt;
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }
  }
}
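Two ideas in this listing are worth isolating: events are routed to a listener callback by pattern matching on their type, and each callback runs inside its own try/catch so that one failing listener cannot stop delivery to the others. The sketch below reproduces both with hypothetical types (`Event`, `Listener`, `Bus`); it records errors in a buffer where the real code logs them.

```scala
import scala.collection.mutable.ArrayBuffer

object DispatchSketch {
  sealed trait Event
  final case class JobStart(jobId: Int) extends Event
  final case class JobEnd(jobId: Int) extends Event

  // No-op defaults, so implementations override only the callbacks they need.
  trait Listener {
    def onJobStart(e: JobStart): Unit = ()
    def onJobEnd(e: JobEnd): Unit = ()
  }

  final class Bus {
    private val listeners = ArrayBuffer.empty[Listener]
    val errors = ArrayBuffer.empty[String]

    def addListener(l: Listener): Unit = listeners += l

    // Route each event to the matching callback on every listener.
    def postToAll(event: Event): Unit = event match {
      case e: JobStart => foreachListener(_.onJobStart(e))
      case e: JobEnd   => foreachListener(_.onJobEnd(e))
    }

    // A failing listener is recorded and skipped; the rest still run.
    private def foreachListener(f: Listener => Unit): Unit =
      listeners.foreach { l =>
        try {
          f(l)
        } catch {
          case e: Exception => errors += s"listener failed: ${e.getMessage}"
        }
      }
  }

  // Returns (starts seen by the healthy listener, number of recorded errors).
  def demo(): (Int, Int) = {
    val bus = new Bus
    var started = 0
    bus.addListener(new Listener {
      override def onJobStart(e: JobStart): Unit = throw new RuntimeException("boom")
    })
    bus.addListener(new Listener {
      override def onJobStart(e: JobStart): Unit = started += 1
    })
    bus.postToAll(JobStart(1))
    bus.postToAll(JobEnd(1))
    (started, bus.errors.size)
  }
}
```

In `demo`, the first listener throws on every job start, yet the second listener still observes the event.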

<b>3.4.2 Constructing JobProgressListener</b>

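We use JobProgressListener as the example for explaining SparkListener. JobProgressListener is an important part of SparkContext: it updates job progress by listening to the events on listenerBus. SparkStatusTracker and SparkUI also rely on JobProgressListener to track job status. The code that creates JobProgressListener is as follows.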

private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)

val statusTracker = new SparkStatusTracker(this)

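JobProgressListener uses data structures such as HashMap and ListBuffer to store each jobId with its JobUIData, grouped by job state: active, completed, or failed. It likewise groups stageId, StageInfo, and related data by stage state (active, completed, skipped, or failed), and it stores the one-to-many relationship from a stageId to jobIds. These statistics are eventually read and rendered by pages such as JobPage and StagePage. The data structures of JobProgressListener are shown in Listing 3-17.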

Listing 3-17 Information maintained by JobProgressListener

class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

  import JobProgressListener._

  type JobId = Int
  type StageId = Int
  type StageAttemptId = Int
  type PoolName = String
  type ExecutorId = String

  // Jobs:
  val activeJobs = new HashMap[JobId, JobUIData]
  val completedJobs = ListBuffer[JobUIData]()
  val failedJobs = ListBuffer[JobUIData]()
  val jobIdToData = new HashMap[JobId, JobUIData]

  // Stages:
  val activeStages = new HashMap[StageId, StageInfo]
  val completedStages = ListBuffer[StageInfo]()
  val skippedStages = ListBuffer[StageInfo]()
  val failedStages = ListBuffer[StageInfo]()
  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
  val stageIdToInfo = new HashMap[StageId, StageInfo]
  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
  var numCompletedStages = 0 // total number of completed stages
  var numFailedStages = 0    // total number of failed stages

  // Misc:
  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
  def blockManagerIds = executorIdToBlockManagerId.values.toSeq

  var schedulingMode: Option[SchedulingMode] = None

  // Number of non-active jobs and stages (there is no limit for active jobs and stages):
  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)

  // ...
}

JobProgressListener implements onJobStart, onJobEnd, onStageCompleted, onStageSubmitted, onTaskStart, onTaskEnd, and other callbacks. Driven by listenerBus, these methods update the job- and stage-related data held in JobProgressListener.
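To make the bookkeeping concrete, here is a small sketch of how start and end callbacks could move a job between the active, completed, and failed collections while capping the retained history. `Tracker` and `JobData` are hypothetical, and the trimming policy (drop the oldest entries once the cap is exceeded) is an assumption made for illustration, not a description of Spark's exact rule.

```scala
import scala.collection.mutable.{HashMap, ListBuffer}

object JobTrackerSketch {
  final case class JobData(jobId: Int, var status: String = "RUNNING")

  final class Tracker(retainedJobs: Int) {
    val activeJobs = new HashMap[Int, JobData]
    val completedJobs = ListBuffer[JobData]()
    val failedJobs = ListBuffer[JobData]()

    def onJobStart(jobId: Int): Unit = {
      activeJobs(jobId) = JobData(jobId)
    }

    def onJobEnd(jobId: Int, succeeded: Boolean): Unit =
      activeJobs.remove(jobId).foreach { job =>
        val target = if (succeeded) completedJobs else failedJobs
        job.status = if (succeeded) "SUCCEEDED" else "FAILED"
        target += job
        // Assumed trimming policy: drop the oldest entries beyond the cap.
        if (target.size > retainedJobs) target.remove(0, target.size - retainedJobs)
      }
  }

  // Returns (active count, retained completed job ids, retained failed job ids).
  def demo(): (Int, Seq[Int], Seq[Int]) = {
    val t = new Tracker(retainedJobs = 2)
    (1 to 5).foreach(t.onJobStart)
    t.onJobEnd(1, succeeded = true)
    t.onJobEnd(2, succeeded = false)
    t.onJobEnd(3, succeeded = true)
    t.onJobEnd(4, succeeded = true)
    (t.activeJobs.size, t.completedJobs.map(_.jobId).toList, t.failedJobs.map(_.jobId).toList)
  }
}
```

In `demo`, job 5 stays active, job 1 is trimmed from the completed list once a third completed job arrives, and job 2 is the only failed job.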

<b>3.4.3 Creating and Initializing SparkUI</b>

The creation of SparkUI is shown in Listing 3-18.

Listing 3-18 Declaration of SparkUI

private[spark] val ui: Option[SparkUI] =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
      env.securityManager, appName))
  } else {
    None
  }

ui.foreach(_.bind())

As the listing shows, if the SparkUI service is not needed, the property spark.ui.enabled can be set to false. createLiveUI in turn simply calls the create method; see Listing 3-19.

Listing 3-19 Creating SparkUI

def createLiveUI(
    sc: SparkContext,
    conf: SparkConf,
    listenerBus: SparkListenerBus,
    jobProgressListener: JobProgressListener,
    securityManager: SecurityManager,
    appName: String): SparkUI = {
  create(Some(sc), conf, listenerBus, securityManager, appName,
    jobProgressListener = Some(jobProgressListener))
}

The implementation of the create method is shown in Listing 3-20.

Listing 3-20 Implementation of the create method

private def create(
    sc: Option[SparkContext],
    conf: SparkConf,
    listenerBus: SparkListenerBus,
    securityManager: SecurityManager,
    appName: String,
    basePath: String = "",
    jobProgressListener: Option[JobProgressListener] = None): SparkUI = {

  val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
    val listener = new JobProgressListener(conf)
    listenerBus.addListener(listener)
    listener
  }

  val environmentListener = new EnvironmentListener
  val storageStatusListener = new StorageStatusListener
  val executorsListener = new ExecutorsListener(storageStatusListener)
  val storageListener = new StorageListener(storageStatusListener)

  listenerBus.addListener(environmentListener)
  listenerBus.addListener(storageStatusListener)
  listenerBus.addListener(executorsListener)
  listenerBus.addListener(storageListener)

  new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
    executorsListener, _jobProgressListener, storageListener, appName, basePath)
}

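Listing 3-20 shows that, apart from the JobProgressListener passed in from outside, the create method registers several more SparkListener instances: EnvironmentListener, which monitors JVM parameters, Spark properties, Java system properties, the classpath, and so on; StorageStatusListener, which maintains the storage status of the executors; ExecutorsListener, which prepares executor information for display in ExecutorsTab; and StorageListener, which prepares executor-related storage information for display in BlockManagerUI. Finally, create constructs the SparkUI. By default a live Spark UI allows stages to be killed from the web page; setting the property spark.ui.killEnabled to false disables this. The initialize method organizes the display and layout of the tabs and pages of the front end; see Listing 3-21.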

Listing 3-21 Initialization of SparkUI

private[spark] class SparkUI private (
    val sc: Option[SparkContext],
    val conf: SparkConf,
    val securityManager: SecurityManager,
    val environmentListener: EnvironmentListener,
    val storageStatusListener: StorageStatusListener,
    val executorsListener: ExecutorsListener,
    val jobProgressListener: JobProgressListener,
    val storageListener: StorageListener,
    var appName: String,
    val basePath: String)
  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
  with Logging {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  /** Initialize all components of the server. */
  def initialize() {
    attachTab(new JobsTab(this))
    val stagesTab = new StagesTab(this)
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
  }

  initialize()
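The killEnabled expression packs two decisions into one line: with no SparkContext the flag is always false, and with a context it falls back to true unless the property says otherwise. The sketch below mirrors that Option chain; `FakeContext` is a hypothetical stand-in that only carries a configuration map.

```scala
object KillEnabledSketch {
  // Hypothetical stand-in for a SparkContext that only carries configuration.
  final case class FakeContext(conf: Map[String, String]) {
    def getBoolean(key: String, default: Boolean): Boolean =
      conf.get(key).map(_.toBoolean).getOrElse(default)
  }

  // Same shape as the killEnabled expression: map over the optional context,
  // defaulting to true when the key is unset and to false when there is no context.
  def killEnabled(sc: Option[FakeContext]): Boolean =
    sc.map(_.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
}
```

A UI built without a live context therefore never offers the kill action, whatever the configuration says.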

<b>3.4.4 Page Layout and Display of Spark UI</b>

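How does SparkUI actually lay out and display its pages? JobsTab shows the progress and status of all jobs, so we use it as the example. JobsTab reuses the killEnabled flag, the SparkContext, and the JobProgressListener of SparkUI, and contains two pages, AllJobsPage and JobPage; see Listing 3-22.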

Listing 3-22 Implementation of JobsTab

private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
  val sc = parent.sc
  val killEnabled = parent.killEnabled
  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
  val listener = parent.jobProgressListener

  attachPage(new AllJobsPage(this))
  attachPage(new JobPage(this))
}

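AllJobsPage is rendered by its render method. The method uses the statistics held in JobProgressListener to build summaries of the active, completed, and failed jobs, calls jobsTable to generate the tables and other HTML elements, and finally calls headerSparkPage of UIUtils to wrap the result with CSS, JavaScript, the header, and the page layout; see Listing 3-23.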

Listing 3-23 Implementation of AllJobsPage

def render(request: HttpServletRequest): Seq[Node] = {
  listener.synchronized {
    val activeJobs = listener.activeJobs.values.toSeq
    val completedJobs = listener.completedJobs.reverse.toSeq
    val failedJobs = listener.failedJobs.reverse.toSeq
    val now = System.currentTimeMillis

    val activeJobsTable =
      jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
    val completedJobsTable =
      jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
    val failedJobsTable =
      jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

    val summary: NodeSeq =
      &lt;div&gt;
        &lt;ul class="unstyled"&gt;
          {if (startTime.isDefined) {
            // Total duration is not meaningful unless the UI is live
            &lt;li&gt;
              &lt;strong&gt;Total Duration: &lt;/strong&gt;
              {UIUtils.formatDuration(now - startTime.get)}
            &lt;/li&gt;
          }}
          &lt;li&gt;
            &lt;strong&gt;Scheduling Mode: &lt;/strong&gt;
            {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
          &lt;/li&gt;
          &lt;li&gt;
            &lt;a href="#active"&gt;&lt;strong&gt;Active Jobs:&lt;/strong&gt;&lt;/a&gt;
            {activeJobs.size}
          &lt;/li&gt;
          &lt;li&gt;
            &lt;a href="#completed"&gt;&lt;strong&gt;Completed Jobs:&lt;/strong&gt;&lt;/a&gt;
            {completedJobs.size}
          &lt;/li&gt;
          &lt;li&gt;
            &lt;a href="#failed"&gt;&lt;strong&gt;Failed Jobs:&lt;/strong&gt;&lt;/a&gt;
            {failedJobs.size}
          &lt;/li&gt;
        &lt;/ul&gt;
      &lt;/div&gt;
    // ...
  }
}

jobsTable generates the table data; see Listing 3-24.

Listing 3-24 Table generation in jobsTable

private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
  val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)

  val columns: Seq[Node] = {
    &lt;th&gt;{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}&lt;/th&gt;
    &lt;th&gt;Description&lt;/th&gt;
    &lt;th&gt;Submitted&lt;/th&gt;
    &lt;th&gt;Duration&lt;/th&gt;
    &lt;th class="sorttable_nosort"&gt;Stages: Succeeded/Total&lt;/th&gt;
    &lt;th class="sorttable_nosort"&gt;Tasks (for all stages): Succeeded/Total&lt;/th&gt;
  }

  // ...

  &lt;table class="table table-bordered table-striped table-condensed sortable"&gt;
    &lt;thead&gt;{columns}&lt;/thead&gt;
    &lt;tbody&gt;
      {jobs.map(makeRow)}
    &lt;/tbody&gt;
  &lt;/table&gt;
}

Each row of the table is in turn rendered by the makeRow method; see Listing 3-25.

Listing 3-25 Generating a table row

def makeRow(job: JobUIData): Seq[Node] = {
  val lastStageInfo = Option(job.stageIds)
    .filter(_.nonEmpty)
    .flatMap { ids =&gt; listener.stageIdToInfo.get(ids.max) }
  val lastStageData = lastStageInfo.flatMap { s =&gt;
    listener.stageIdToData.get((s.stageId, s.attemptId))
  }
  val isComplete = job.status == JobExecutionStatus.SUCCEEDED
  val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
  val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
  val duration: Option[Long] = {
    job.startTime.map { start =&gt;
      val end = job.endTime.getOrElse(System.currentTimeMillis())
      end - start
    }
  }
  val formattedDuration = duration.map(d =&gt; UIUtils.formatDuration(d)).getOrElse("Unknown")
  val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
  val detailUrl =
    "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
  &lt;tr&gt;
    &lt;td sorttable_customkey={job.jobId.toString}&gt;
      {job.jobId} {job.jobGroup.map(id =&gt; s"($id)").getOrElse("")}
    &lt;/td&gt;
    &lt;td&gt;
      &lt;div&gt;&lt;em&gt;{lastStageDescription}&lt;/em&gt;&lt;/div&gt;
      &lt;a href={detailUrl}&gt;{lastStageName}&lt;/a&gt;
    &lt;/td&gt;
    &lt;td sorttable_customkey={job.startTime.getOrElse(-1).toString}&gt;
      {formattedSubmissionTime}
    &lt;/td&gt;
    &lt;td sorttable_customkey={duration.getOrElse(-1).toString}&gt;{formattedDuration}&lt;/td&gt;
    &lt;td class="stage-progress-cell"&gt;
      {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
      {if (job.numFailedStages &gt; 0) s"(${job.numFailedStages} failed)"}
      {if (job.numSkippedStages &gt; 0) s"(${job.numSkippedStages} skipped)"}
    &lt;/td&gt;
    &lt;td class="progress-cell"&gt;
      {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
        failed = job.numFailedTasks, skipped = job.numSkippedTasks,
        total = job.numTasks - job.numSkippedTasks)}
    &lt;/td&gt;
  &lt;/tr&gt;
}
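The duration logic in makeRow is a compact use of Option: a job that never started has no duration, a finished job uses its end time, and a running job is measured against the current time. The sketch below extracts that rule into a pure function by passing the current time as a parameter. `DurationSketch` and `formatted` are hypothetical helpers, and the millisecond formatting is a simplification rather than what UIUtils.formatDuration produces.

```scala
object DurationSketch {
  // Elapsed milliseconds for a job: None if it never started, otherwise
  // the end time (or `now` for a still-running job) minus the start time.
  def duration(startTime: Option[Long], endTime: Option[Long], now: Long): Option[Long] =
    startTime.map(start => endTime.getOrElse(now) - start)

  // Simplified formatting; falls back to "Unknown" like the listing does.
  def formatted(d: Option[Long]): String =
    d.map(ms => s"$ms ms").getOrElse("Unknown")
}
```

Taking `now` as an argument instead of calling System.currentTimeMillis() inside the function makes the rule easy to check in isolation.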

The attachPage method used in Listing 3-22 is defined in WebUITab, an ancestor class of JobsTab. WebUITab maintains an ArrayBuffer[WebUIPage], and AllJobsPage and JobPage are placed into this ArrayBuffer; see Listing 3-26.

Listing 3-26 Implementation of WebUITab

private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage) {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}
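The prefix arithmetic in attachPage is easy to misread, so the sketch below evaluates it for two cases. It assumes, for illustration, that the two pages of the jobs tab use the prefixes "" and "job"; that assumption is consistent with the /jobs/job?id= URL built in makeRow but is not stated in the listings. `PrefixSketch` and `pagePath` are hypothetical names.

```scala
object PrefixSketch {
  // Same expression as in attachPage: join the tab and page prefixes,
  // then drop the trailing slash left behind by an empty page prefix.
  def pagePath(tabPrefix: String, pagePrefix: String): String =
    (tabPrefix + "/" + pagePrefix).stripSuffix("/")
}
```

An empty page prefix yields the tab's own path ("jobs"), while a non-empty one is nested beneath it ("jobs/job").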

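After JobsTab is created, the attachTab method adds it to the ArrayBuffer[WebUITab] of SparkUI. For every page, the attachPage method creates an org.eclipse.jetty.servlet.ServletContextHandler, and the attachHandler method then binds that ServletContextHandler to SparkUI: it is added to handlers, of type ArrayBuffer[ServletContextHandler], and to the rootHandler (a ContextHandlerCollection) of the case class ServerInfo. SparkUI extends WebUI, and attachTab is implemented in WebUI; see Listing 3-27.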

Listing 3-27 Implementation of WebUI

private[spark] abstract class WebUI(
    securityManager: SecurityManager,
    port: Int,
    conf: SparkConf,
    basePath: String = "",
    name: String = "")
  extends Logging {

  protected val tabs = ArrayBuffer[WebUITab]()
  protected val handlers = ArrayBuffer[ServletContextHandler]()
  protected var serverInfo: Option[ServerInfo] = None
  protected val localHostName = Utils.localHostName()
  protected val publicHostName =
    Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
  private val className = Utils.getFormattedClassName(this)

  def getBasePath: String = basePath
  def getTabs: Seq[WebUITab] = tabs.toSeq
  def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
  def getSecurityManager: SecurityManager = securityManager

  /** Attach a tab to this UI, along with all of its attached pages. */
  def attachTab(tab: WebUITab) {
    tab.pages.foreach(attachPage)
    tabs += tab
  }

  /** Attach a page to this UI. */
  def attachPage(page: WebUIPage) {
    val pagePath = "/" + page.prefix
    attachHandler(createServletHandler(pagePath,
      (request: HttpServletRequest) =&gt; page.render(request), securityManager, basePath))
    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
      (request: HttpServletRequest) =&gt; page.renderJson(request), securityManager, basePath))
  }

  /** Attach a handler to this UI. */
  def attachHandler(handler: ServletContextHandler) {
    handlers += handler
    serverInfo.foreach { info =&gt;
      info.rootHandler.addHandler(handler)
      if (!handler.isStarted) {
        handler.start()
      }
    }
  }

  // ...
}

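The class in Listing 3-27 imports the members of JettyUtils with import org.apache.spark.ui.JettyUtils._, so the createServletHandler it calls is the createServletHandler method of the JettyUtils object (the Scala counterpart of a static method). createServletHandler creates an instance of an anonymous subclass of javax.servlet.http.HttpServlet, and that instance uses the function argument (request: HttpServletRequest) =&gt; page.render(request) to handle requests and render the page for the user. For the implementation of createServletHandler and more information about Jetty, see Appendix C.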
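The core idea of attachPage, registering one render function per path plus a sibling "/json" path, can be sketched without Jetty. In the sketch below, `Request`, `Page`, and `MiniUI` are hypothetical stand-ins: a request is reduced to a map of parameters and a handler to a plain function, which is far simpler than a real ServletContextHandler.

```scala
import scala.collection.mutable

object HandlerSketch {
  // Hypothetical stand-in for HttpServletRequest: just the query parameters.
  type Request = Map[String, String]
  final case class Page(prefix: String, render: Request => String, renderJson: Request => String)

  final class MiniUI {
    // Insertion-ordered map from path to the function that serves it.
    private val handlers = mutable.LinkedHashMap.empty[String, Request => String]

    // Registers the page under "/<prefix>" and its JSON view under "/<prefix>/json".
    def attachPage(page: Page): Unit = {
      val pagePath = "/" + page.prefix
      handlers(pagePath) = page.render
      handlers(pagePath.stripSuffix("/") + "/json") = page.renderJson
    }

    def paths: List[String] = handlers.keys.toList

    def handle(path: String, request: Request): Option[String] =
      handlers.get(path).map(_.apply(request))
  }

  def demo(): (List[String], Option[String]) = {
    val ui = new MiniUI
    ui.attachPage(Page("jobs", _ => "All jobs", _ => "[]"))
    ui.attachPage(Page("jobs/job", r => {
      val id = r.getOrElse("id", "?")
      s"Job $id"
    }, _ => "{}"))
    (ui.paths, ui.handle("/jobs/job", Map("id" -> "7")))
  }
}
```

Passing the render logic as a function value keeps the page classes free of any servlet plumbing, which is the same separation the anonymous HttpServlet provides.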

<b>3.4.5 Starting SparkUI</b>

Once SparkUI has been created, the bind method of its parent class WebUI must be called to bind the service to a port. The main code in bind is as follows.

serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))

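For the implementation of startJettyServer, a method of the JettyUtils object, see Appendix C. This call finally starts the service provided by Jetty; the default port is 4040.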