天天看点

深入理解Spark:核心思想与源码分析. 3.9 启动测量系统MetricsSystem

<b>3.9 启动测量系统metricssystem</b>

metricssystem使用codahale提供的第三方测量仓库metrics,有关metrics的具体信息可以参考附录d。metricssystem中有三个概念:

instance:指定了谁在使用测量系统;

source:指定了从哪里收集测量数据;

sink:指定了往哪里输出测量数据。

spark按照instance的不同,区分为master、worker、application、driver和executor。

spark目前提供的sink有consolesink、csvsink、jmxsink、metricsservlet、graphitesink等。

spark中使用metricsservlet作为默认的sink。

metricssystem的启动代码如下。

val metricssystem = env.metricssystem

metricssystem.start()

metricssystem的启动过程包括以下步骤:

1)注册sources;

2)注册sinks;

3)给sinks增加jetty的servletcontexthandler。

metricssystem启动完毕后,会遍历与sinks有关的servletcontexthandler,并调用attach-handler将它们绑定到spark

ui上。

metricssystem.getservlethandlers.foreach(handler

=&gt; ui.foreach(_.attachhandler (handler)))

3.9.1 注册sources

registersources方法用于注册sources,告诉测量系统从哪里收集测量数据,它的实现见代码清单3-45。注册sources的过程分为以下步骤:

1)从metricsconfig获取driver的properties,默认为创建metricssystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

sink.servlet.path=/metrics/json}。

2)用正则匹配driver的properties中以source.开头的属性。然后将属性中的source反射得到的实例加入arraybuffer[source]。

3)将每个source的metricregistry(也是metricset的子类型)注册到concurrent-map&lt;string,

metric&gt; metrics。这里的registersource方法已在3.8.2节讲解过。

代码清单3-45 metricssystem注册sources的实现

private def registersources() {

val instconfig = metricsconfig.getinstance(instance)

val sourceconfigs = metricsconfig.subproperties(instconfig,

metricssystem.source_regex)

// register all the sources related to instance

sourceconfigs.foreach { kv =&gt;

val classpath = kv._2.getproperty("class")

try {

val source = class.forname(classpath).newinstance()

registersource(source.asinstanceof[source])

} catch {

case e: exception =&gt; logerror("source class " + classpath +

" cannot be instantiated", e)

}

    }

3.9.2 注册sinks

registersinks方法用于注册sinks,即告诉测量系统metricssystem往哪里输出测量数据,它的实现见代码清单3-46。注册sinks的步骤如下:

1)从driver的properties中用正则匹配以sink.开头的属性,如{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

sink.servlet.path=/metrics/json},将其转换为map(servlet -&gt;

{class=org.apache.spark.metrics.sink.metricsservlet, path=/metrics/json})。

2)将子属性class对应的类metricsservlet反射得到metricsservlet实例。如果属性的key是servlet,将其设置为metricsservlet;如果是sink,则加入到arraybuffer[sink]中。

代码清单3-46 metricssystem注册sinks的实现

private def registersinks() {

val sinkconfigs = metricsconfig.subproperties(instconfig,

metricssystem.sink_regex)

sinkconfigs.foreach { kv =&gt;

if (null != classpath) {

                val sink =

class.forname(classpath)

                .getconstructor(classof[properties],

classof[metricregistry], classof[securitymanager])

                .newinstance(kv._2, registry,

securitymgr)

if (kv._1 == "servlet") {

                metricsservlet =

some(sink.asinstanceof[metricsservlet])

} else {

                sinks += sink.asinstanceof[sink]

                case e: exception =&gt;

logerror("sink class "+ classpath + " cannot be

instantialized",e)

3.9.3 给sinks增加jetty的servletcontexthandler

为了能够在sparkui(网页)访问到测量数据,所以需要给sinks增加jetty的servlet-contexthandler,这里主要用到metricssystem的getservlethandlers方法实现如下。

def getservlethandlers = {

require(running, "can only call getservlethandlers on a running

metricssystem")

metricsservlet.map(_.gethandlers).getorelse(array())

可以看到调用了metricsservlet的gethandlers,其实现如下。

def gethandlers =

array[servletcontexthandler](

createservlethandler(servletpath,

new servletparams(request =&gt; getmetricssnapshot(request),

"text/json"), securitymgr)

)

最终生成处理/metrics/json请求的servletcontexthandler,而请求的真正处理由get-metricssnapshot方法,利用fastjson解析。生成的servletcontexthandler通过sparkui的attachhandler方法,也被绑定到sparkui(creatservlethandler与attachhandler方法在3.4.4节详细讲述过)。最终我们可以使用以下这些地址来访问测量数据。

http://localhost:4040/metrics/applications/json。

http://localhost:4040/metrics/json。

http://localhost:4040/metrics/master/json。

继续阅读