天天看點

深入了解Spark:核心思想與源碼分析. 3.9 啟動測量系統MetricsSystem

<b>3.9 啟動測量系統metricssystem</b>

metricssystem使用codahale提供的第三方測量倉庫metrics,有關metrics的具體資訊可以參考附錄d。metricssystem中有三個概念:

instance:指定了誰在使用測量系統;

source:指定了從哪裡收集測量資料;

sink:指定了往哪裡輸出測量資料。

spark按照instance的不同,區分為master、worker、application、driver和executor。

spark目前提供的sink有consolesink、csvsink、jmxsink、metricsservlet、graphitesink等。

spark中使用metricsservlet作為預設的sink。

metricssystem的啟動代碼如下。

val metricssystem = env.metricssystem

metricssystem.start()

metricssystem的啟動過程包括以下步驟:

1)注冊sources;

2)注冊sinks;

3)給sinks增加jetty的servletcontexthandler。

metricssystem啟動完畢後,會周遊與sinks有關的servletcontexthandler,并調用attach-handler将它們綁定到spark

ui上。

metricssystem.getservlethandlers.foreach(handler

=&gt; ui.foreach(_.attachhandler (handler)))

3.9.1 注冊sources

registersources方法用于注冊sources,告訴測量系統從哪裡收集測量資料,它的實作見代碼清單3-45。注冊sources的過程分為以下步驟:

1)從metricsconfig擷取driver的properties,預設為建立metricssystem的過程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

sink.servlet.path=/metrics/json}。

2)用正則比對driver的properties中以source.開頭的屬性。然後将屬性中的source反射得到的執行個體加入arraybuffer[source]。

3)将每個source的metricregistry(也是metricset的子類型)注冊到concurrent-map&lt;string,

metric&gt; metrics。這裡的registersource方法已在3.8.2節講解過。

代碼清單3-45 metricssystem注冊sources的實作

private def registersources() {

val instconfig = metricsconfig.getinstance(instance)

val sourceconfigs = metricsconfig.subproperties(instconfig,

metricssystem.source_regex)

// register all the sources related to instance

sourceconfigs.foreach { kv =&gt;

val classpath = kv._2.getproperty("class")

try {

val source = class.forname(classpath).newinstance()

registersource(source.asinstanceof[source])

} catch {

case e: exception =&gt; logerror("source class " + classpath +

" cannot be instantiated", e)

}

    }

3.9.2 注冊sinks

registersinks方法用于注冊sinks,即告訴測量系統metricssystem往哪裡輸出測量資料,它的實作見代碼清單3-46。注冊sinks的步驟如下:

1)從driver的properties中用正則比對以sink.開頭的屬性,如{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

sink.servlet.path=/metrics/json},将其轉換為map(servlet -&gt;

{class=org.apache.spark.metrics.sink.metricsservlet, path=/metrics/json})。

2)将子屬性class對應的類metricsservlet反射得到metricsservlet執行個體。如果屬性的key是servlet,将其設定為metricsservlet;如果是sink,則加入到arraybuffer[sink]中。

代碼清單3-46 metricssystem注冊sinks的實作

private def registersinks() {

val sinkconfigs = metricsconfig.subproperties(instconfig,

metricssystem.sink_regex)

sinkconfigs.foreach { kv =&gt;

if (null != classpath) {

                val sink =

class.forname(classpath)

                .getconstructor(classof[properties],

classof[metricregistry], classof[securitymanager])

                .newinstance(kv._2, registry,

securitymgr)

if (kv._1 == "servlet") {

                metricsservlet =

some(sink.asinstanceof[metricsservlet])

} else {

                sinks += sink.asinstanceof[sink]

                case e: exception =&gt;

logerror("sink class "+ classpath + " cannot be

instantialized",e)

3.9.3 給sinks增加jetty的servletcontexthandler

為了能夠在sparkui(網頁)通路到測量資料,是以需要給sinks增加jetty的servlet-contexthandler,這裡主要用到metricssystem的getservlethandlers方法實作如下。

def getservlethandlers = {

require(running, "can only call getservlethandlers on a running

metricssystem")

metricsservlet.map(_.gethandlers).getorelse(array())

可以看到調用了metricsservlet的gethandlers,其實作如下。

def gethandlers =

array[servletcontexthandler](

createservlethandler(servletpath,

new servletparams(request =&gt; getmetricssnapshot(request),

"text/json"), securitymgr)

)

最終生成處理/metrics/json請求的servletcontexthandler,而請求的真正處理由get-metricssnapshot方法,利用fastjson解析。生成的servletcontexthandler通過sparkui的attachhandler方法,也被綁定到sparkui(creatservlethandler與attachhandler方法在3.4.4節詳細講述過)。最終我們可以使用以下這些位址來通路測量資料。

http://localhost:4040/metrics/applications/json。

http://localhost:4040/metrics/json。

http://localhost:4040/metrics/master/json。

繼續閱讀