天天看點

【你好Hystrix】三:Hystrix名額收集之累計統計流原理源碼解析-BucketedCumulativeCounterStream前言BucketedCumulativeCounterStream模拟累計統計總結

目錄

  • 前言
  • BucketedCumulativeCounterStream
    • 過程測試
    • CumulativeCommandEventCounterStream
    • CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream
  • 模拟累計統計
    • 定義

      Hystrix

      事件類
    • 定義Hystrix事件流類
    • 累計統計流
    • 測試
  • 總結

前言

官網裡面的内容我就不拿過來貼上面了 沒有太大的意義。如果感興趣可以自己去github上面去查找對應子產品的内容。目前我們名額收集這一塊的内容對應到官網文檔的位址是Hystrix名額收集

前面一篇文章我們滑動視窗統計流做了分析 ,這一篇文章我們分析一下另外的一種統計流–累計統計流。前一篇文章我們提到過

Hystrix

在1.5之前使用

HystrixRollingNumber

環形數組 來實作滑動視窗和累計統計 個人認為這其實是一種常用的手段,我們熟悉的

Resilience4j

也是使用環形數組來實作的。各有利弊 沒有誰更好。

Resilience

從輕量級的角度考慮是以它必然是使用環形數組 不需要依賴任何的第三方庫。而

Hystrix

本來就是一個重量級的角色是以提高擴充性是它的一個重要的名額。

BucketedCumulativeCounterStream

【你好Hystrix】三:Hystrix名額收集之累計統計流原理源碼解析-BucketedCumulativeCounterStream前言BucketedCumulativeCounterStream模拟累計統計總結

上一篇文章有這樣一個內建關系。

BucketedCounterStream

的實作有兩個分支 一個是代表滑動統計的

BucketedRollingCounterStream

這個我們上一篇文章已經分析過。而今天要說的就是另一個分支

BucketedCumulativeCounterStream

.累計統計流

BucketedCounterStream

已經将我們的資料流從事件流轉成了 桶發射器。也就是

HystrixEvent

====>

Bucket

是以

BucketedCumulativeCounterStream

的功能和我們上一節說的

BucketedRollingCounterStream

功能是一樣的 都是把

Bucket

轉成

Output

。功能

public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
     //調用父類的構造方法 将事件流轉成 資料桶發射器
     //桶的大小(桶的時長)由bucketSizeInMs來決定,桶的個數由numBuckets來決定                                     
     super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
     //bucketedStream是父類處理之後的流 在此基礎上去操作
     this.sourceStream = bucketedStream
             .scan(getEmptyOutputValue(), reduceBucket)
             .skip(numBuckets)
             .doOnSubscribe(new Action0() {
                 @Override
                 public void call() {
                     isSourceCurrentlySubscribed.set(true);
                 }
             })
             .doOnUnsubscribe(new Action0() {
                 @Override
                 public void call() {
                     isSourceCurrentlySubscribed.set(false);
                 }
             })
             .share()
             .onBackpressureDrop();
 }

 @Override
 public Observable<Output> observe() {
     return sourceStream;
 }
           

構造方法裡面的幾個參數

  • reduceCommandCompletion

    :把

    Event

    轉成

    Bucket

    這個由子類傳遞過來 極大的增加了靈活性
  • reduceBucket

    :把

    Bucket

    轉換成

    Output

  • numBuckets

    :桶數量 這個是父類需要使用 初始化第一個視窗
  • bucketSizeInMs

    :将多長時間的事件流聚合成一個桶。也是父類需要使用

該類主要使用

scan

skip

操作符把Observable 轉換成

Observable<OutPut>

。這兩個操作符的作用上一篇文章已經示範過。

過程測試

public void cumulativeCounterStreamTest() throws InterruptedException {
     //1.Observable<Long>===>Observable<ArrayList>的轉變
    Observable<ArrayList<Long>> bucketStream =
        Observable.interval(1000, TimeUnit.MILLISECONDS)
                  .window(2000, TimeUnit.MILLISECONDS)
                  .flatMap((Func1<Observable<Long>, Observable<ArrayList<Long>>>) longObservable -> longObservable.reduce(new ArrayList<Long>(), new Func2<ArrayList<Long>, Long, ArrayList<Long>>() {
                    @Override
                    public ArrayList<Long> call(ArrayList<Long> longs, Long aLong) {
                      longs.add(aLong);
                      return longs;
                    }
                  }));
    //2.Observable<ArrayList> ==> Observable<Long>
    Observable<Long> outputStream = bucketStream.scan(0L, (aLong, longs) -> {
      for (int i = 0; i < longs.size(); i++) {
        aLong += longs.get(i);
      }
      return aLong;
    }).skip(2);
    //3.BehaviorSubject訂閱
    final BehaviorSubject subject = BehaviorSubject.create();
    outputStream.subscribe(subject);
    //4.每隔兩秒輸出一下最新的值
    new Thread(() -> {
      while (true){
        try {
          TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(subject.getValue());
      }
    }).start();
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  }
           

以上demo共分為4步

  • 第一步:将

    Observable<Long>

    轉換成

    Observable<List>

    也就是我們說的将

    Event

    轉成

    Bucket

    的過程。這個是在父類中完成的
  • 第二步:将

    Observable<List>

    轉成

    Observable<Long>

    注意此處的

    Observable<long>

    是作為一個

    output

    是一個 聚合的結果 并非第一步的

    Observable<Long>

    。此處正是我們講的

    BucketedCumulativeCounterStream

    來完成的。
  • 第三步:初始化一個

    BehaviorSubject

    來訂閱 至于為什麼是

    BehaviorSubject

    上篇中有說到,這裡不再贅述。
  • 第四步:初始化一個線程來定時擷取流中的結果

結果如下:

null
3
10
....
           

每次擷取的值都是遞增的,這個值隻可能無限的增大。這就是我們說的累計統計流的效果。

CumulativeCommandEventCounterStream

通過介紹完上面的内容之後 這個流就顯得很簡單了,父類把要做的事都做完了 它隻負責初始化确定泛型内容就可以了。

//對監聽HystrixCommandCompletion事件流做處理
public class CumulativeCommandEventCounterStream extends BucketedCumulativeCounterStream<HystrixCommandCompletion, long[], long[]> {

   //根據commandKey來緩存對應的流對象
    private static final ConcurrentMap<String, CumulativeCommandEventCounterStream> streams = new ConcurrentHashMap<String, CumulativeCommandEventCounterStream>();
    //事件類型的數量 因為需要統計不同僚件類型
    private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
    public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
    //下面對 numBuckets 和bucketSizeInMs的初始化和前面說的RollingCommandEventCounterStream是一毛一樣的
        final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
        final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
        final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
        return getInstance(commandKey, numCounterBuckets, counterBucketSizeInMs);
    }
    public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
        CumulativeCommandEventCounterStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (CumulativeCommandEventCounterStream.class) {
                CumulativeCommandEventCounterStream existingStream = streams.get(commandKey.name());
                if (existingStream == null) {
                    CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,
                            HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);
                    streams.putIfAbsent(commandKey.name(), newStream);
                    return newStream;
                } else {
                    return existingStream;
                }
            }
        }
    }
    public static void reset() {
        streams.clear();
    }
    private CumulativeCommandEventCounterStream(HystrixCommandKey commandKey, int numCounterBuckets, int counterBucketSizeInMs,
                                                Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
                                                Func2<long[], long[], long[]> reduceBucket) {
        super(HystrixCommandCompletionStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
    }
    @Override
    long[] getEmptyBucketSummary() {
        return new long[NUM_EVENT_TYPES];
    }
    @Override
    long[] getEmptyOutputValue() {
        return new long[NUM_EVENT_TYPES];
    }
    //這個方法比較重要 可以擷取目前對應事件的最新統計資料
    public long getLatest(HystrixEventType eventType) {
        return getLatest()[eventType.ordinal()];
    }
}
           

和前面一篇文章

RollingCommandEventCounterStream

本質上沒有任何差別的。是以統計流和滑動流的差別就是

BucketedCumulativeCounterStream

沒有使用

window

操作符來進行滑動統計。

CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream

上面看了

CumulativeCommandEventCounterStream

的源碼 和上一章說的

RollingCommandEventCounterStream

是一毛一樣的。而和

CumulativeCollapserEventCounterStream

CumulativeThreadPoolEventCounterStream

的差別僅僅有兩個地方。

  • Observable<Event>

    轉成

    Observable<Bucket>

    的操作.上述兩個類的具體操作分别定義在

    HystrixCollapserMetrics.appendEventToBucket

    ,

    HystrixThreadPoolMetrics.appendEventToBucket

  • Observable<Bucket>

    轉成

    Observable<Output>

    的操作

    對于上面的兩點不同可以自行翻閱代碼。

模拟累計統計

因為這一塊的内容比較簡單和上一篇文章有些重複 是以這裡我們就使用一個demo來展示如何累計統計的。

定義

Hystrix

事件類

public class HystrixTestEvent implements HystrixEvent {
  private Long data;
  public HystrixTestEvent(Long data){
    this.data = data;
  }
  public Long getData() {
    return data;
  }
  public void setData(Long data) {
    this.data = data;
  }
  @Override
  public String toString() {
    return ""+ data;
  }
}
           

定義Hystrix事件流類

定義一個事件流監聽上面定義的

HystrixTestEvent

事件。對外釋放一個

write

方法用于向流中寫入事件和一個

observe

方法用于傳回目前的事件流

public class HystrixEventTestStream implements HystrixEventStream<HystrixTestEvent> {
  private  final Subject<HystrixTestEvent , HystrixTestEvent> writeOnlySubject;
  private final Observable<HystrixTestEvent> readOnlyStream;
  public HystrixEventTestStream(){
    this.writeOnlySubject = new SerializedSubject<>(PublishSubject.create());
    this.readOnlyStream = this.writeOnlySubject.share();
  }
  @Override
  public Observable<HystrixTestEvent> observe() {
    return this.readOnlyStream;
  }
  public void write(HystrixTestEvent event){
    this.writeOnlySubject.onNext(event);
  }
}
           

累計統計流

CumulativeCommandEventCounterStreamTest

這個統計流是關鍵 這裡我們把

Bucket

定義為

Long

類型,

Output

定義為

String

類型,也就是整個過程是

Event -> Long -> String

的轉換但是它的實作很簡單隻需要定義如下資訊:

  • Observable<Event>

    Observable<Bucket>

    的操作符

    reduceCommandCompletion

  • Observable<Bucket>

    Observable<Output>

    的操作符

    reduceBucket

  • 初始化桶數量和桶長度
public class CumulativeCommandEventCounterStreamTest
    extends BucketedCumulativeCounterStream<HystrixTestEvent, Long, String> {
  private final static Func2<Long, HystrixTestEvent, Long> reduceCommandCompletion
      = (aLong, event) -> {
    Long data = event.getData();
    return data;
  };
  private final static Func2<String, Long, String> reduceBucket = (s, aLong) -> {
    if (StringUtils.isNotBlank(s)){
      return String.valueOf(Long.parseLong(s) + aLong);
    }else{
      return String.valueOf(aLong);
    }
  };
  protected CumulativeCommandEventCounterStreamTest(HystrixEventTestStream stream) {
  //這裡桶的大小寫死1  桶的長度寫死1000 是為了好測試 沒法送一個資料就會統計一下
    super(stream, 1, 1000, reduceCommandCompletion, reduceBucket);
  }
  @Override
  public Long getEmptyBucketSummary() {
    return 0L;
  }
  @Override
  public String getEmptyOutputValue() {
    return StringUtils.EMPTY;
  }
 }
           

測試

public static void main(String[] args) throws InterruptedException {
    HystrixEventTestStream hystrixEventTestStream = new HystrixEventTestStream();
    CumulativeCommandEventCounterStreamTest testStream =
        new CumulativeCommandEventCounterStreamTest(hystrixEventTestStream);
    //第一次調用 getLatest這裡是為了提前訂閱 CumulativeCommandEventCounterStreamTest
    System.out.println("counter:"+ testStream.getLatest());
    //開一個線程模拟事件的發送
    new Thread(()->{
      long counter = 0;
      while (counter < 100000){
        counter ++;
        HystrixTestEvent event = new HystrixTestEvent(counter);
        System.out.println("send event :" +event);
        hystrixEventTestStream.write(event);
        try {
          //這裡1s發送一次 是為了更好的分析輸出結果
          TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();

    while (true){
      System.out.println("counter:"+ testStream.getLatest());
      TimeUnit.SECONDS.sleep(1);
    }
  }
           

列印結果: send event 表示發送事件 counter表示統計結果

counter:0
counter:0
send event :1
counter:1
send event :2
counter:3
send event :3
counter:6
send event :4
send event :5
counter:10
....
           

總結

BucketedCumulativeCounterStream

完成了Hystrix累計統計的功能。在了解完上一篇内容之後就比較簡單了。 【你好Hystrix】二:Hystrix名額收集之滑動收集原理源碼解析-BucketedRollingCounterStream

【你好Hystrix】三:Hystrix名額收集之累計統計流原理源碼解析-BucketedCumulativeCounterStream前言BucketedCumulativeCounterStream模拟累計統計總結