天天看点

Storm-源码分析- metric

首先定义一系列metric相关的interface, imetric, ireducer, icombiner (backtype.storm.metric.api)

在task中, 创建一系列builtin-metrics, (backtype.storm.daemon.builtin-metrics), 并注册到topology context里面

task会不断的利用如spout-acked-tuple!的functions去更新这些builtin-metrics

task会定期将builtin-metrics里面的统计数据通过metrics-stream发送给metric-bolt (backtype.storm.metric.metricsconsumerbolt, 该bolt会创建实现backtype.storm.metric.api.imetricsconsumer的对象, 用于计算出metrics) 

然后如何使用这些metrics? 

由于这是builtin metrics, 是不会被外界使用的 

如果处理这些metrics, 取决于_metricsconsumer.handledatapoints, 这里的_metricsconsumer是通过topology's configuration配置的 

比如backtype.storm.metric.loggingmetricsconsumer, 如果使用这个consumer就会将metrics写入log中

countmetric, 计数, reset时清零 

assignablemetric, 赋值, 不用reset 

multicountmetric, 使用hashmap记录多个count, reset时分别对每个count对象调用getvalueandreset

combinedmetric, 结合icombiner和imetric

public interface ireducer<t> { 

    t init(); 

    t reduce(t accumulator, object input); 

    object extractresult(t accumulator); 

}

实现ireducer接口, 实现平均数reducer, reduce里面累加和计数, extractresult里面acc/count求平均数

reducedmetric

结合ireducer和imetric

这个interface, 内嵌taskinfo和datapoint类 

handledatapoints, 添加逻辑以处理task对应的一系列datapoint

定义spout和bolt所需要的一些metric, 主要两个record, builtinspoutmetrics和builtinboltmetrics, [metric-name, metric-object]的hashmap

在mk-task-data的时候, 调用make-data来创建相应的metrics,

并在executor的mk-threads中, 会将这些builtin-metrics注册到topologycontext中去,

上面完成的builtin-metrics的创建和注册, 接着定义了一系列用于更新metrics的functions,

以spout-acked-tuple!为例, 需要更新multicountmetric ack-count和multireducedmetric complete-latency 

.scope从multicountmetric取出某个countmetric, 然后incrby来将stats的rate增加到count上

创建实现imetricsconsumer的对象, 并在execute里面调用handledatapoints

systembolt, 根据comments里面说的, 每个worker都有一个, taskid=-1 

定义些system相关的metric, 并注册到topologycontext里面

需要使用java调用clojure, 所以需要import下面的package

并且用到些用于监控memory和jvm的java package

这个bolt的特点是, 只有prepare实现了逻辑, 并且通过_preparewascalled保证prepare只被执行一次 

prepare中的逻辑, 主要就是定义各种metric, 并且通过registermetric注册到topologycontext中 

metic包含, jvm的运行时间, 开始时间, memory情况, 和每个garbagecollector的情况 

注册的这些system metrics也会一起被发送到metricsconsumerbolt进行处理 

这应该用spout实现, 为啥用bolt实现?

这里会动态的往topology里面, 加入metric-component (metricsconsumerbolt) 和system-component (systembolt), 以及相应的steam信息

system-topology!会往topology加上些东西

1. acker, 后面再说 

2. metric-bolt, input是所有component的tasks发来的metrics-stream, 没有output 

3. system-bolt, 没有input, output是两个tick-stream 

4. 给所有component, 增加额外的输出metrics-stream, system-stream

看下thrift中的定义, 往topology里面增加一个blot component, 其实就是往hashmap中增加一组[string, bolt] 

关键就是看看如何使用thrift/mk-bolt-spec*来创建blot spec

metric-components

首先, topology里面所有的component(包含system component), 都需要往metics-bolt发送统计数据, 所以component-ids-that-emit-metrics就是all-components-ids+system-component-id 

那么对于任意一个comp, 都会对metics-bolt产生如下输入, {[comp-id metrics-stream-id] :shuffle} (采用:suffle grouping方式)

然后, 用thrift/mk-bolt-spec*来定义创建bolt的fn, mk-bolt-spec

最后, 调用mk-bolt-spec来创建metics-bolt的spec, 参考上面的定义 

关键就是, 创建metricsconsumerbolt对象, 需要从storm-conf里面读出, metricsconsumer的实现类和参数 

这个bolt负责, 将从各个task接收到的数据, 调用handledatapoints生成metircs, 参考前面的定义

给每个component增加两个output stream 

metrics-stream-id, 发送给metric-blot, 数据结构为output-fields ["task-info" "data-points"] 

system-stream-id, ,数据结构为output-fields ["event"]