讓人迷茫的原因隻有一個:你本該拼搏的年紀,卻想得太多,做得太少。
前言
上篇文章 介紹了
Hystrix
的“主流”:在滑動視窗内統計流、健康流。既然
Hystrix
的名額資料收集是基于事件驅動,那麼自然可以多一些監聽流,那麼本文将做個收尾,對
Hystrix
内置的累計統計流、分發流、最大并發流…等等分别做介紹,讓小夥伴們能對這種模式有個更深的了解,後面介紹的Hystrix各次元的監控都基于它們擴充出來的哦。
Hystrix已經内置了對事件監聽時各種流的實作,大多數數情況下無需自己來擴充實作的,當然若你着實要和第三方監控平台深度內建,那麼你也可以自定義收集方式。
正文
累計統計流 BucketedCumulativeCounterStream
它和
BucketedRollingCounterStream
的差別是:它在減桶的過程中,持續/無限累積計數。
public abstract class BucketedCumulativeCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedCumulativeCounterStream(HystrixEventStream<Event> stream, int numBuckets, int bucketSizeInMs,
Func2<Bucket, Event, Bucket> reduceCommandCompletion,
Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, reduceCommandCompletion);
this.sourceStream = bucketedStream
.scan(getEmptyOutputValue(), reduceBucket) // 這是最大的差別,使用scan 一直掃描
.skip(numBuckets)
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.share()
.onBackpressureDrop(); // 背壓:多餘的直接棄掉
}
// 實作父類方法
@Override
public Observable<Output> observe() {
return sourceStream;
}
}
複制
最大的差別就是對
bucketedStream
的處理上,滑動視窗使用的是
window + flatMap
,而本處使用的是scan,代表着持續/無限累積計數。它有如下實作類:

CumulativeCommandEventCounterStream
CumulativeThreadPoolEventCounterStream
CumulativeCollapserEventCounterStream
以上實作類源碼此處均不展示,是因為完全和
BucketedRollingCounterStream
體系的實作一模一樣,請參照上篇文章即可。唯一的不同在父類:一個隻統計指定視窗,一個持續不斷的累計統計。
淺談metrics名額釋意
監控是大型分布式系統的必備系統,它們的資料均來自一些名額資訊。收集名額資訊的庫有很多,其中比較出名的有
metrics-core
,它可以把收集到的資訊提供給
Meter、Histogram、Gauge...
等度量工具使用,進而可以畫出如下美圖:
當然本文并不講述
metrics-core
如何用,而是以一段名額值為例,稍加解釋:
{
"version":"3.0.0",
"timers":{
"count":0,
"max":0,
"mean":0,
"min":0,
"p50":0,
"p75":0,
"p95":0,
"p98":0,
"p99":0,
"p999":0,
"stddev":0,
"m15_rate":0,
"m1_rate":0,
"m5_rate":0,
"mean_rate":0,
"duration_units":"seconds",
"rate_units":"calls/second"
}
}
複制
這個名額還是比較詳細的,裡面會有mean、p75、p99… 這個其實是很關鍵的資料名額。這些名額主要用于給你設定逾時時間提供極有力的參考,如果你每每設定逾時時間參考的是
RT值
是mean平均值,那你和瞎蒙沒啥差別。
另外到底參考那個值,要看你的系統的整體量級,以及需要滿足幾個9,比如要滿足三個9,那麼逾時時間是需要謹慎的。
分位數p50、p95、p999代表什麼意思?
count/max/min/mean這些都不用解釋,其它主要關心:
- p50:也叫中位數(注意中位數不是平均數)。它表示把資料總數分為上下兩等分,中間的那個數值
- 分位數:分位數是将總體的全部資料按從小到大順序排列後,處于各等分位置的變量值。
-
:代表95%的響應時間不大于10msp95 = 10ms
- p99、p999:含義同上
-
p表示:percent 百分比。
-
:15分鐘内。請求數/每秒的比率m15_rate
-
:1分鐘内…m1_rate
-
: 平均每秒請求數(平均QPS,意義不大)mean_rate
-
:calls/second” 比率機關,這裡表示每秒鐘請求數rate_units
對于監控名額來說,一般來說平均值幾乎沒有意義,而分位數一般是重點關注的值。
分布流 RollingDistributionStream
在指定時間視窗内分布流。說到分布,是以和統計、畫圖有關。。。
public class RollingDistributionStream<Event extends HystrixEvent> {
...
// 訂閱者可以訂閱消費消息,得到各種分位數,都存在CachedValuesHistogram裡呢
public Observable<CachedValuesHistogram> observe() {
return rollingDistributionStream;
}
}
複制
雖然它不是抽象類,但它也沒标明具體監聽哪種事件,使用什麼資料流
HystrixEventStream
。總之最終它會監聽一個消息流(比如
HystrixCommandStartStream
它吧),然後通過RxJava的window操作符對一段時間内的數值進行運算操作,生成統計值放在Histogram對象中,然後重新發射。
它内部內建使用度量工具
org.HdrHistogram.Histogram
來統計分析名額資料,并且給出非常詳細的分位數資料(最高達四個9 -> p9999)。它還有如下子類:
RollingCommandUserLatencyDistributionStream
LatencyDistribution
:延遲釋出。延遲的值來自于:調用方線程送出請求和響應可見之間的時間間隔
executionResult.getUserThreadLatency()
。
RollingCommandLatencyDistributionStream
延遲釋出。和上的差別是延遲時間來自于:
executionResult.getExecutionLatency()
表示:time spent in run() method
它們倆監聽的均是
HystrixCommandCompletionStream
資料流~
RollingCollapserBatchSizeDistributionStream
監聽了
HystrixCollapserEventStream
消息流,并且監聽視窗期内的
ADDED_TO_BATCH
消息類型次數,通過Histogram計算後再發射出去。
最大并發流 RollingConcurrencyStream
它用于對最大并發進行統計:對一段時間内的執行并發量取最大值,如Command/ThreadPool的最大并發數。
// 竟然泛型都木有,幹淨利落
public abstract class RollingConcurrencyStream {
}
複制
它監聽的是
HystrixCommandExecutionStarted
事件,它會發送并發數過來,進而便可獲得
event.getCurrentConcurrency()
,對比每個桶(一個桶代表1s),最後取出最大值
Math.max(a, b)
。
RollingCommandMaxConcurrencyStream
RollingThreadPoolMaxConcurrencyStream
因為監聽
HystrixCommandExecutionStarted
事件的有兩種事件流:command的和ThreadPool的,是以必須用兩個類來表示。它倆除了關心的事件不一樣,其它都一樣~
配置流 HystrixConfigurationStream
這個類對目前的Hystrix配置進行采樣,并将其作為流公開。
public class HystrixConfigurationStream {
...
private final Observable<HystrixConfiguration> allConfigurationStream;
...
public Observable<HystrixConfiguration> observe() {
return allConfigurationStream;
}
// 當然還可以當讀監控某一類配置
public Observable<Map<HystrixCommandKey, HystrixCommandConfiguration>> observeCommandConfiguration() {
return allConfigurationStream.map(getOnlyCommandConfig);
}
public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> observeThreadPoolConfiguration() {
return allConfigurationStream.map(getOnlyThreadPoolConfig);
}
public Observable<Map<HystrixCollapserKey, HystrixCollapserConfiguration>> observeCollapserConfiguration() {
return allConfigurationStream.map(getOnlyCollapserConfig);
}
...
}
複制
可使用
hystrix.stream.config.intervalInMilliseconds = 5000
來配置多長時間采樣一次,預設5000ms也就是5秒采樣一次。另外
com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet
就是用該流來擷取配置資訊的。
功能流 HystrixUtilizationStream
Utilization
:使用、利用(使用率、使用率)。這個類對目前Hystrix資源的利用情況進行采樣,并将其公開為流。
public class HystrixUtilizationStream {
// HystrixUtilization就是最終的資料結構格式,下面給使用示例
private final Observable<HystrixUtilization> allUtilizationStream;
...
public Observable<HystrixUtilization> observe() {
return allUtilizationStream;
}
public Observable<Map<HystrixCommandKey, HystrixCommandUtilization>> observeCommandUtilization() {
return allUtilizationStream.map(getOnlyCommandUtilization);
}
public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolUtilization>> observeThreadPoolUtilization() {
return allUtilizationStream.map(getOnlyThreadPoolUtilization);
}
}
複制
可使用
hystrix.stream.utilization.intervalInMilliseconds = 500
來配置多長時間采樣一次,預設500ms采樣一次。另外
com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet
就是用該流來擷取資源利用資訊的。
使用示例
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
// 指定一個HystrixCommandGroupKey,這樣熔斷政策會按照此組執行
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("MyAppGroup"));
this.name = name;
}
@Override
protected String run() {
if(name == null){
throw new NullPointerException();
}
return "Hello " + name + "!";
}
@Override
protected String getFallback() {
// super.getFallback():No fallback available.
return "this is fallback msg";
}
}
複制
private static final String toJsonString(Object obj) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Test
public void fun1() throws InterruptedException {
// 檢視command、線程池的使用情況
HystrixUtilizationStream utilizationStream = HystrixUtilizationStream.getInstance();
// utilizationStream.observeThreadPoolUtilization()
utilizationStream.observe().subscribe(d -> System.out.println(toJsonString(d)));
// 檢視配置情況
HystrixConfigurationStream configStream = HystrixConfigurationStream.getInstance();
configStream.observe().subscribe(d -> {
System.out.println(d);
});
// 累計統計流
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());
CumulativeCommandEventCounterStream counterStream = CumulativeCommandEventCounterStream.getInstance(commandKey, properties);
counterStream.observe().subscribe(d -> System.out.println(toJsonString(d)));
// 最大并發流
RollingCommandMaxConcurrencyStream concurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(commandKey, properties);
concurrencyStream.observe().subscribe(d -> System.out.println(toJsonString(d)));
// 發送事件(發送多次)
CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.execute();
helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.queue();
// 走fallabck
helloWorld = new CommandHelloWorld(null);
helloWorld.queue();
// 因為配置5秒鐘才能列印一次
TimeUnit.SECONDS.sleep(5);
}
複制
運作程式,控制台輸出:
資源利用情況:
{
"commandUtilizationMap":{
"CommandHelloWorld":{
"concurrentCommandCount":0
}
},
"threadPoolUtilizationMap":{
"MyAppGroup":{
"currentActiveCount":0,
"currentCorePoolSize":10,
"currentPoolSize":3,
"currentQueueSize":0
}
}
}
複制
這是功能流的資料,最明顯的是啟動了三次任務,線程池大小目前是3。另外,因為配置流中的對應無法很好的用JSON序列化,這裡我隻能采用笨拙的截圖的方式展示喽(下面配置不生效哦,若配置了信号量,那麼ThreadPoolConfig這一欄就為null了):
# hystrix.command.default.execution.isolation.strategy = SEMAPHORE
# hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests = 2
複制
累計統計流的資料如下:
[0,2,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0]
複制
對數字解釋如下:
- 2:index=1,對應事件為
HystrixEventType.SUCCESS
- 1:index=2,對應事件為
HystrixEventType.FAILURE
- 1:index=9,對應事件為
HystrixEventType.FALLBACK_SUCCESS
如果你願意,你還可以自行模拟出逾時、
TIMEOUT
復原失敗等等情況,建議親可試試,以加深了解。
FALLBACK_FAILURE
最大并發流輸出的資料是
1
,因為很明顯1秒内最多才一個請求嘛~
HealthCountsStream
健康資訊彙總:
{"errorCount":1,"errorPercentage":33,"totalRequests":3}
複制
一共3個請求,失敗了一個,是以錯誤率是
33%
。
說明:因為 HealthCountsStream
它預設是500ms照一次快照,是以此處它會列印10次(共5s嘛)
總結
到此,關于
Netflix Hystrix
名額收集,以及轉換為Stream流式的實作已經全部講述完成了。最後用一張大佬手繪圖對此作出總結: