<b>3.9 啟動測量系統metricssystem</b>
metricssystem使用codahale提供的第三方測量倉庫metrics,有關metrics的具體資訊可以參考附錄d。metricssystem中有三個概念:
instance:指定了誰在使用測量系統;
source:指定了從哪裡收集測量資料;
sink:指定了往哪裡輸出測量資料。
spark按照instance的不同,區分為master、worker、application、driver和executor。
spark目前提供的sink有consolesink、csvsink、jmxsink、metricsservlet、graphitesink等。
spark中使用metricsservlet作為預設的sink。
metricssystem的啟動代碼如下。
val metricssystem = env.metricssystem
metricssystem.start()
metricssystem的啟動過程包括以下步驟:
1)注冊sources;
2)注冊sinks;
3)給sinks增加jetty的servletcontexthandler。
metricssystem啟動完畢後,會周遊與sinks有關的servletcontexthandler,并調用attach-handler将它們綁定到spark
ui上。
metricssystem.getservlethandlers.foreach(handler
=> ui.foreach(_.attachhandler (handler)))
3.9.1 注冊sources
registersources方法用于注冊sources,告訴測量系統從哪裡收集測量資料,它的實作見代碼清單3-45。注冊sources的過程分為以下步驟:
1)從metricsconfig擷取driver的properties,預設為建立metricssystem的過程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,
sink.servlet.path=/metrics/json}。
2)用正則比對driver的properties中以source.開頭的屬性。然後将屬性中的source反射得到的執行個體加入arraybuffer[source]。
3)将每個source的metricregistry(也是metricset的子類型)注冊到concurrent-map<string,
metric> metrics。這裡的registersource方法已在3.8.2節講解過。
代碼清單3-45 metricssystem注冊sources的實作
private def registersources() {
val instconfig = metricsconfig.getinstance(instance)
val sourceconfigs = metricsconfig.subproperties(instconfig,
metricssystem.source_regex)
// register all the sources related to instance
sourceconfigs.foreach { kv =>
val classpath = kv._2.getproperty("class")
try {
val source = class.forname(classpath).newinstance()
registersource(source.asinstanceof[source])
} catch {
case e: exception => logerror("source class " + classpath +
" cannot be instantiated", e)
}
}
3.9.2 注冊sinks
registersinks方法用于注冊sinks,即告訴測量系統metricssystem往哪裡輸出測量資料,它的實作見代碼清單3-46。注冊sinks的步驟如下:
1)從driver的properties中用正則比對以sink.開頭的屬性,如{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,
sink.servlet.path=/metrics/json},将其轉換為map(servlet ->
{class=org.apache.spark.metrics.sink.metricsservlet, path=/metrics/json})。
2)将子屬性class對應的類metricsservlet反射得到metricsservlet執行個體。如果屬性的key是servlet,将其設定為metricsservlet;如果是sink,則加入到arraybuffer[sink]中。
代碼清單3-46 metricssystem注冊sinks的實作
private def registersinks() {
val sinkconfigs = metricsconfig.subproperties(instconfig,
metricssystem.sink_regex)
sinkconfigs.foreach { kv =>
if (null != classpath) {
val sink =
class.forname(classpath)
.getconstructor(classof[properties],
classof[metricregistry], classof[securitymanager])
.newinstance(kv._2, registry,
securitymgr)
if (kv._1 == "servlet") {
metricsservlet =
some(sink.asinstanceof[metricsservlet])
} else {
sinks += sink.asinstanceof[sink]
case e: exception =>
logerror("sink class "+ classpath + " cannot be
instantialized",e)
3.9.3 給sinks增加jetty的servletcontexthandler
為了能夠在sparkui(網頁)通路到測量資料,是以需要給sinks增加jetty的servlet-contexthandler,這裡主要用到metricssystem的getservlethandlers方法實作如下。
def getservlethandlers = {
require(running, "can only call getservlethandlers on a running
metricssystem")
metricsservlet.map(_.gethandlers).getorelse(array())
可以看到調用了metricsservlet的gethandlers,其實作如下。
def gethandlers =
array[servletcontexthandler](
createservlethandler(servletpath,
new servletparams(request => getmetricssnapshot(request),
"text/json"), securitymgr)
)
最終生成處理/metrics/json請求的servletcontexthandler,而請求的真正處理由get-metricssnapshot方法,利用fastjson解析。生成的servletcontexthandler通過sparkui的attachhandler方法,也被綁定到sparkui(creatservlethandler與attachhandler方法在3.4.4節詳細講述過)。最終我們可以使用以下這些位址來通路測量資料。
http://localhost:4040/metrics/applications/json。
http://localhost:4040/metrics/json。
http://localhost:4040/metrics/master/json。