天天看点

Storm-源码分析-Stats (backtype.storm.stats)

会发现, 现在storm里面有两套metrics系统, metrics framework和stats framework

并且在所有地方都是同时注册两套, 貌似准备用metrics来替代stats, 但当前版本ui仍然使用stats

这个模块统计的数据怎么被使用,

1. 在worker中, 会定期调用do-executor-heartbeats去往zk同步hb 

可以看到, stats也会作为hb的一部分被同步到zk上

executor-stats stats

2. 现在任何人都可以通过nimbus的thrift接口来得到相关信息 

3. 最直接的用户就是storm ui, 在准备topology page的时候, 就会调用gettopologyinfo来获取数据

这个模块用于spout和bolt来抽样统计数据, 需要统计的具体metics如下

抽样的比例在storm-conf, topology_stats_sample_rate, 配置

为什么统计时每次加rate, 而不是加1? 因为这里的统计是抽样的, 所以如果抽样比例是10%, 那么发现一个, 应该加1/(10%), 10个

然后统计是基于时间窗口的, 底下是对应默认的bucket和时间窗口的定义

核心数据结构是rollingwindowset, 包含: 

统计数据需要的函数, updater extractor, 之所以治理也需要是因为需要统计all-time  

一组rolling windows, 默认是3个时间窗, 10 minutes, 3 hours, 1 day 

all-time, 在完整的时间区间上的统计结果

继续看看rolling window的定义, 

核心数据, buckets, hashmap, {streamid, data}, 初始化为{} 

统计data需要的函数, updater merger extractor 

时间窗口, buckets大小和buckets个数

在mk-executedata的时候需要创建stats

第一个参数忽略, 其实就是分别调用stats/mk-spout-stats或stats/mk-bolt-stats, 可见就是对于每个需要统计的数据, 创建一个rolling-windows-set

就以update-executor-stat! stats :acked stream (stats-rate stats)为例子看看怎么做的?

spoutexecutorstats取出用于记录spout acked情况的rolling-windows-set 

然后使用update-rolling-window-set来swap这个atom

来看看记录acked的rolling-windows-set是如何定义的?

keyed-counter-rolling-window-set, 预定义了updater merger extractor 

updater, incr-val [amap key amt], 把给定的值amt加到amap的对应的key的value上 

merger, (partial merge-with +), 用+作为map merge的逻辑, 即出现相同key则相加 

extractor, counter-extract, (if v v {}), 有则返回, 无则返回{} 

windows, rolling-window的list 

all-time, 初始化为nil

好, 下面就看看, 当spout-acked-tuple!时更新:acked时, 如何update的?

首先更新每个rolling-window, 并把更新过的rolling-window-set更新到:windows 

并且更新:all-time, (apply (:updater rws) (:all-time rws) args) 

updated, incr-val [amap key amt] 

args, steamid, rate 

all-time, 是用来记录整个时间区间上的, 某个stream的统计情况

看下如何更新某个rolling-windw 

根据now算出当前属于哪个bucket, time-bucket 

取出buckets, 并使用:updater更新相应的bucket, 这里的操作仍然是把rate叠加到streamid的value上

继续阅读