天天看點

Storm-源碼分析- metric

首先定義一系列metric相關的interface, imetric, ireducer, icombiner (backtype.storm.metric.api)

在task中, 建立一系列builtin-metrics, (backtype.storm.daemon.builtin-metrics), 并注冊到topology context裡面

task會不斷的利用如spout-acked-tuple!的functions去更新這些builtin-metrics

task會定期将builtin-metrics裡面的統計資料通過metrics-stream發送給metric-bolt (backtype.storm.metric.metricsconsumerbolt, 該bolt會建立實作backtype.storm.metric.api.imetricsconsumer的對象, 用于計算出metrics) 

然後如何使用這些metrics? 

由于這是builtin metrics, 是不會被外界使用的 

如果處理這些metrics, 取決于_metricsconsumer.handledatapoints, 這裡的_metricsconsumer是通過topology's configuration配置的 

比如backtype.storm.metric.loggingmetricsconsumer, 如果使用這個consumer就會将metrics寫入log中

countmetric, 計數, reset時清零 

assignablemetric, 指派, 不用reset 

multicountmetric, 使用hashmap記錄多個count, reset時分别對每個count對象調用getvalueandreset

combinedmetric, 結合icombiner和imetric

public interface ireducer<t> { 

    t init(); 

    t reduce(t accumulator, object input); 

    object extractresult(t accumulator); 

}

實作ireducer接口, 實作平均數reducer, reduce裡面累加和計數, extractresult裡面acc/count求平均數

reducedmetric

結合ireducer和imetric

這個interface, 内嵌taskinfo和datapoint類 

handledatapoints, 添加邏輯以處理task對應的一系列datapoint

定義spout和bolt所需要的一些metric, 主要兩個record, builtinspoutmetrics和builtinboltmetrics, [metric-name, metric-object]的hashmap

在mk-task-data的時候, 調用make-data來建立相應的metrics,

并在executor的mk-threads中, 會将這些builtin-metrics注冊到topologycontext中去,

上面完成的builtin-metrics的建立和注冊, 接着定義了一系列用于更新metrics的functions,

以spout-acked-tuple!為例, 需要更新multicountmetric ack-count和multireducedmetric complete-latency 

.scope從multicountmetric取出某個countmetric, 然後incrby來将stats的rate增加到count上

建立實作imetricsconsumer的對象, 并在execute裡面調用handledatapoints

systembolt, 根據comments裡面說的, 每個worker都有一個, taskid=-1 

定義些system相關的metric, 并注冊到topologycontext裡面

需要使用java調用clojure, 是以需要import下面的package

并且用到些用于監控memory和jvm的java package

這個bolt的特點是, 隻有prepare實作了邏輯, 并且通過_preparewascalled保證prepare隻被執行一次 

prepare中的邏輯, 主要就是定義各種metric, 并且通過registermetric注冊到topologycontext中 

metic包含, jvm的運作時間, 開始時間, memory情況, 和每個garbagecollector的情況 

注冊的這些system metrics也會一起被發送到metricsconsumerbolt進行處理 

這應該用spout實作, 為啥用bolt實作?

這裡會動态的往topology裡面, 加入metric-component (metricsconsumerbolt) 和system-component (systembolt), 以及相應的steam資訊

system-topology!會往topology加上些東西

1. acker, 後面再說 

2. metric-bolt, input是所有component的tasks發來的metrics-stream, 沒有output 

3. system-bolt, 沒有input, output是兩個tick-stream 

4. 給所有component, 增加額外的輸出metrics-stream, system-stream

看下thrift中的定義, 往topology裡面增加一個blot component, 其實就是往hashmap中增加一組[string, bolt] 

關鍵就是看看如何使用thrift/mk-bolt-spec*來建立blot spec

metric-components

首先, topology裡面所有的component(包含system component), 都需要往metics-bolt發送統計資料, 是以component-ids-that-emit-metrics就是all-components-ids+system-component-id 

那麼對于任意一個comp, 都會對metics-bolt産生如下輸入, {[comp-id metrics-stream-id] :shuffle} (采用:suffle grouping方式)

然後, 用thrift/mk-bolt-spec*來定義建立bolt的fn, mk-bolt-spec

最後, 調用mk-bolt-spec來建立metics-bolt的spec, 參考上面的定義 

關鍵就是, 建立metricsconsumerbolt對象, 需要從storm-conf裡面讀出, metricsconsumer的實作類和參數 

這個bolt負責, 将從各個task接收到的資料, 調用handledatapoints生成metircs, 參考前面的定義

給每個component增加兩個output stream 

metrics-stream-id, 發送給metric-blot, 資料結構為output-fields ["task-info" "data-points"] 

system-stream-id, ,資料結構為output-fields ["event"]