<b>3.9 启动测量系统metricssystem</b>
metricssystem使用codahale提供的第三方测量仓库metrics,有关metrics的具体信息可以参考附录d。metricssystem中有三个概念:
instance:指定了谁在使用测量系统;
source:指定了从哪里收集测量数据;
sink:指定了往哪里输出测量数据。
spark按照instance的不同,区分为master、worker、application、driver和executor。
spark目前提供的sink有consolesink、csvsink、jmxsink、metricsservlet、graphitesink等。
spark中使用metricsservlet作为默认的sink。
metricssystem的启动代码如下。
val metricssystem = env.metricssystem
metricssystem.start()
metricssystem的启动过程包括以下步骤:
1)注册sources;
2)注册sinks;
3)给sinks增加jetty的servletcontexthandler。
metricssystem启动完毕后,会遍历与sinks有关的servletcontexthandler,并调用attach-handler将它们绑定到spark
ui上。
metricssystem.getservlethandlers.foreach(handler
=> ui.foreach(_.attachhandler (handler)))
3.9.1 注册sources
registersources方法用于注册sources,告诉测量系统从哪里收集测量数据,它的实现见代码清单3-45。注册sources的过程分为以下步骤:
1)从metricsconfig获取driver的properties,默认为创建metricssystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,
sink.servlet.path=/metrics/json}。
2)用正则匹配driver的properties中以source.开头的属性。然后将属性中的source反射得到的实例加入arraybuffer[source]。
3)将每个source的metricregistry(也是metricset的子类型)注册到concurrent-map<string,
metric> metrics。这里的registersource方法已在3.8.2节讲解过。
代码清单3-45 metricssystem注册sources的实现
private def registersources() {
val instconfig = metricsconfig.getinstance(instance)
val sourceconfigs = metricsconfig.subproperties(instconfig,
metricssystem.source_regex)
// register all the sources related to instance
sourceconfigs.foreach { kv =>
val classpath = kv._2.getproperty("class")
try {
val source = class.forname(classpath).newinstance()
registersource(source.asinstanceof[source])
} catch {
case e: exception => logerror("source class " + classpath +
" cannot be instantiated", e)
}
}
3.9.2 注册sinks
registersinks方法用于注册sinks,即告诉测量系统metricssystem往哪里输出测量数据,它的实现见代码清单3-46。注册sinks的步骤如下:
1)从driver的properties中用正则匹配以sink.开头的属性,如{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,
sink.servlet.path=/metrics/json},将其转换为map(servlet ->
{class=org.apache.spark.metrics.sink.metricsservlet, path=/metrics/json})。
2)将子属性class对应的类metricsservlet反射得到metricsservlet实例。如果属性的key是servlet,将其设置为metricsservlet;如果是sink,则加入到arraybuffer[sink]中。
代码清单3-46 metricssystem注册sinks的实现
private def registersinks() {
val sinkconfigs = metricsconfig.subproperties(instconfig,
metricssystem.sink_regex)
sinkconfigs.foreach { kv =>
if (null != classpath) {
val sink =
class.forname(classpath)
.getconstructor(classof[properties],
classof[metricregistry], classof[securitymanager])
.newinstance(kv._2, registry,
securitymgr)
if (kv._1 == "servlet") {
metricsservlet =
some(sink.asinstanceof[metricsservlet])
} else {
sinks += sink.asinstanceof[sink]
case e: exception =>
logerror("sink class "+ classpath + " cannot be
instantialized",e)
3.9.3 给sinks增加jetty的servletcontexthandler
为了能够在sparkui(网页)访问到测量数据,所以需要给sinks增加jetty的servlet-contexthandler,这里主要用到metricssystem的getservlethandlers方法实现如下。
def getservlethandlers = {
require(running, "can only call getservlethandlers on a running
metricssystem")
metricsservlet.map(_.gethandlers).getorelse(array())
可以看到调用了metricsservlet的gethandlers,其实现如下。
def gethandlers =
array[servletcontexthandler](
createservlethandler(servletpath,
new servletparams(request => getmetricssnapshot(request),
"text/json"), securitymgr)
)
最终生成处理/metrics/json请求的servletcontexthandler,而请求的真正处理由get-metricssnapshot方法,利用fastjson解析。生成的servletcontexthandler通过sparkui的attachhandler方法,也被绑定到sparkui(creatservlethandler与attachhandler方法在3.4.4节详细讲述过)。最终我们可以使用以下这些地址来访问测量数据。
http://localhost:4040/metrics/applications/json。
http://localhost:4040/metrics/json。
http://localhost:4040/metrics/master/json。