看一個人的成功,不是看他赢了多少人,而是看他成就了多少人。
代碼下載下傳位址:https://github.com/f641385712/netflix-learning
前言
前面已經花了5篇文章專門介紹了Hystrix基于事件機制的資料收集、Stream流式處理,再回頭來了解它的
HystrixMetrics
名額收集就一清二楚了。
在
HystrixCommand
執行過程(開始執行、結束執行、異常、逾時)時會不斷發出各類事件,通過收集這些資料,提供給消費者。如斷路器、
Hystrix Dashboard
可以統計分析這些資料,進而完成特定的功能。
Hystrix
以command指令模式的方式來控制業務邏輯以及熔斷邏輯的調用時機,是以說資料統計對它來說不算難事,但如何高效、精準的在記憶體中統計資料,還需要一定的技巧。
需要提前說明的是:像什麼
hystrix.stream
、
HystrixDashboard
面闆檢視等這些,本文均還不會展現。本文隻闡述資料的采集,至于資料如何使用(存儲or展示)放在後幾篇文章。
正文
Hystrix
收集資料是必不可少的一步,每個降級點(需要采取降級保護的點)的資料是獨立的,是以我們可以給每個降級點配置單獨的政策。
這些政策一般是建立在我們對這些降級點的了解之上的,初期甚至可以先觀察一下采集的資料來指定降級政策。
采集哪些資料?資料如何存儲?資料如何上報?這都是Hystrix需要考慮的問題,
Hystrix
采用的是滑動視窗+分桶的形式來采集資料(原理還蠻複雜的,本文不不做讨論),這樣既解決了資料在統計周期間切換而帶來的跳變問題(通過時間視窗),也控制了切換了力度(通過桶大小)。
關于Metrics名額收集,就不得不再次請上第一篇文章已貼出的這張執行原理圖了:

它從各個地方(包括正常邏輯執行、線程池/信号量資源監察)收集名額資訊,然後提供給斷路器使用,或者提供給監控大盤們使用(它們均是consumer)
HystrixRollingNumber
該類用來統計一段時間内的計數,也被稱作Hystrix裡用于qps計數的資料結構,采用滑動視窗 + 分桶的形式收集。
事件類型
HystrixRollingNumberEvent
:可以在
HystrixRollingNumber
中捕獲的各種狀态/事件。
public enum HystrixRollingNumberEvent {
SUCCESS(1), FAILURE(1), TIMEOUT(1), SHORT_CIRCUITED(1), THREAD_POOL_REJECTED(1), SEMAPHORE_REJECTED(1), BAD_REQUEST(1),
FALLBACK_SUCCESS(1), FALLBACK_FAILURE(1), FALLBACK_REJECTION(1), FALLBACK_MISSING(1), EXCEPTION_THROWN(1), COMMAND_MAX_ACTIVE(2), EMIT(1), FALLBACK_EMIT(1),
THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1),
COLLAPSER_REQUEST_BATCHED(1), COLLAPSER_BATCH(1);
private final int type;
private HystrixRollingNumberEvent(int type) {
this.type = type;
}
// 可執行HystrixRollingNumber#increment/add/getRollingSum方法
public boolean isCounter() { return type == 1; }
// 可執行HystrixRollingNumber#updateRollingMax/getRollingMaxValue方法
public boolean isMaxUpdater() { return type == 2; }
// HystrixEventType轉為HystrixRollingNumberEvent
public static HystrixRollingNumberEvent from(HystrixEventType eventType) {
...
}
}
複制
可以看到,每一個
HystrixEventType
類型都能比對到一個
HystrixRollingNumberEvent
進而被收集進來。
HystrixRollingNumber
它是一個工具類,位于Util包:
com.netflix.hystrix.util
。滑動視窗 + 分桶邏輯實作複雜,但它作為一個工具類給提供了非常實用的擷取資料的方法:
HystrixRollingNumber:
final int numberOfBuckets;
...
// 環形桶:因為時間視窗需要滑動
final BucketCircularArray buckets;
...
public void increment(HystrixRollingNumberEvent type) {
getCurrentBucket().getAdder(type).increment();
}
...
// 擷取自JVM啟動以來所有桶的累積和
public long getCumulativeSum(HystrixRollingNumberEvent type) { ... }
// 擷取給定指定event類型的**滾動計數器**中所有桶的總和(常用)
public long getRollingSum(HystrixRollingNumberEvent type) { ... }
// 擷取滾動過程中,最新的桶的值
public long getValueOfLatestBucket(HystrixRollingNumberEvent type) { ... }
// 擷取滾動中,所有桶的值們
public long[] getValues(HystrixRollingNumberEvent type) { ... }
// 基于getValues的基礎上排序,然後取出最大值
public long getRollingMaxValue(HystrixRollingNumberEvent type) { ... }
...
複制
HystrixRollingNumber
統計一定時間内的統計數值,基本思想就是分段/分桶統計,比如說要統計qps,即1秒内的請求總數。如下圖所示,我們可以将1s的時間分成10段,每段100ms。在第一個100ms内,寫入第一個段中進行計數,在第二個100ms内,寫入第二個段中進行計數,這樣如果要統計目前時間的qps,我們總是可以通過統計目前時間前1s(共10段)的計數總和值。
說明:注意它和RollingDistributionStream的差別哦~
Metrics如何統計
Metrics在統計各種狀态時,時運用滑動視窗思想進行統計的,在一個滑動視窗時間中又劃分了若幹個Bucket(滑動視窗時間與Bucket成整數倍關系),滑動視窗的移動是以Bucket為機關進行滑動的。
如:HealthCounts 記錄的是一個Buckets的監控狀态,Buckets為一個滑動視窗的一小部分,如果一個滑動視窗時間為 t ,Bucket數量為 n,那麼每隔t/n秒将建立一個HealthCounts對象。
Metrics收集步驟
根據前面幾篇文章的表述,這裡簡單總結名額資訊的收集步驟:
- 指令在開始執行前會向開始消息流(HystrixCommandStartStream)發送開始消息(HystrixCommandExecutionStarted)
- 如果是線程池執行,執行前會向線程池開始消息流(HystrixThreadPoolStartStream)發送開始消息(HystrixCommandExecutionStarted)
- 如果是線程池執行,執行後會向線程池結束消息流(HystrixThreadPoolCompletionStream)發送完成消息(HystrixCommandCompletion)
- 指令在結束執行前會向完成消息流(HystrixCommandCompletionStream)發送完成消息(HystrixCommandCompletion)
- 不同類型的統計流(比如滑動視窗統計、累計統計、最大并發統計等等),會監聽開始消息流或完成消息流,根據接受到的消息内容,進行統計
HystrixMetrics
名額資料采集的基類。目前服務的健康狀況, 包括服務調用總次數和服務調用失敗次數等. 根據Metrics的計數, 熔斷器進而能計算出目前服務的調用失敗率, 用來和設定的門檻值比較進而決定熔斷器的狀态切換邏輯. 是以Metrics的實作非常重要。
public abstract class HystrixMetrics {
protected final HystrixRollingNumber counter;
protected HystrixMetrics(HystrixRollingNumber counter) {
this.counter = counter;
}
// 擷取累計總數
public long getCumulativeCount(HystrixRollingNumberEvent event) {
return counter.getCumulativeSum(event);
}
// 擷取目前滑動視窗内的總數
public long getRollingCount(HystrixRollingNumberEvent event) {
return counter.getRollingSum(event);
}
}
複制
Hystrix的Metrics功能子產品中存儲了與Hystrix運作相關的度量資訊,主要有三類類型:
HystrixCommandMetrics
儲存hystrix指令執行的度量資訊。
public class HystrixCommandMetrics extends HystrixMetrics {
private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
// 靜态Map,key為HystrixCommandKey,緩存
private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<>();
// 這兩個public的函數,在介紹前面介紹HealthCountsStream已講過,略
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = ...
public static final Func2<long[], long[], long[]> bucketAggregator = ...
}
複制
此處解釋一下,為何有
metrics
這個static變量:由于每次請求都需要新建立command對象,而每建立一次command對象都有好多屬性需要初始化(具體參見講解AbstractCommand文章),那麼是不是非常的耗時呢???
其實構造函數中的很多初始化工作隻會集中在建立第一個Command時來做,後續建立的Command對象主要是從靜态Map中取對應的執行個體來指派,比如監控器、斷路器和線程池的初始化,因為相同的Command的command key和線程池key都是一緻的,在
HystrixCommandMetrics
、
HystrixCircuitBreaker.Factory
、
HystrixThreadPool
中均有類似于
metrics
這樣的static Map緩存用于提高效率的。
HystrixMetrics負責收集名額資料,它會借用多個
Stream
來進行多元護收集,是以它有衆多成員屬性:
HystrixCommandMetrics:
private final HystrixCommandProperties properties;
private final HystrixCommandKey key;
private final HystrixCommandGroupKey group;
private final HystrixThreadPoolKey threadPoolKey;
// 記錄目前正在執行的總指令數
// 指令start執行的時候+1,執行結束-1
private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
// 使用各種緯度的Stream,進行監聽資訊流
private HealthCountsStream healthCountsStream;
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;
... // 私有化構造器:給所有的屬性指派
... // 省略get方法
...
// 目前**正在執行**的總數:HystrixCommand#run()
public int getCurrentConcurrentExecutionCount() {
return concurrentExecutionCount.get();
}
...
// 檢索總請求、錯誤計數和錯誤百分比的快照。
public HealthCounts getHealthCounts() {
return healthCountsStream.getLatest();
}
...
// 因為收集名額資訊都是異步收集的,這個方法可以解除所有的訂閱
private void unsubscribeAll() {
healthCountsStream.unsubscribe();
rollingCommandEventCounterStream.unsubscribe();
cumulativeCommandEventCounterStream.unsubscribe();
rollingCommandLatencyDistributionStream.unsubscribe();
rollingCommandUserLatencyDistributionStream.unsubscribe();
rollingCommandMaxConcurrencyStream.unsubscribe();
}
複制
它對外提供非常多的方法/能力,其中絕大多數都是委托給各種xxxStream來完成,下面對主要方法做如下描述:
-
:當指令開始執行,調用該方法markCommandStart
-
:指令執行完成,調用該方法markCommandDone
- 說明:以上2方法均不是public方法,由
調用。其中有個API:AbstractCommand
後續會有詳細介紹HystrixThreadEventStream
- 說明:以上2方法均不是public方法,由
-
:擷取某一事件類型視窗期内的統計數值(委托getRollingCount
)rollingCommandEventCounterStream
-
:擷取某一事件類型持續的統計數值getCumulativeCount
-
:擷取某一百分比的請求執行時間(委托getExecutionTimePercentile
)rollingCommandLatencyDistributionStream
-
:擷取平均請求執行時間(委托getExecutionTimeMean
)rollingCommandLatencyDistributionStream
-
:擷取某一百分比的請求執行總時間(委托getTotalTimePercentile
)rollingCommandUserLatencyDistributionStream
-
:擷取平均請求執行總時間getTotalTimeMean
-
:擷取上一個視窗期内最大的并發數getRollingMaxConcurrentExecutions
-
:擷取視窗期内的失敗次數,總次數,失敗比率getHealthCountsStream
另外,建構一個
HystrixCommandMetrics
的執行個體,依舊以static靜态方法對外提供,加緩存來提高效率:
HystrixCommandMetrics:
// 根據參數,得到一個HystrixCommandMetrics 執行個體
// 如果緩存裡已經有了,就直接傳回
public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties) {
return getInstance(key, commandGroup, null, properties);
}
public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
// 雙重校驗鎖
HystrixCommandMetrics commandMetrics = metrics.get(key.name());
if (commandMetrics != null)
return commandMetrics;
// 線程安全
synchronized (HystrixCommandMetrics.class) {
HystrixCommandMetrics existingMetrics = metrics.get(key.name());
if (existingMetrics != null)
return existingMetrics;
// 建立一個新的執行個體
// =======線程的key預設情況下就是groupKey======
HystrixThreadPoolKey nonNullThreadPoolKey;
if (threadPoolKey == null) {
nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
} else {
nonNullThreadPoolKey = threadPoolKey;
}
// 使用構造器初始化一個執行個體,并且放進Map裡緩存起來
HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
metrics.putIfAbsent(key.name(), newCommandMetrics);
return newCommandMetrics;
}
}
// 這個getInstance隻查找緩存,若緩存中木有,就傳回null
public static HystrixCommandMetrics getInstance(HystrixCommandKey key) {
return metrics.get(key.name());
}
複制
HystrixThreadPoolMetrics
原理同上,隻是管理的Stream流不一樣而已:
HystrixThreadPoolMetrics:
private final HystrixThreadPoolKey threadPoolKey;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolProperties properties;
private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
複制
主要方法:
-
:當線程執行時,調用此方法,次數+1markThreadExecution
-
:執行完,次數-1markThreadCompletion
-
:command任務被線程池拒絕時,次數-1markThreadRejection
-
:和上面不一樣,這裡委托的是getRollingCount
RollingThreadPoolEventCounterStream
-
:getCumulativeCount
擷取執行個體的方法同上。
HystrixCollapserMetrics
HystrixCollapserMetrics:
private final HystrixCollapserKey collapserKey;
private final HystrixCollapserProperties properties;
private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;
複制
略。
使用示例
在例子之前,需要提醒的是:以上方法雖然最終是委托給Stream去執行的,但是它們并不會有延遲,是立即的(因為Stream流一般都會有視窗,比如1s一次,500ms一次等等),但是,但是,但是它是getLatest哦,也就是拿最新的資料,官方解釋為:
// 同步調用以檢索上次計算的bucket而不用等待
// Synchronous call to retrieve the last calculated bucket without waiting for any emissions
public Output getLatest() { ... }
複制
示例代碼:
@Test
public void fun1() throws InterruptedException {
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
HystrixCommandGroupKey commandGroupKey = HystrixCommandGroupKey.Factory.asKey("MyAppGroup");
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("MyAppGroup");
HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());
// command名額資訊
HystrixCommandMetrics commandMetrics = HystrixCommandMetrics.getInstance(commandKey, commandGroupKey, threadPoolKey, properties);
// 發送事件(發送多次)
CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.execute();
helloWorld = new CommandHelloWorld("YoutBatman");
helloWorld.queue();
// 走fallabck
helloWorld = new CommandHelloWorld(null);
helloWorld.queue();
// 列印名額資訊
TimeUnit.SECONDS.sleep(1); // 需要留給名額收集的時間
System.out.println("===========commandMetrics資訊===========");
System.out.println(commandMetrics.getRollingCount(HystrixEventType.SUCCESS));
System.out.println(commandMetrics.getRollingCount(HystrixEventType.FAILURE));
System.out.println(commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS));
System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.SUCCESS));
System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FAILURE));
System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FALLBACK_SUCCESS));
System.out.println(commandMetrics.getHealthCounts());
System.out.println(commandMetrics.getExecutionTimeMean());
}
複制
運作程式控制台列印:
===========commandMetrics資訊===========
0
0
0
0
0
0
HealthCounts[1 / 3 : 33%]
0
複制
說明:至于為何很多是0,這個和getLatest以及本地測試不好控有關,暫可忽略。
總結
關于Netflix Hystrix名額資料收集器:
HystrixMetrics
就介紹到這了,你可能會覺得它和前面講的xxxStream有非常多的功能相似之處。從源碼能看出,
HystrixMetrics
似乎是對
xxxStream
的一些包裝,内部事件最終都是委托給
xxxStream
去完成了的。
隻不過最大的差別是:
HystrixMetrics
所有的擷取名額資訊的方法,擷取的都是瞬時的(最新的)值,而并不需要等待,這是和流式統計計算最大的差別。