目录
- 前言
- BucketedCumulativeCounterStream
-
- 过程测试
- CumulativeCommandEventCounterStream
- CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream
- 模拟累计统计
-
- 定义
事件类Hystrix
- 定义Hystrix事件流类
- 累计统计流
- 测试
- 定义
- 总结
前言
官网里面的内容我就不拿过来贴上面了 没有太大的意义。如果感兴趣可以自己去github上面去查找对应模块的内容。目前我们指标收集这一块的内容对应到官网文档的地址是Hystrix指标收集
前面一篇文章我们滑动窗口统计流做了分析 ,这一篇文章我们分析一下另外的一种统计流–累计统计流。前一篇文章我们提到过
Hystrix
在1.5之前使用
HystrixRollingNumber
环形数组 来实现滑动窗口和累计统计 个人认为这其实是一种常用的手段,我们熟悉的
Resilience4j
也是使用环形数组来实现的。各有利弊 没有谁更好。
Resilience
从轻量级的角度考虑所以它必然是使用环形数组 不需要依赖任何的第三方库。而
Hystrix
本来就是一个重量级的角色所以提高扩展性是它的一个重要的指标。
BucketedCumulativeCounterStream
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLwB3MaVnRtpFaSJDWuVzVZBHcXRGd5cVWwh2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLxYzNzATN1cTM3AjMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
上一篇文章有这样一个集成关系。
BucketedCounterStream
的实现有两个分支 一个是代表滑动统计的
BucketedRollingCounterStream
这个我们上一篇文章已经分析过。而今天要说的就是另一个分支
BucketedCumulativeCounterStream
.累计统计流
BucketedCounterStream
已经将我们的数据流从事件流转成了 桶发射器。也就是
HystrixEvent
====>
Bucket
。
所以
BucketedCumulativeCounterStream
的功能和我们上一节说的
BucketedRollingCounterStream
功能是一样的 都是把
Bucket
转成
Output
。功能
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
//调用父类的构造方法 将事件流转成 数据桶发射器
//桶的大小(桶的时长)由bucketSizeInMs来决定,桶的个数由numBuckets来决定
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
//bucketedStream是父类处理之后的流 在此基础上去操作
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
@Override
public Observable<Output> observe() {
return sourceStream;
}
构造方法里面的几个参数
-
:把reduceCommandCompletion
转成Event
这个由子类传递过来 极大的增加了灵活性Bucket
-
:把reduceBucket
转换成Bucket
Output
-
:桶数量 这个是父类需要使用 初始化第一个窗口numBuckets
-
:将多长时间的事件流聚合成一个桶。也是父类需要使用bucketSizeInMs
该类主要使用
scan
和
skip
操作符把Observable 转换成
Observable<OutPut>
。这两个操作符的作用上一篇文章已经演示过。
过程测试
public void cumulativeCounterStreamTest() throws InterruptedException {
//1.Observable<Long>===>Observable<ArrayList>的转变
Observable<ArrayList<Long>> bucketStream =
Observable.interval(1000, TimeUnit.MILLISECONDS)
.window(2000, TimeUnit.MILLISECONDS)
.flatMap((Func1<Observable<Long>, Observable<ArrayList<Long>>>) longObservable -> longObservable.reduce(new ArrayList<Long>(), new Func2<ArrayList<Long>, Long, ArrayList<Long>>() {
@Override
public ArrayList<Long> call(ArrayList<Long> longs, Long aLong) {
longs.add(aLong);
return longs;
}
}));
//2.Observable<ArrayList> ==> Observable<Long>
Observable<Long> outputStream = bucketStream.scan(0L, (aLong, longs) -> {
for (int i = 0; i < longs.size(); i++) {
aLong += longs.get(i);
}
return aLong;
}).skip(2);
//3.BehaviorSubject订阅
final BehaviorSubject subject = BehaviorSubject.create();
outputStream.subscribe(subject);
//4.每隔两秒输出一下最新的值
new Thread(() -> {
while (true){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(subject.getValue());
}
}).start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
以上demo共分为4步
- 第一步:将
转换成Observable<Long>
也就是我们说的将Observable<List>
转成Event
的过程。这个是在父类中完成的Bucket
- 第二步:将
转成Observable<List>
注意此处的Observable<Long>
是作为一个Observable<long>
是一个 聚合的结果 并非第一步的output
。此处正是我们讲的Observable<Long>
来完成的。BucketedCumulativeCounterStream
- 第三步:初始化一个
来订阅 至于为什么是BehaviorSubject
上篇中有说到,这里不再赘述。BehaviorSubject
- 第四步:初始化一个线程来定时获取流中的结果
结果如下:
null
3
10
....
每次获取的值都是递增的,这个值只可能无限的增大。这就是我们说的累计统计流的效果。
CumulativeCommandEventCounterStream
通过介绍完上面的内容之后 这个流就显得很简单了,父类把要做的事都做完了 它只负责初始化确定泛型内容就可以了。
//对监听HystrixCommandCompletion事件流做处理
public class CumulativeCommandEventCounterStream extends BucketedCumulativeCounterStream<HystrixCommandCompletion, long[], long[]> {
//根据commandKey来缓存对应的流对象
private static final ConcurrentMap<String, CumulativeCommandEventCounterStream> streams = new ConcurrentHashMap<String, CumulativeCommandEventCounterStream>();
//事件类型的数量 因为需要统计不同事件类型
private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
//下面对 numBuckets 和bucketSizeInMs的初始化和前面说的RollingCommandEventCounterStream是一毛一样的
final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
return getInstance(commandKey, numCounterBuckets, counterBucketSizeInMs);
}
public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
CumulativeCommandEventCounterStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (CumulativeCommandEventCounterStream.class) {
CumulativeCommandEventCounterStream existingStream = streams.get(commandKey.name());
if (existingStream == null) {
CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,
HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);
streams.putIfAbsent(commandKey.name(), newStream);
return newStream;
} else {
return existingStream;
}
}
}
}
public static void reset() {
streams.clear();
}
private CumulativeCommandEventCounterStream(HystrixCommandKey commandKey, int numCounterBuckets, int counterBucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
Func2<long[], long[], long[]> reduceBucket) {
super(HystrixCommandCompletionStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
}
@Override
long[] getEmptyBucketSummary() {
return new long[NUM_EVENT_TYPES];
}
@Override
long[] getEmptyOutputValue() {
return new long[NUM_EVENT_TYPES];
}
//这个方法比较重要 可以获取当前对应事件的最新统计数据
public long getLatest(HystrixEventType eventType) {
return getLatest()[eventType.ordinal()];
}
}
和前面一篇文章
RollingCommandEventCounterStream
本质上没有任何区别的。所以统计流和滑动流的区别就是
BucketedCumulativeCounterStream
没有使用
window
操作符来进行滑动统计。
CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream
上面看了
CumulativeCommandEventCounterStream
的源码 和上一章说的
RollingCommandEventCounterStream
是一毛一样的。而和
CumulativeCollapserEventCounterStream
、
CumulativeThreadPoolEventCounterStream
的区别仅仅有两个地方。
-
转成Observable<Event>
的操作.上述两个类的具体操作分别定义在Observable<Bucket>
,HystrixCollapserMetrics.appendEventToBucket
HystrixThreadPoolMetrics.appendEventToBucket
-
转成Observable<Bucket>
Observable<Output>
的操作
对于上面的两点不同可以自行翻阅代码。
模拟累计统计
因为这一块的内容比较简单和上一篇文章有些重复 所以这里我们就使用一个demo来展示如何累计统计的。
定义 Hystrix
事件类
Hystrix
public class HystrixTestEvent implements HystrixEvent {
private Long data;
public HystrixTestEvent(Long data){
this.data = data;
}
public Long getData() {
return data;
}
public void setData(Long data) {
this.data = data;
}
@Override
public String toString() {
return ""+ data;
}
}
定义Hystrix事件流类
定义一个事件流监听上面定义的
HystrixTestEvent
事件。对外释放一个
write
方法用于向流中写入事件和一个
observe
方法用于返回当前的事件流
public class HystrixEventTestStream implements HystrixEventStream<HystrixTestEvent> {
private final Subject<HystrixTestEvent , HystrixTestEvent> writeOnlySubject;
private final Observable<HystrixTestEvent> readOnlyStream;
public HystrixEventTestStream(){
this.writeOnlySubject = new SerializedSubject<>(PublishSubject.create());
this.readOnlyStream = this.writeOnlySubject.share();
}
@Override
public Observable<HystrixTestEvent> observe() {
return this.readOnlyStream;
}
public void write(HystrixTestEvent event){
this.writeOnlySubject.onNext(event);
}
}
累计统计流
CumulativeCommandEventCounterStreamTest
这个统计流是关键 这里我们把
Bucket
定义为
Long
类型,
Output
定义为
String
类型,也就是整个过程是
Event -> Long -> String
的转换但是它的实现很简单只需要定义如下信息:
-
转Observable<Event>
的操作符Observable<Bucket>
reduceCommandCompletion
-
转Observable<Bucket>
的操作符Observable<Output>
reduceBucket
- 初始化桶数量和桶长度
public class CumulativeCommandEventCounterStreamTest
extends BucketedCumulativeCounterStream<HystrixTestEvent, Long, String> {
private final static Func2<Long, HystrixTestEvent, Long> reduceCommandCompletion
= (aLong, event) -> {
Long data = event.getData();
return data;
};
private final static Func2<String, Long, String> reduceBucket = (s, aLong) -> {
if (StringUtils.isNotBlank(s)){
return String.valueOf(Long.parseLong(s) + aLong);
}else{
return String.valueOf(aLong);
}
};
protected CumulativeCommandEventCounterStreamTest(HystrixEventTestStream stream) {
//这里桶的大小写死1 桶的长度写死1000 是为了好测试 没法送一个数据就会统计一下
super(stream, 1, 1000, reduceCommandCompletion, reduceBucket);
}
@Override
public Long getEmptyBucketSummary() {
return 0L;
}
@Override
public String getEmptyOutputValue() {
return StringUtils.EMPTY;
}
}
测试
public static void main(String[] args) throws InterruptedException {
HystrixEventTestStream hystrixEventTestStream = new HystrixEventTestStream();
CumulativeCommandEventCounterStreamTest testStream =
new CumulativeCommandEventCounterStreamTest(hystrixEventTestStream);
//第一次调用 getLatest这里是为了提前订阅 CumulativeCommandEventCounterStreamTest
System.out.println("counter:"+ testStream.getLatest());
//开一个线程模拟事件的发送
new Thread(()->{
long counter = 0;
while (counter < 100000){
counter ++;
HystrixTestEvent event = new HystrixTestEvent(counter);
System.out.println("send event :" +event);
hystrixEventTestStream.write(event);
try {
//这里1s发送一次 是为了更好的分析输出结果
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
while (true){
System.out.println("counter:"+ testStream.getLatest());
TimeUnit.SECONDS.sleep(1);
}
}
打印结果: send event 表示发送事件 counter表示统计结果
counter:0
counter:0
send event :1
counter:1
send event :2
counter:3
send event :3
counter:6
send event :4
send event :5
counter:10
....
总结
BucketedCumulativeCounterStream
完成了Hystrix累计统计的功能。在了解完上一篇内容之后就比较简单了。 【你好Hystrix】二:Hystrix指标收集之滑动收集原理源码解析-BucketedRollingCounterStream