目錄
- 前言
- BucketedCumulativeCounterStream
-
- 過程測試
- CumulativeCommandEventCounterStream
- CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream
- 模拟累計統計
-
- 定義
事件類Hystrix
- 定義Hystrix事件流類
- 累計統計流
- 測試
- 定義
- 總結
前言
官網裡面的内容我就不拿過來貼上面了 沒有太大的意義。如果感興趣可以自己去github上面去查找對應子產品的内容。目前我們名額收集這一塊的内容對應到官網文檔的位址是Hystrix名額收集
前面一篇文章我們滑動視窗統計流做了分析 ,這一篇文章我們分析一下另外的一種統計流–累計統計流。前一篇文章我們提到過
Hystrix
在1.5之前使用
HystrixRollingNumber
環形數組 來實作滑動視窗和累計統計 個人認為這其實是一種常用的手段,我們熟悉的
Resilience4j
也是使用環形數組來實作的。各有利弊 沒有誰更好。
Resilience
從輕量級的角度考慮是以它必然是使用環形數組 不需要依賴任何的第三方庫。而
Hystrix
本來就是一個重量級的角色是以提高擴充性是它的一個重要的名額。
BucketedCumulativeCounterStream
上一篇文章有這樣一個內建關系。
BucketedCounterStream
的實作有兩個分支 一個是代表滑動統計的
BucketedRollingCounterStream
這個我們上一篇文章已經分析過。而今天要說的就是另一個分支
BucketedCumulativeCounterStream
.累計統計流
BucketedCounterStream
已經将我們的資料流從事件流轉成了 桶發射器。也就是
HystrixEvent
====>
Bucket
。
是以
BucketedCumulativeCounterStream
的功能和我們上一節說的
BucketedRollingCounterStream
功能是一樣的 都是把
Bucket
轉成
Output
。功能
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
//調用父類的構造方法 将事件流轉成 資料桶發射器
//桶的大小(桶的時長)由bucketSizeInMs來決定,桶的個數由numBuckets來決定
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
//bucketedStream是父類處理之後的流 在此基礎上去操作
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket)
.skip(numBuckets)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}
@Override
public Observable<Output> observe() {
return sourceStream;
}
構造方法裡面的幾個參數
-
:把reduceCommandCompletion
轉成Event
這個由子類傳遞過來 極大的增加了靈活性Bucket
-
:把reduceBucket
轉換成Bucket
Output
-
:桶數量 這個是父類需要使用 初始化第一個視窗numBuckets
-
:将多長時間的事件流聚合成一個桶。也是父類需要使用bucketSizeInMs
該類主要使用
scan
和
skip
操作符把Observable 轉換成
Observable<OutPut>
。這兩個操作符的作用上一篇文章已經示範過。
過程測試
public void cumulativeCounterStreamTest() throws InterruptedException {
//1.Observable<Long>===>Observable<ArrayList>的轉變
Observable<ArrayList<Long>> bucketStream =
Observable.interval(1000, TimeUnit.MILLISECONDS)
.window(2000, TimeUnit.MILLISECONDS)
.flatMap((Func1<Observable<Long>, Observable<ArrayList<Long>>>) longObservable -> longObservable.reduce(new ArrayList<Long>(), new Func2<ArrayList<Long>, Long, ArrayList<Long>>() {
@Override
public ArrayList<Long> call(ArrayList<Long> longs, Long aLong) {
longs.add(aLong);
return longs;
}
}));
//2.Observable<ArrayList> ==> Observable<Long>
Observable<Long> outputStream = bucketStream.scan(0L, (aLong, longs) -> {
for (int i = 0; i < longs.size(); i++) {
aLong += longs.get(i);
}
return aLong;
}).skip(2);
//3.BehaviorSubject訂閱
final BehaviorSubject subject = BehaviorSubject.create();
outputStream.subscribe(subject);
//4.每隔兩秒輸出一下最新的值
new Thread(() -> {
while (true){
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(subject.getValue());
}
}).start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
以上demo共分為4步
- 第一步:将
轉換成Observable<Long>
也就是我們說的将Observable<List>
轉成Event
的過程。這個是在父類中完成的Bucket
- 第二步:将
轉成Observable<List>
注意此處的Observable<Long>
是作為一個Observable<long>
是一個 聚合的結果 并非第一步的output
。此處正是我們講的Observable<Long>
來完成的。BucketedCumulativeCounterStream
- 第三步:初始化一個
來訂閱 至于為什麼是BehaviorSubject
上篇中有說到,這裡不再贅述。BehaviorSubject
- 第四步:初始化一個線程來定時擷取流中的結果
結果如下:
null
3
10
....
每次擷取的值都是遞增的,這個值隻可能無限的增大。這就是我們說的累計統計流的效果。
CumulativeCommandEventCounterStream
通過介紹完上面的内容之後 這個流就顯得很簡單了,父類把要做的事都做完了 它隻負責初始化确定泛型内容就可以了。
//對監聽HystrixCommandCompletion事件流做處理
public class CumulativeCommandEventCounterStream extends BucketedCumulativeCounterStream<HystrixCommandCompletion, long[], long[]> {
//根據commandKey來緩存對應的流對象
private static final ConcurrentMap<String, CumulativeCommandEventCounterStream> streams = new ConcurrentHashMap<String, CumulativeCommandEventCounterStream>();
//事件類型的數量 因為需要統計不同僚件類型
private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
//下面對 numBuckets 和bucketSizeInMs的初始化和前面說的RollingCommandEventCounterStream是一毛一樣的
final int counterMetricWindow = properties.metricsRollingStatisticalWindowInMilliseconds().get();
final int numCounterBuckets = properties.metricsRollingStatisticalWindowBuckets().get();
final int counterBucketSizeInMs = counterMetricWindow / numCounterBuckets;
return getInstance(commandKey, numCounterBuckets, counterBucketSizeInMs);
}
public static CumulativeCommandEventCounterStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
CumulativeCommandEventCounterStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (CumulativeCommandEventCounterStream.class) {
CumulativeCommandEventCounterStream existingStream = streams.get(commandKey.name());
if (existingStream == null) {
CumulativeCommandEventCounterStream newStream = new CumulativeCommandEventCounterStream(commandKey, numBuckets, bucketSizeInMs,
HystrixCommandMetrics.appendEventToBucket, HystrixCommandMetrics.bucketAggregator);
streams.putIfAbsent(commandKey.name(), newStream);
return newStream;
} else {
return existingStream;
}
}
}
}
public static void reset() {
streams.clear();
}
private CumulativeCommandEventCounterStream(HystrixCommandKey commandKey, int numCounterBuckets, int counterBucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
Func2<long[], long[], long[]> reduceBucket) {
super(HystrixCommandCompletionStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
}
@Override
long[] getEmptyBucketSummary() {
return new long[NUM_EVENT_TYPES];
}
@Override
long[] getEmptyOutputValue() {
return new long[NUM_EVENT_TYPES];
}
//這個方法比較重要 可以擷取目前對應事件的最新統計資料
public long getLatest(HystrixEventType eventType) {
return getLatest()[eventType.ordinal()];
}
}
和前面一篇文章
RollingCommandEventCounterStream
本質上沒有任何差別的。是以統計流和滑動流的差別就是
BucketedCumulativeCounterStream
沒有使用
window
操作符來進行滑動統計。
CumulativeCollapserEventCounterStream& CumulativeThreadPoolEventCounterStream
上面看了
CumulativeCommandEventCounterStream
的源碼 和上一章說的
RollingCommandEventCounterStream
是一毛一樣的。而和
CumulativeCollapserEventCounterStream
、
CumulativeThreadPoolEventCounterStream
的差別僅僅有兩個地方。
-
轉成Observable<Event>
的操作.上述兩個類的具體操作分别定義在Observable<Bucket>
,HystrixCollapserMetrics.appendEventToBucket
HystrixThreadPoolMetrics.appendEventToBucket
-
轉成Observable<Bucket>
Observable<Output>
的操作
對于上面的兩點不同可以自行翻閱代碼。
模拟累計統計
因為這一塊的内容比較簡單和上一篇文章有些重複 是以這裡我們就使用一個demo來展示如何累計統計的。
定義 Hystrix
事件類
Hystrix
public class HystrixTestEvent implements HystrixEvent {
private Long data;
public HystrixTestEvent(Long data){
this.data = data;
}
public Long getData() {
return data;
}
public void setData(Long data) {
this.data = data;
}
@Override
public String toString() {
return ""+ data;
}
}
定義Hystrix事件流類
定義一個事件流監聽上面定義的
HystrixTestEvent
事件。對外釋放一個
write
方法用于向流中寫入事件和一個
observe
方法用于傳回目前的事件流
public class HystrixEventTestStream implements HystrixEventStream<HystrixTestEvent> {
private final Subject<HystrixTestEvent , HystrixTestEvent> writeOnlySubject;
private final Observable<HystrixTestEvent> readOnlyStream;
public HystrixEventTestStream(){
this.writeOnlySubject = new SerializedSubject<>(PublishSubject.create());
this.readOnlyStream = this.writeOnlySubject.share();
}
@Override
public Observable<HystrixTestEvent> observe() {
return this.readOnlyStream;
}
public void write(HystrixTestEvent event){
this.writeOnlySubject.onNext(event);
}
}
累計統計流
CumulativeCommandEventCounterStreamTest
這個統計流是關鍵 這裡我們把
Bucket
定義為
Long
類型,
Output
定義為
String
類型,也就是整個過程是
Event -> Long -> String
的轉換但是它的實作很簡單隻需要定義如下資訊:
-
轉Observable<Event>
的操作符Observable<Bucket>
reduceCommandCompletion
-
轉Observable<Bucket>
的操作符Observable<Output>
reduceBucket
- 初始化桶數量和桶長度
public class CumulativeCommandEventCounterStreamTest
extends BucketedCumulativeCounterStream<HystrixTestEvent, Long, String> {
private final static Func2<Long, HystrixTestEvent, Long> reduceCommandCompletion
= (aLong, event) -> {
Long data = event.getData();
return data;
};
private final static Func2<String, Long, String> reduceBucket = (s, aLong) -> {
if (StringUtils.isNotBlank(s)){
return String.valueOf(Long.parseLong(s) + aLong);
}else{
return String.valueOf(aLong);
}
};
protected CumulativeCommandEventCounterStreamTest(HystrixEventTestStream stream) {
//這裡桶的大小寫死1 桶的長度寫死1000 是為了好測試 沒法送一個資料就會統計一下
super(stream, 1, 1000, reduceCommandCompletion, reduceBucket);
}
@Override
public Long getEmptyBucketSummary() {
return 0L;
}
@Override
public String getEmptyOutputValue() {
return StringUtils.EMPTY;
}
}
測試
public static void main(String[] args) throws InterruptedException {
HystrixEventTestStream hystrixEventTestStream = new HystrixEventTestStream();
CumulativeCommandEventCounterStreamTest testStream =
new CumulativeCommandEventCounterStreamTest(hystrixEventTestStream);
//第一次調用 getLatest這裡是為了提前訂閱 CumulativeCommandEventCounterStreamTest
System.out.println("counter:"+ testStream.getLatest());
//開一個線程模拟事件的發送
new Thread(()->{
long counter = 0;
while (counter < 100000){
counter ++;
HystrixTestEvent event = new HystrixTestEvent(counter);
System.out.println("send event :" +event);
hystrixEventTestStream.write(event);
try {
//這裡1s發送一次 是為了更好的分析輸出結果
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
while (true){
System.out.println("counter:"+ testStream.getLatest());
TimeUnit.SECONDS.sleep(1);
}
}
列印結果: send event 表示發送事件 counter表示統計結果
counter:0
counter:0
send event :1
counter:1
send event :2
counter:3
send event :3
counter:6
send event :4
send event :5
counter:10
....
總結
BucketedCumulativeCounterStream
完成了Hystrix累計統計的功能。在了解完上一篇内容之後就比較簡單了。 【你好Hystrix】二:Hystrix名額收集之滑動收集原理源碼解析-BucketedRollingCounterStream