天天看點

[享學Netflix] 二十六、Hystrix名額資料收集器:HystrixMetrics(HystrixDashboard的資料來源)

看一個人的成功,不是看他赢了多少人,而是看他成就了多少人。

代碼下載下傳位址:https://github.com/f641385712/netflix-learning

前言

前面已經花了5篇文章專門介紹了Hystrix基于事件機制的資料收集、Stream流式處理,再回頭來了解它的

HystrixMetrics

名額收集就一清二楚了。

HystrixCommand

執行過程(開始執行、結束執行、異常、逾時)時會不斷發出各類事件,通過收集這些資料,提供給消費者。如斷路器、

Hystrix Dashboard

可以統計分析這些資料,進而完成特定的功能。

Hystrix

以command指令模式的方式來控制業務邏輯以及熔斷邏輯的調用時機,是以說資料統計對它來說不算難事,但如何高效、精準的在記憶體中統計資料,還需要一定的技巧。

需要提前說明的是:像什麼

hystrix.stream

HystrixDashboard

面闆檢視等這些,本文均還不會展現。本文隻闡述資料的采集,至于資料如何使用(存儲or展示)放在後幾篇文章。

正文

Hystrix

收集資料是必不可少的一步,每個降級點(需要采取降級保護的點)的資料是獨立的,是以我們可以給每個降級點配置單獨的政策。

這些政策一般是建立在我們對這些降級點的了解之上的,初期甚至可以先觀察一下采集的資料來指定降級政策。

采集哪些資料?資料如何存儲?資料如何上報?這都是Hystrix需要考慮的問題,

Hystrix

采用的是滑動視窗+分桶的形式來采集資料(原理還蠻複雜的,本文不不做讨論),這樣既解決了資料在統計周期間切換而帶來的跳變問題(通過時間視窗),也控制了切換了力度(通過桶大小)。

關于Metrics名額收集,就不得不再次請上第一篇文章已貼出的這張執行原理圖了:

[享學Netflix] 二十六、Hystrix名額資料收集器:HystrixMetrics(HystrixDashboard的資料來源)

它從各個地方(包括正常邏輯執行、線程池/信号量資源監察)收集名額資訊,然後提供給斷路器使用,或者提供給監控大盤們使用(它們均是consumer)

HystrixRollingNumber

該類用來統計一段時間内的計數,也被稱作Hystrix裡用于qps計數的資料結構,采用滑動視窗 + 分桶的形式收集。

事件類型

HystrixRollingNumberEvent

:可以在

HystrixRollingNumber

中捕獲的各種狀态/事件。

public enum HystrixRollingNumberEvent {
    SUCCESS(1), FAILURE(1), TIMEOUT(1), SHORT_CIRCUITED(1), THREAD_POOL_REJECTED(1), SEMAPHORE_REJECTED(1), BAD_REQUEST(1),
    FALLBACK_SUCCESS(1), FALLBACK_FAILURE(1), FALLBACK_REJECTION(1), FALLBACK_MISSING(1), EXCEPTION_THROWN(1), COMMAND_MAX_ACTIVE(2), EMIT(1), FALLBACK_EMIT(1),
    THREAD_EXECUTION(1), THREAD_MAX_ACTIVE(2), COLLAPSED(1), RESPONSE_FROM_CACHE(1),
    COLLAPSER_REQUEST_BATCHED(1), COLLAPSER_BATCH(1);

    private final int type;
    private HystrixRollingNumberEvent(int type) {
        this.type = type;
    }

	// 可執行HystrixRollingNumber#increment/add/getRollingSum方法
    public boolean isCounter() { return type == 1; }
    // 可執行HystrixRollingNumber#updateRollingMax/getRollingMaxValue方法
    public boolean isMaxUpdater() { return type == 2; }

	// HystrixEventType轉為HystrixRollingNumberEvent 
	public static HystrixRollingNumberEvent from(HystrixEventType eventType) {
		...
	}
}           

複制

可以看到,每一個

HystrixEventType

類型都能比對到一個

HystrixRollingNumberEvent

進而被收集進來。

HystrixRollingNumber

它是一個工具類,位于Util包:

com.netflix.hystrix.util

。滑動視窗 + 分桶邏輯實作複雜,但它作為一個工具類給提供了非常實用的擷取資料的方法:

HystrixRollingNumber:
	
	final int numberOfBuckets;
	...
	// 環形桶:因為時間視窗需要滑動
	final BucketCircularArray buckets;
	...
    public void increment(HystrixRollingNumberEvent type) {
        getCurrentBucket().getAdder(type).increment();
    }
    ...
    // 擷取自JVM啟動以來所有桶的累積和
    public long getCumulativeSum(HystrixRollingNumberEvent type) { ... }
    // 擷取給定指定event類型的**滾動計數器**中所有桶的總和(常用)
    public long getRollingSum(HystrixRollingNumberEvent type) { ... }
    // 擷取滾動過程中,最新的桶的值
    public long getValueOfLatestBucket(HystrixRollingNumberEvent type) { ... }
    // 擷取滾動中,所有桶的值們
    public long[] getValues(HystrixRollingNumberEvent type) { ... }
    // 基于getValues的基礎上排序,然後取出最大值
    public long getRollingMaxValue(HystrixRollingNumberEvent type) { ... }
	...           

複制

HystrixRollingNumber

統計一定時間内的統計數值,基本思想就是分段/分桶統計,比如說要統計qps,即1秒内的請求總數。如下圖所示,我們可以将1s的時間分成10段,每段100ms。在第一個100ms内,寫入第一個段中進行計數,在第二個100ms内,寫入第二個段中進行計數,這樣如果要統計目前時間的qps,我們總是可以通過統計目前時間前1s(共10段)的計數總和值。

說明:注意它和RollingDistributionStream的差別哦~

Metrics如何統計

Metrics在統計各種狀态時,時運用滑動視窗思想進行統計的,在一個滑動視窗時間中又劃分了若幹個Bucket(滑動視窗時間與Bucket成整數倍關系),滑動視窗的移動是以Bucket為機關進行滑動的。

如:HealthCounts 記錄的是一個Buckets的監控狀态,Buckets為一個滑動視窗的一小部分,如果一個滑動視窗時間為 t ,Bucket數量為 n,那麼每隔t/n秒将建立一個HealthCounts對象。

[享學Netflix] 二十六、Hystrix名額資料收集器:HystrixMetrics(HystrixDashboard的資料來源)

Metrics收集步驟

根據前面幾篇文章的表述,這裡簡單總結名額資訊的收集步驟:

  1. 指令在開始執行前會向開始消息流(HystrixCommandStartStream)發送開始消息(HystrixCommandExecutionStarted)
  2. 如果是線程池執行,執行前會向線程池開始消息流(HystrixThreadPoolStartStream)發送開始消息(HystrixCommandExecutionStarted)
  3. 如果是線程池執行,執行後會向線程池結束消息流(HystrixThreadPoolCompletionStream)發送完成消息(HystrixCommandCompletion)
  4. 指令在結束執行前會向完成消息流(HystrixCommandCompletionStream)發送完成消息(HystrixCommandCompletion)
  5. 不同類型的統計流(比如滑動視窗統計、累計統計、最大并發統計等等),會監聽開始消息流或完成消息流,根據接受到的消息内容,進行統計

HystrixMetrics

名額資料采集的基類。目前服務的健康狀況, 包括服務調用總次數和服務調用失敗次數等. 根據Metrics的計數, 熔斷器進而能計算出目前服務的調用失敗率, 用來和設定的門檻值比較進而決定熔斷器的狀态切換邏輯. 是以Metrics的實作非常重要。

public abstract class HystrixMetrics {
	protected final HystrixRollingNumber counter;
    protected HystrixMetrics(HystrixRollingNumber counter) {
        this.counter = counter;
    }
	
	// 擷取累計總數
    public long getCumulativeCount(HystrixRollingNumberEvent event) {
        return counter.getCumulativeSum(event);
    }
    // 擷取目前滑動視窗内的總數
    public long getRollingCount(HystrixRollingNumberEvent event) {
        return counter.getRollingSum(event);
    }
}           

複制

Hystrix的Metrics功能子產品中存儲了與Hystrix運作相關的度量資訊,主要有三類類型:

[享學Netflix] 二十六、Hystrix名額資料收集器:HystrixMetrics(HystrixDashboard的資料來源)

HystrixCommandMetrics

儲存hystrix指令執行的度量資訊。

public class HystrixCommandMetrics extends HystrixMetrics {

	private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
	// 靜态Map,key為HystrixCommandKey,緩存
	private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<>();
	
	// 這兩個public的函數,在介紹前面介紹HealthCountsStream已講過,略
	public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = ...
	public static final Func2<long[], long[], long[]> bucketAggregator = ...
}           

複制

此處解釋一下,為何有

metrics

這個static變量:由于每次請求都需要新建立command對象,而每建立一次command對象都有好多屬性需要初始化(具體參見講解AbstractCommand文章),那麼是不是非常的耗時呢???

其實構造函數中的很多初始化工作隻會集中在建立第一個Command時來做,後續建立的Command對象主要是從靜态Map中取對應的執行個體來指派,比如監控器、斷路器和線程池的初始化,因為相同的Command的command key和線程池key都是一緻的,在

HystrixCommandMetrics

HystrixCircuitBreaker.Factory

HystrixThreadPool

中均有類似于

metrics

這樣的static Map緩存用于提高效率的。

HystrixMetrics負責收集名額資料,它會借用多個

Stream

來進行多元護收集,是以它有衆多成員屬性:

HystrixCommandMetrics:

    private final HystrixCommandProperties properties;
    private final HystrixCommandKey key;
    private final HystrixCommandGroupKey group;
    private final HystrixThreadPoolKey threadPoolKey;
    // 記錄目前正在執行的總指令數
    // 指令start執行的時候+1,執行結束-1
    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();

 	// 使用各種緯度的Stream,進行監聽資訊流
    private HealthCountsStream healthCountsStream;
    private final RollingCommandEventCounterStream rollingCommandEventCounterStream;
    private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
    private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
    private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
    private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

	... // 私有化構造器:給所有的屬性指派
	... // 省略get方法


	...

	// 目前**正在執行**的總數:HystrixCommand#run()
    public int getCurrentConcurrentExecutionCount() {
        return concurrentExecutionCount.get();
    }
    ...

	// 檢索總請求、錯誤計數和錯誤百分比的快照。
    public HealthCounts getHealthCounts() {
        return healthCountsStream.getLatest();
    }
	...

	// 因為收集名額資訊都是異步收集的,這個方法可以解除所有的訂閱
    private void unsubscribeAll() {
        healthCountsStream.unsubscribe();
        rollingCommandEventCounterStream.unsubscribe();
        cumulativeCommandEventCounterStream.unsubscribe();
        rollingCommandLatencyDistributionStream.unsubscribe();
        rollingCommandUserLatencyDistributionStream.unsubscribe();
        rollingCommandMaxConcurrencyStream.unsubscribe();
    }           

複制

它對外提供非常多的方法/能力,其中絕大多數都是委托給各種xxxStream來完成,下面對主要方法做如下描述:

  • markCommandStart

    :當指令開始執行,調用該方法
  • markCommandDone

    :指令執行完成,調用該方法
    • 說明:以上2方法均不是public方法,由

      AbstractCommand

      調用。其中有個API:

      HystrixThreadEventStream

      後續會有詳細介紹
  • getRollingCount

    :擷取某一事件類型視窗期内的統計數值(委托

    rollingCommandEventCounterStream

  • getCumulativeCount

    :擷取某一事件類型持續的統計數值
  • getExecutionTimePercentile

    :擷取某一百分比的請求執行時間(委托

    rollingCommandLatencyDistributionStream

  • getExecutionTimeMean

    :擷取平均請求執行時間(委托

    rollingCommandLatencyDistributionStream

  • getTotalTimePercentile

    :擷取某一百分比的請求執行總時間(委托

    rollingCommandUserLatencyDistributionStream

  • getTotalTimeMean

    :擷取平均請求執行總時間
  • getRollingMaxConcurrentExecutions

    :擷取上一個視窗期内最大的并發數
  • getHealthCountsStream

    :擷取視窗期内的失敗次數,總次數,失敗比率

另外,建構一個

HystrixCommandMetrics

的執行個體,依舊以static靜态方法對外提供,加緩存來提高效率:

HystrixCommandMetrics:

	// 根據參數,得到一個HystrixCommandMetrics 執行個體
	// 如果緩存裡已經有了,就直接傳回
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties) {
        return getInstance(key, commandGroup, null, properties);
    }
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties) {
    	
    	// 雙重校驗鎖
        HystrixCommandMetrics commandMetrics = metrics.get(key.name());
        if (commandMetrics != null) 
            return commandMetrics; 
		// 線程安全
		synchronized (HystrixCommandMetrics.class) {
			HystrixCommandMetrics existingMetrics = metrics.get(key.name());
                if (existingMetrics != null)
                    return existingMetrics;
			
				// 建立一個新的執行個體
				// =======線程的key預設情況下就是groupKey======
                HystrixThreadPoolKey nonNullThreadPoolKey;
                if (threadPoolKey == null) {
                    nonNullThreadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name());
                } else {
                    nonNullThreadPoolKey = threadPoolKey;
                }

				// 使用構造器初始化一個執行個體,并且放進Map裡緩存起來
                HystrixCommandMetrics newCommandMetrics = new HystrixCommandMetrics(key, commandGroup, nonNullThreadPoolKey, properties, HystrixPlugins.getInstance().getEventNotifier());
                metrics.putIfAbsent(key.name(), newCommandMetrics);
                return newCommandMetrics;

		}
    }

	// 這個getInstance隻查找緩存,若緩存中木有,就傳回null
    public static HystrixCommandMetrics getInstance(HystrixCommandKey key) {
        return metrics.get(key.name());
    }           

複制

HystrixThreadPoolMetrics

原理同上,隻是管理的Stream流不一樣而已:

HystrixThreadPoolMetrics:

    private final HystrixThreadPoolKey threadPoolKey;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolProperties properties;


    private final AtomicInteger concurrentExecutionCount = new AtomicInteger();
    private final RollingThreadPoolEventCounterStream rollingCounterStream;
    private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
    private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;           

複制

主要方法:

  • markThreadExecution

    :當線程執行時,調用此方法,次數+1
  • markThreadCompletion

    :執行完,次數-1
  • markThreadRejection

    :command任務被線程池拒絕時,次數-1
  • getRollingCount

    :和上面不一樣,這裡委托的是

    RollingThreadPoolEventCounterStream

  • getCumulativeCount

擷取執行個體的方法同上。

HystrixCollapserMetrics

HystrixCollapserMetrics:

    private final HystrixCollapserKey collapserKey;
    private final HystrixCollapserProperties properties;

    private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
    private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
    private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;           

複制

略。

使用示例

在例子之前,需要提醒的是:以上方法雖然最終是委托給Stream去執行的,但是它們并不會有延遲,是立即的(因為Stream流一般都會有視窗,比如1s一次,500ms一次等等),但是,但是,但是它是getLatest哦,也就是拿最新的資料,官方解釋為:

// 同步調用以檢索上次計算的bucket而不用等待
// Synchronous call to retrieve the last calculated bucket without waiting for any emissions
public Output getLatest() { ... }           

複制

示例代碼:

@Test
public void fun1() throws InterruptedException {
    HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("CommandHelloWorld");
    HystrixCommandGroupKey commandGroupKey = HystrixCommandGroupKey.Factory.asKey("MyAppGroup");
    HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("MyAppGroup");
    HystrixPropertiesCommandDefault properties = new HystrixPropertiesCommandDefault(commandKey, HystrixCommandProperties.Setter());

    // command名額資訊
    HystrixCommandMetrics commandMetrics = HystrixCommandMetrics.getInstance(commandKey, commandGroupKey, threadPoolKey, properties);

    // 發送事件(發送多次)
    CommandHelloWorld helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.execute();
    helloWorld = new CommandHelloWorld("YoutBatman");
    helloWorld.queue();
    // 走fallabck
    helloWorld = new CommandHelloWorld(null);
    helloWorld.queue();


    // 列印名額資訊
    TimeUnit.SECONDS.sleep(1); // 需要留給名額收集的時間
    System.out.println("===========commandMetrics資訊===========");
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.SUCCESS));
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.FAILURE));
    System.out.println(commandMetrics.getRollingCount(HystrixEventType.FALLBACK_SUCCESS));

    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.SUCCESS));
    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FAILURE));
    System.out.println(commandMetrics.getCumulativeCount(HystrixEventType.FALLBACK_SUCCESS));


    System.out.println(commandMetrics.getHealthCounts());
    System.out.println(commandMetrics.getExecutionTimeMean());
}           

複制

運作程式控制台列印:

===========commandMetrics資訊===========
0
0
0
0
0
0
HealthCounts[1 / 3 : 33%]
0           

複制

說明:至于為何很多是0,這個和getLatest以及本地測試不好控有關,暫可忽略。

總結

關于Netflix Hystrix名額資料收集器:

HystrixMetrics

就介紹到這了,你可能會覺得它和前面講的xxxStream有非常多的功能相似之處。從源碼能看出,

HystrixMetrics

似乎是對

xxxStream

的一些包裝,内部事件最終都是委托給

xxxStream

去完成了的。

隻不過最大的差別是:

HystrixMetrics

所有的擷取名額資訊的方法,擷取的都是瞬時的(最新的)值,而并不需要等待,這是和流式統計計算最大的差別。

[享學Netflix] 二十六、Hystrix名額資料收集器:HystrixMetrics(HystrixDashboard的資料來源)