天天看点

【你好Hystrix】三:Hystrix指标收集之累计统计流原理源码解析-BucketedCumulativeCounterStream前言BucketedCumulativeCounterStream模拟累计统计总结

目录

  • 前言
  • BucketedCumulativeCounterStream
    • 过程测试
    • CumulativeCommandEventCounterStream
    • CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream
  • 模拟累计统计
    • 定义

      Hystrix

      事件类
    • 定义Hystrix事件流类
    • 累计统计流
    • 测试
  • 总结

前言

官网里面的内容我就不拿过来贴上面了 没有太大的意义。如果感兴趣可以自己去github上面去查找对应模块的内容。目前我们指标收集这一块的内容对应到官网文档的地址是Hystrix指标收集

前面一篇文章我们滑动窗口统计流做了分析 ,这一篇文章我们分析一下另外的一种统计流–累计统计流。前一篇文章我们提到过

Hystrix

在1.5之前使用

HystrixRollingNumber

环形数组 来实现滑动窗口和累计统计 个人认为这其实是一种常用的手段,我们熟悉的

Resilience4j

也是使用环形数组来实现的。各有利弊 没有谁更好。

Resilience

从轻量级的角度考虑所以它必然是使用环形数组 不需要依赖任何的第三方库。而

Hystrix

本来就是一个重量级的角色所以提高扩展性是它的一个重要的指标。

BucketedCumulativeCounterStream

【你好Hystrix】三:Hystrix指标收集之累计统计流原理源码解析-BucketedCumulativeCounterStream前言BucketedCumulativeCounterStream模拟累计统计总结

上一篇文章有这样一个集成关系。

BucketedCounterStream

的实现有两个分支 一个是代表滑动统计的

BucketedRollingCounterStream

这个我们上一篇文章已经分析过。而今天要说的就是另一个分支

BucketedCumulativeCounterStream

.累计统计流

BucketedCounterStream

已经将我们的数据流从事件流转成了 桶发射器。也就是

HystrixEvent

====>

Bucket

所以

BucketedCumulativeCounterStream

的功能和我们上一节说的

BucketedRollingCounterStream

功能是一样的 都是把

Bucket

转成

Output

。功能

public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
     //调用父类的构造方法 将事件流转成 数据桶发射器
     //桶的大小(桶的时长)由bucketSizeInMs来决定,桶的个数由numBuckets来决定                                     
     super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
     //bucketedStream是父类处理之后的流 在此基础上去操作
     this.sourceStream = bucketedStream
             .scan(getEmptyOutputValue(), reduceBucket)
             .skip(numBuckets)
             .doOnSubscribe(new Action0() {
                 @Override
                 public void call() {
                     isSourceCurrentlySubscribed.set(true);
                 }
             })
             .doOnUnsubscribe(new Action0() {
                 @Override
                 public void call() {
                     isSourceCurrentlySubscribed.set(false);
                 }
             })
             .share()
             .onBackpressureDrop();
 }

 @Override
 public Observable<Output> observe() {
     return sourceStream;
 }
           

构造方法里面的几个参数

  • reduceCommandCompletion

    :把

    Event

    转成

    Bucket

    这个由子类传递过来 极大的增加了灵活性
  • reduceBucket

    :把

    Bucket

    转换成

    Output

  • numBuckets

    :桶数量 这个是父类需要使用 初始化第一个窗口
  • bucketSizeInMs

    :将多长时间的事件流聚合成一个桶。也是父类需要使用

该类主要使用

scan

skip

操作符把Observable 转换成

Observable<OutPut>

。这两个操作符的作用上一篇文章已经演示过。

过程测试

public void cumulativeCounterStreamTest() throws InterruptedException {
     //1.Observable<Long>===>Observable<ArrayList>的转变
    Observable<ArrayList<Long>> bucketStream =
        Observable.interval(1000, TimeUnit.MILLISECONDS)
                  .window(2000, TimeUnit.MILLISECONDS)
                  .flatMap((Func1<Observable<Long>, Observable<ArrayList<Long>>>) longObservable -> longObservable.reduce(new ArrayList<Long>(), new Func2<ArrayList<Long>, Long, ArrayList<Long>>() {
                    @Override
                    public ArrayList<Long> call(ArrayList<Long> longs, Long aLong) {
                      longs.add(aLong);
                      return longs;
                    }
                  }));
    //2.Observable<ArrayList> ==> Observable<Long>
    Observable<Long> outputStream = bucketStream.scan(0L, (aLong, longs) -> {
      for (int i = 0; i < longs.size(); i++) {
        aLong += longs.get(i);
      }
      return aLong;
    }).skip(2);
    //3.BehaviorSubject订阅
    final BehaviorSubject subject = BehaviorSubject.create();
    outputStream.subscribe(subject);
    //4.每隔两秒输出一下最新的值
    new Thread(() -> {
      while (true){
        try {
          TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(subject.getValue());
      }
    }).start();
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  }
           

以上demo共分为4步

  • 第一步:将

    Observable<Long>

    转换成

    Observable<List>

    也就是我们说的将

    Event

    转成

    Bucket

    的过程。这个是在父类中完成的
  • 第二步:将

    Observable<List>

    转成

    Observable<Long>

    注意此处的

    Observable<long>

    是作为一个

    output

    是一个 聚合的结果 并非第一步的

    Observable<Long>

    。此处正是我们讲的

    BucketedCumulativeCounterStream

    来完成的。
  • 第三步:初始化一个

    BehaviorSubject

    来订阅 至于为什么是

    BehaviorSubject

    上篇中有说到,这里不再赘述。
  • 第四步:初始化一个线程来定时获取流中的结果

结果如下:

null
3
10
....
           

每次获取的值都是递增的,这个值只可能无限的增大。这就是我们说的累计统计流的效果。

CumulativeCommandEventCounterStream

通过介绍完上面的内容之后 这个流就显得很简单了,父类把要做的事都做完了 它只负责初始化确定泛型内容就可以了。

//对监听HystrixCommandCompletion事件流做处理
public class CumulativeCommandEventCounterStream extends BucketedCumulativeCounterStream<HystrixCommandCompletion, long[], long[]> {

   //根据commandKey来缓存对应的流对象
    private static final ConcurrentMap<String, CumulativeCommandEventCounterStream> streams = new ConcurrentHashMap<String, CumulativeCommandEventCounterStream>();
    //事件类型的数量 因为需要统计不同事件类型
    private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
    public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
    //下面对 numBuckets 和bucketSizeInMs的初始化和前面说的RollingCommandEventCounterStream是一毛一样的
        final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
        final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
        final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
        return getInstance(commandKey, numCounterBuckets, counterBucketSizeInMs);
    }
    public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
        CumulativeCommandEventCounterStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (CumulativeCommandEventCounterStream.class) {
                CumulativeCommandEventCounterStream existingStream = streams.get(commandKey.name());
                if (existingStream == null) {
                    CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,
                            HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);
                    streams.putIfAbsent(commandKey.name(), newStream);
                    return newStream;
                } else {
                    return existingStream;
                }
            }
        }
    }
    public static void reset() {
        streams.clear();
    }
    private CumulativeCommandEventCounterStream(HystrixCommandKey commandKey, int numCounterBuckets, int counterBucketSizeInMs,
                                                Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
                                                Func2<long[], long[], long[]> reduceBucket) {
        super(HystrixCommandCompletionStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
    }
    @Override
    long[] getEmptyBucketSummary() {
        return new long[NUM_EVENT_TYPES];
    }
    @Override
    long[] getEmptyOutputValue() {
        return new long[NUM_EVENT_TYPES];
    }
    //这个方法比较重要 可以获取当前对应事件的最新统计数据
    public long getLatest(HystrixEventType eventType) {
        return getLatest()[eventType.ordinal()];
    }
}
           

和前面一篇文章

RollingCommandEventCounterStream

本质上没有任何区别的。所以统计流和滑动流的区别就是

BucketedCumulativeCounterStream

没有使用

window

操作符来进行滑动统计。

CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream

上面看了

CumulativeCommandEventCounterStream

的源码 和上一章说的

RollingCommandEventCounterStream

是一毛一样的。而和

CumulativeCollapserEventCounterStream

CumulativeThreadPoolEventCounterStream

的区别仅仅有两个地方。

  • Observable<Event>

    转成

    Observable<Bucket>

    的操作.上述两个类的具体操作分别定义在

    HystrixCollapserMetrics.appendEventToBucket

    ,

    HystrixThreadPoolMetrics.appendEventToBucket

  • Observable<Bucket>

    转成

    Observable<Output>

    的操作

    对于上面的两点不同可以自行翻阅代码。

模拟累计统计

因为这一块的内容比较简单和上一篇文章有些重复 所以这里我们就使用一个demo来展示如何累计统计的。

定义

Hystrix

事件类

public class HystrixTestEvent implements HystrixEvent {
  private Long data;
  public HystrixTestEvent(Long data){
    this.data = data;
  }
  public Long getData() {
    return data;
  }
  public void setData(Long data) {
    this.data = data;
  }
  @Override
  public String toString() {
    return ""+ data;
  }
}
           

定义Hystrix事件流类

定义一个事件流监听上面定义的

HystrixTestEvent

事件。对外释放一个

write

方法用于向流中写入事件和一个

observe

方法用于返回当前的事件流

public class HystrixEventTestStream implements HystrixEventStream<HystrixTestEvent> {
  private  final Subject<HystrixTestEvent , HystrixTestEvent> writeOnlySubject;
  private final Observable<HystrixTestEvent> readOnlyStream;
  public HystrixEventTestStream(){
    this.writeOnlySubject = new SerializedSubject<>(PublishSubject.create());
    this.readOnlyStream = this.writeOnlySubject.share();
  }
  @Override
  public Observable<HystrixTestEvent> observe() {
    return this.readOnlyStream;
  }
  public void write(HystrixTestEvent event){
    this.writeOnlySubject.onNext(event);
  }
}
           

累计统计流

CumulativeCommandEventCounterStreamTest

这个统计流是关键 这里我们把

Bucket

定义为

Long

类型,

Output

定义为

String

类型,也就是整个过程是

Event -> Long -> String

的转换但是它的实现很简单只需要定义如下信息:

  • Observable<Event>

    Observable<Bucket>

    的操作符

    reduceCommandCompletion

  • Observable<Bucket>

    Observable<Output>

    的操作符

    reduceBucket

  • 初始化桶数量和桶长度
public class CumulativeCommandEventCounterStreamTest
    extends BucketedCumulativeCounterStream<HystrixTestEvent, Long, String> {
  private final static Func2<Long, HystrixTestEvent, Long> reduceCommandCompletion
      = (aLong, event) -> {
    Long data = event.getData();
    return data;
  };
  private final static Func2<String, Long, String> reduceBucket = (s, aLong) -> {
    if (StringUtils.isNotBlank(s)){
      return String.valueOf(Long.parseLong(s) + aLong);
    }else{
      return String.valueOf(aLong);
    }
  };
  protected CumulativeCommandEventCounterStreamTest(HystrixEventTestStream stream) {
  //这里桶的大小写死1  桶的长度写死1000 是为了好测试 没法送一个数据就会统计一下
    super(stream, 1, 1000, reduceCommandCompletion, reduceBucket);
  }
  @Override
  public Long getEmptyBucketSummary() {
    return 0L;
  }
  @Override
  public String getEmptyOutputValue() {
    return StringUtils.EMPTY;
  }
 }
           

测试

public static void main(String[] args) throws InterruptedException {
    HystrixEventTestStream hystrixEventTestStream = new HystrixEventTestStream();
    CumulativeCommandEventCounterStreamTest testStream =
        new CumulativeCommandEventCounterStreamTest(hystrixEventTestStream);
    //第一次调用 getLatest这里是为了提前订阅 CumulativeCommandEventCounterStreamTest
    System.out.println("counter:"+ testStream.getLatest());
    //开一个线程模拟事件的发送
    new Thread(()->{
      long counter = 0;
      while (counter < 100000){
        counter ++;
        HystrixTestEvent event = new HystrixTestEvent(counter);
        System.out.println("send event :" +event);
        hystrixEventTestStream.write(event);
        try {
          //这里1s发送一次 是为了更好的分析输出结果
          TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();

    while (true){
      System.out.println("counter:"+ testStream.getLatest());
      TimeUnit.SECONDS.sleep(1);
    }
  }
           

打印结果: send event 表示发送事件 counter表示统计结果

counter:0
counter:0
send event :1
counter:1
send event :2
counter:3
send event :3
counter:6
send event :4
send event :5
counter:10
....
           

总结

BucketedCumulativeCounterStream

完成了Hystrix累计统计的功能。在了解完上一篇内容之后就比较简单了。 【你好Hystrix】二:Hystrix指标收集之滑动收集原理源码解析-BucketedRollingCounterStream

【你好Hystrix】三:Hystrix指标收集之累计统计流原理源码解析-BucketedCumulativeCounterStream前言BucketedCumulativeCounterStream模拟累计统计总结