天天看點

二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)

讓人迷茫的原因隻有一個:你本該拼搏的年紀,卻想得太多,做得太少。

前言

上篇文章 介紹了

Hystrix

的“主流”:在滑動視窗内統計流、健康流。既然

Hystrix

的名額資料收集是基于事件驅動,那麼自然可以多一些監聽流,那麼本文将做個收尾,對

Hystrix

内置的累計統計流、分發流、最大并發流…等等分别做介紹,讓小夥伴們能對這種模式有個更深的了解,後面介紹的Hystrix各次元的監控都基于它們擴充出來的哦。

Hystrix已經内置了對事件監聽時各種流的實作,大多數數情況下無需自己來擴充實作的,當然若你着實要和第三方監控平台深度內建,那麼你也可以自定義收集方式。

正文

累計統計流 BucketedCumulativeCounterStream

它和

BucketedRollingCounterStream

的差別是:它在減桶的過程中,持續/無限累積計數。

public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {

    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);


    protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
                                              Func2<Bucket, Event, Bucket> reduceCommandCompletion,
                                              Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);

        this.sourceStream = bucketedStream
                .scan(getEmptyOutputValue(), reduceBucket) // 這是最大的差別,使用scan 一直掃描
                .skip(numBuckets)
                .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
                .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
                .share() 
                .onBackpressureDrop(); // 背壓:多餘的直接棄掉
    }

	// 實作父類方法
    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }
}           

複制

最大的差別就是對

bucketedStream

的處理上,滑動視窗使用的是

window + flatMap

,而本處使用的是scan,代表着持續/無限累積計數。它有如下實作類:

二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)

CumulativeCommandEventCounterStream

CumulativeThreadPoolEventCounterStream

CumulativeCollapserEventCounterStream

以上實作類源碼此處均不展示,是因為完全和

BucketedRollingCounterStream

體系的實作一模一樣,請參照上篇文章即可。唯一的不同在父類:一個隻統計指定視窗,一個持續不斷的累計統計。

淺談metrics名額釋意

監控是大型分布式系統的必備系統,它們的資料均來自一些名額資訊。收集名額資訊的庫有很多,其中比較出名的有

metrics-core

,它可以把收集到的資訊提供給

Meter、Histogram、Gauge...

等度量工具使用,進而可以畫出如下美圖:

二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)

當然本文并不講述

metrics-core

如何用,而是以一段名額值為例,稍加解釋:

{
    "version":"3.0.0",
    "timers":{
        "count":0,
        "max":0,
        "mean":0,
        "min":0,
        "p50":0,
        "p75":0,
        "p95":0,
        "p98":0,
        "p99":0,
        "p999":0,
        "stddev":0,
        "m15_rate":0,
        "m1_rate":0,
        "m5_rate":0,
        "mean_rate":0,
        "duration_units":"seconds",
        "rate_units":"calls/second"
    }
}           

複制

這個名額還是比較詳細的,裡面會有mean、p75、p99… 這個其實是很關鍵的資料名額。這些名額主要用于給你設定逾時時間提供極有力的參考,如果你每每設定逾時時間參考的是

RT值

是mean平均值,那你和瞎蒙沒啥差別。

另外到底參考那個值,要看你的系統的整體量級,以及需要滿足幾個9,比如要滿足三個9,那麼逾時時間是需要謹慎的。

分位數p50、p95、p999代表什麼意思?

count/max/min/mean這些都不用解釋,其它主要關心:

  • p50:也叫中位數(注意中位數不是平均數)。它表示把資料總數分為上下兩等分,中間的那個數值
  • 分位數:分位數是将總體的全部資料按從小到大順序排列後,處于各等分位置的變量值。
    • p95 = 10ms

      :代表95%的響應時間不大于10ms
    • p99、p999:含義同上
p表示:percent 百分比。
  • m15_rate

    :15分鐘内。請求數/每秒的比率
  • m1_rate

    :1分鐘内…
  • mean_rate

    : 平均每秒請求數(平均QPS,意義不大)
  • rate_units

    :calls/second” 比率機關,這裡表示每秒鐘請求數

對于監控名額來說,一般來說平均值幾乎沒有意義,而分位數一般是重點關注的值。

分布流 RollingDistributionStream

在指定時間視窗内分布流。說到分布,是以和統計、畫圖有關。。。

public class RollingDistributionStream<Event extends HystrixEvent> { 
	... 
	// 訂閱者可以訂閱消費消息,得到各種分位數,都存在CachedValuesHistogram裡呢
    public Observable<CachedValuesHistogram> observe() {
        return rollingDistributionStream;
    }
}           

複制

雖然它不是抽象類,但它也沒标明具體監聽哪種事件,使用什麼資料流

HystrixEventStream

。總之最終它會監聽一個消息流(比如

HystrixCommandStartStream

它吧),然後通過RxJava的window操作符對一段時間内的數值進行運算操作,生成統計值放在Histogram對象中,然後重新發射。

它内部內建使用度量工具

org.HdrHistogram.Histogram

來統計分析名額資料,并且給出非常詳細的分位數資料(最高達四個9 -> p9999)。它還有如下子類:

二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)

RollingCommandUserLatencyDistributionStream

LatencyDistribution

:延遲釋出。延遲的值來自于:調用方線程送出請求和響應可見之間的時間間隔

executionResult.getUserThreadLatency()

RollingCommandLatencyDistributionStream

延遲釋出。和上的差別是延遲時間來自于:

executionResult.getExecutionLatency()

表示:time spent in run() method

它們倆監聽的均是

HystrixCommandCompletionStream

資料流~

RollingCollapserBatchSizeDistributionStream

監聽了

HystrixCollapserEventStream

消息流,并且監聽視窗期内的

ADDED_TO_BATCH

消息類型次數,通過Histogram計算後再發射出去。

最大并發流 RollingConcurrencyStream

它用于對最大并發進行統計:對一段時間内的執行并發量取最大值,如Command/ThreadPool的最大并發數。

// 竟然泛型都木有,幹淨利落
public abstract class RollingConcurrencyStream {
}           

複制

它監聽的是

HystrixCommandExecutionStarted

事件,它會發送并發數過來,進而便可獲得

event.getCurrentConcurrency()

,對比每個桶(一個桶代表1s),最後取出最大值

Math.max(a, b)

RollingCommandMaxConcurrencyStream

RollingThreadPoolMaxConcurrencyStream

因為監聽

HystrixCommandExecutionStarted

事件的有兩種事件流:command的和ThreadPool的,是以必須用兩個類來表示。它倆除了關心的事件不一樣,其它都一樣~

配置流 HystrixConfigurationStream

這個類對目前的Hystrix配置進行采樣,并将其作為流公開。

public class HystrixConfigurationStream {

	...
	private final Observable<HystrixConfiguration> allConfigurationStream;
	...
    public Observable<HystrixConfiguration> observe() {
        return allConfigurationStream;
    }
    
    // 當然還可以當讀監控某一類配置
    public Observable<Map<HystrixCommandKey, HystrixCommandConfiguration>> observeCommandConfiguration() {
        return allConfigurationStream.map(getOnlyCommandConfig);
    }
    public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> observeThreadPoolConfiguration() {
        return allConfigurationStream.map(getOnlyThreadPoolConfig);
    }
    public Observable<Map<HystrixCollapserKey, HystrixCollapserConfiguration>> observeCollapserConfiguration() {
        return allConfigurationStream.map(getOnlyCollapserConfig);
    }
    ...
}           

複制

可使用

hystrix.stream.config.intervalInMilliseconds = 5000

來配置多長時間采樣一次,預設5000ms也就是5秒采樣一次。另外

com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet

就是用該流來擷取配置資訊的。

功能流 HystrixUtilizationStream

Utilization

:使用、利用(使用率、使用率)。這個類對目前Hystrix資源的利用情況進行采樣,并将其公開為流。

public class HystrixUtilizationStream {

	// HystrixUtilization就是最終的資料結構格式,下面給使用示例
	private final Observable<HystrixUtilization> allUtilizationStream;
	...
    public Observable<HystrixUtilization> observe() {
        return allUtilizationStream;
    }

    public Observable<Map<HystrixCommandKey, HystrixCommandUtilization>> observeCommandUtilization() {
        return allUtilizationStream.map(getOnlyCommandUtilization);
    }
    public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>> observeThreadPoolUtilization() {
        return allUtilizationStream.map(getOnlyThreadPoolUtilization);
    }
}           

複制

可使用

hystrix.stream.utilization.intervalInMilliseconds = 500

來配置多長時間采樣一次,預設500ms采樣一次。另外

com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet

就是用該流來擷取資源利用資訊的。

使用示例

public class CommandHelloWorld extends HystrixCommand<String> {
    private final String name;

    // 指定一個HystrixCommandGroupKey,這樣熔斷政策會按照此組執行
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("MyAppGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        if(name == null){
            throw new NullPointerException();
        }
        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        // super.getFallback():No fallback available.
        return "this is fallback msg";
    }
}           

複制

private static final String toJsonString(Object obj) {
    ObjectMapper mapper = new ObjectMapper();
    try {
        return mapper.writeValueAsString(obj);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}


@Test
public void fun1() throws InterruptedException {
    // 檢視command、線程池的使用情況
    HystrixUtilizationStream utilizationStream = HystrixUtilizationStream.getInstance();
    // utilizationStream.observeThreadPoolUtilization()
    utilizationStream.observe().subscribe(d -> System.out.println(toJsonString(d)));

    // 檢視配置情況
    HystrixConfigurationStream configStream = HystrixConfigurationStream.getInstance();
    configStream.observe().subscribe(d -> {
        System.out.println(d);
    });

    // 累計統計流
    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
    HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());
    CumulativeCommandEventCounterStream counterStream = CumulativeCommandEventCounterStream.getInstance(commandKey, properties);
    counterStream.observe().subscribe(d -> System.out.println(toJsonString(d)));

    // 最大并發流
    RollingCommandMaxConcurrencyStream concurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(commandKey, properties);
    concurrencyStream.observe().subscribe(d -> System.out.println(toJsonString(d)));

    // 發送事件(發送多次)
    CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.execute();

    helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.queue();

    // 走fallabck
    helloWorld = new CommandHelloWorld(null);
    helloWorld.queue();


    // 因為配置5秒鐘才能列印一次
    TimeUnit.SECONDS.sleep(5);

}           

複制

運作程式,控制台輸出:

資源利用情況:

{
    "commandUtilizationMap":{
        "CommandHelloWorld":{
            "concurrentCommandCount":0
        }
    },
    "threadPoolUtilizationMap":{
        "MyAppGroup":{
            "currentActiveCount":0,
            "currentCorePoolSize":10,
            "currentPoolSize":3,
            "currentQueueSize":0
        }
    }
}           

複制

這是功能流的資料,最明顯的是啟動了三次任務,線程池大小目前是3。另外,因為配置流中的對應無法很好的用JSON序列化,這裡我隻能采用笨拙的截圖的方式展示喽(下面配置不生效哦,若配置了信号量,那麼ThreadPoolConfig這一欄就為null了):

# hystrix.command.default.execution.isolation.strategy = SEMAPHORE
# hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests = 2           

複制

二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)
二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)

累計統計流的資料如下:

[0,2,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0]           

複制

對數字解釋如下:

  • 2:index=1,對應事件為

    HystrixEventType.SUCCESS

  • 1:index=2,對應事件為

    HystrixEventType.FAILURE

  • 1:index=9,對應事件為

    HystrixEventType.FALLBACK_SUCCESS

如果你願意,你還可以自行模拟出

TIMEOUT

逾時、

FALLBACK_FAILURE

復原失敗等等情況,建議親可試試,以加深了解。

最大并發流輸出的資料是

1

,因為很明顯1秒内最多才一個請求嘛~

HealthCountsStream

健康資訊彙總:

{"errorCount":1,"errorPercentage":33,"totalRequests":3}           

複制

一共3個請求,失敗了一個,是以錯誤率是

33%

說明:因為

HealthCountsStream

它預設是500ms照一次快照,是以此處它會列印10次(共5s嘛)

總結

到此,關于

Netflix Hystrix

名額收集,以及轉換為Stream流式的實作已經全部講述完成了。最後用一張大佬手繪圖對此作出總結:

二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)
二十五、Hystrix累計統計流、分發流、最大并發流、配置流、功能流(附代碼示例)