天天看點

Hystrix 源碼解析 —— 斷路器 HystrixCircuitBreaker1. 概述2. HystrixCircuitBreaker3. HystrixCircuitBreaker.Factory4. HystrixCircuitBreakerImpl

1. 概述

本文主要分享 斷路器 HystrixCircuitBreaker。

HystrixCircuitBreaker 有三種狀态 :

  • CLOSED

     :關閉
  • OPEN

     :打開
  • HALF_OPEN

     :半開

其中,斷路器處于 

OPEN

 狀态時,鍊路處于非健康狀态,指令執行時,直接調用回退邏輯,跳過正常邏輯。

HystrixCircuitBreaker 狀态變遷如下圖 :

Hystrix 源碼解析 —— 斷路器 HystrixCircuitBreaker1. 概述2. HystrixCircuitBreaker3. HystrixCircuitBreaker.Factory4. HystrixCircuitBreakerImpl
  • 紅線 :初始時,斷路器處于 

    CLOSED

     狀态,鍊路處于健康狀态。當滿足如下條件,斷路器從 

    CLOSED

     變成 

    OPEN

     狀态:
    • 周期( 可配,

      HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms

       )内,總請求數超過一定量( 可配,

      HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20

       ) 。
    • 錯誤請求占總請求數超過一定比例( 可配,

      HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%

       ) 。
  • 綠線 :斷路器處于 

    OPEN

     狀态,指令執行時,若目前時間超過斷路器開啟時間一定時間( 

    HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms

     ),斷路器變成 

    HALF_OPEN

     狀态,嘗試調用正常邏輯,根據執行是否成功,打開或關閉熔斷器【藍線】。

2. HystrixCircuitBreaker

com.netflix.hystrix.HystrixCircuitBreaker

 ,Hystrix 斷路器接口。定義接口如下代碼 :

public interface HystrixCircuitBreaker {

    /**
     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does
     * not modify any internal state, and takes into account the half-open logic which allows some requests through
     * after the circuit has been opened
     * 
     * @return boolean whether a request should be permitted
     */
    boolean allowRequest();

    /**
     * Whether the circuit is currently open (tripped).
     * 
     * @return boolean state of circuit breaker
     */
    boolean isOpen();

    /**
     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markSuccess();

    /**
     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markNonSuccess();

    /**
     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal
     * state.
     */
    boolean attemptExecution();
}
           
  • #allowRequest()

     和 

    #attemptExecution()

     方法,方法目的基本類似,差别在于當斷路器滿足嘗試關閉條件時,前者不會将斷路器不會修改狀态( 

    CLOSE => HALF-OPEN

     ),而後者會。

HystrixCircuitBreaker 有兩個子類實作 :

  • NoOpCircuitBreaker :空的斷路器實作,用于不開啟斷路器功能的情況。
  • HystrixCircuitBreakerImpl :完整的斷路器實作。

在 AbstractCommand 建立時,初始化 HystrixCircuitBreaker ,代碼如下 :

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    
    /**
     * 斷路器
     */
    protected final HystrixCircuitBreaker circuitBreaker;

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        // ... 省略無關代碼
        
        // 初始化 斷路器
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        // ... 省略無關代碼
    }

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }

}
           
  • 當 

    HystrixCommandProperties.circuitBreakerEnabled = true

     時,即斷路器功能開啟,使用 Factory 獲得 HystrixCircuitBreakerImpl 對象。在 「3. HystrixCircuitBreaker.Factory」 詳細解析。
  • 當 

    HystrixCommandProperties.circuitBreakerEnabled = false

     時,即斷路器功能關閉,建立 NoOpCircuitBreaker 對象。另外,NoOpCircuitBreaker 代碼簡單到腦殘,點選 連結 檢視實作。

3. HystrixCircuitBreaker.Factory

com.netflix.hystrix.HystrixCircuitBreaker.Factory

 ,HystrixCircuitBreaker 工廠,主要用于:

  • 建立 HystrixCircuitBreaker 對象,目前隻建立 HystrixCircuitBreakerImpl 。
  • HystrixCircuitBreaker 容器,基于 HystrixCommandKey 維護了 HystrixCircuitBreaker 單例對象 的映射。代碼如下 :
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
           

整體代碼灰常清晰,點選 連結 檢視代碼。

4. HystrixCircuitBreakerImpl

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl

 ,完整的斷路器實作。

我們來逐個方法看看 HystrixCircuitBreakerImpl 的具體實作。

4.1 構造方法

構造方法,代碼如下 :

/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        CLOSED, OPEN, HALF_OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }
}
           
  • Status 枚舉類,斷路器的三種狀态。
  • status

     屬性,斷路器的狀态。
  • circuitOpened

     屬性,斷路器打開,即狀态變成 

    OPEN

     的時間。
  • activeSubscription

     屬性,基于 Hystrix Metrics 對請求量統計 Observable 的訂閱,在 「4.2 #subscribeToStream()」 詳細解析。

4.2 #subscribeToStream()

#subscribeToStream()

 方法,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。代碼如下 :

private Subscription subscribeToStream() {
  1: private Subscription subscribeToStream() {
  2:     /*
  3:      * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
  4:      */
  5:     return metrics.getHealthCountsStream()
  6:             .observe()
  7:             .subscribe(new Subscriber<HealthCounts>() {
  8:                 @Override
  9:                 public void onCompleted() {
 10: 
 11:                 }
 12: 
 13:                 @Override
 14:                 public void onError(Throwable e) {
 15: 
 16:                 }
 17: 
 18:                 @Override
 19:                 public void onNext(HealthCounts hc) {
 20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用于調試
 21:                     // check if we are past the statisticalWindowVolumeThreshold
 22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
 23:                         // we are not past the minimum volume threshold for the stat window,
 24:                         // so no change to circuit status.
 25:                         // if it was CLOSED, it stays CLOSED
 26:                         // if it was half-open, we need to wait for a successful command execution
 27:                         // if it was open, we need to wait for sleep window to elapse
 28:                     } else {
 29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
 30:                             //we are not past the minimum error threshold for the stat window,
 31:                             // so no change to circuit status.
 32:                             // if it was CLOSED, it stays CLOSED
 33:                             // if it was half-open, we need to wait for a successful command execution
 34:                             // if it was open, we need to wait for sleep window to elapse
 35:                         } else {
 36:                             // our failure rate is too high, we need to set the state to OPEN
 37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
 38:                                 circuitOpened.set(System.currentTimeMillis());
 39:                             }
 40:                         }
 41:                     }
 42:                 }
 43:             });
 44: }
           
  • 第 5 至 7 行 :向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。這裡的 Observable 基于 RxJava Window 操作符。

    FROM 《ReactiveX文檔中文翻譯》「Window」

    定期将來自原始 Observable 的資料分解為一個 Observable 視窗,發射這些視窗,而不是每次發射一項資料

    • 簡單來說,固定間隔,

      #onNext()

       方法将不斷被調用,每次計算斷路器的狀态。
  • 第 22 行 :判斷周期( 可配,

    HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms

     )内,總請求數超過一定量( 可配,

    HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20

     ) 。
    • 這裡要注意下,請求次數統計的是周期内,超過周期的不計算在内。例如說,

      00:00

       内發起了 N 個請求,

      00:11

       不計算這 N 個請求。
  • 第 29 行 :錯誤請求占總請求數超過一定比例( 可配,

    HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%

     ) 。
  • 第 37 至 39 行 :滿足斷路器打開條件,CAS 修改狀态( 

    CLOSED => OPEN

     ),并設定打開時間( 

    circuitOpened

     ) 。
  • 【補充】第 5 至 7 行 :😈 怕寫在上面,大家有壓力。Hystrix Metrics 對請求量統計 Observable 使用了兩種 RxJava Window 操作符 :
    • Observable#window(timespan, unit)

       方法,固定周期( 可配,

      HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms

       ),發射 Observable 視窗。點選 BucketedCounterStream 構造方法 檢視調用處的代碼。
    • Observable#window(count, skip)

       方法,每發射一次(

      skip

      ) Observable 忽略 

      count

       ( 可配,

      HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20

       ) 個資料項。為什麼?答案在第 22 行的代碼,周期内達到一定請求量是斷路器打開的一個條件。點選 BucketedRollingCounterStream 構造方法 檢視調用處的代碼。

目前該方法有兩處調用 :

  • 「4.1 構造方法」,在建立 HystrixCircuitBreakerImpl 時,向 Hystrix Metrics 對請求量統計 Observable 的發起訂閱。固定間隔,計算斷路器是否要關閉( 

    CLOSE

     )。
  • 「4.4 #markSuccess()」,清空 Hystrix Metrics 對請求量統計 Observable 的統計資訊,取消原有訂閱,并發起新的訂閱。

4.3 #attemptExecution()

如下是 

AbstractCommand#applyHystrixSemantics(_cmd)

 方法,對 

HystrixCircuitBreakerImpl#attemptExecution

 方法的調用的代碼 :

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

    // ...  省略無關代碼

   /* determine if we're allowed to execute */
   if (circuitBreaker.attemptExecution()) {
        // 執行【正常邏輯】
   } else {
        // 執行【回退邏輯】
   }
}
           
  • 使用 

    HystrixCircuitBreakerImpl#attemptExecution

     方法,判斷是否可以執行正常邏輯。

#attemptExecution

 方法,代碼如下 :

1: @Override
 2: public boolean attemptExecution() {
 3:     // 強制 打開
 4:     if (properties.circuitBreakerForceOpen().get()) {
 5:         return false;
 6:     }
 7:     // 強制 關閉
 8:     if (properties.circuitBreakerForceClosed().get()) {
 9:         return true;
10:     }
11:     // 打開時間為空
12:     if (circuitOpened.get() == -1) {
13:         return true;
14:     } else {
15:         // 滿足間隔嘗試斷路器時間
16:         if (isAfterSleepWindow()) {
17:             //only the first request after sleep window should execute
18:             //if the executing command succeeds, the status will transition to CLOSED
19:             //if the executing command fails, the status will transition to OPEN
20:             //if the executing command gets unsubscribed, the status will transition to OPEN
21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
22:                 return true;
23:             } else {
24:                 return false;
25:             }
26:         } else {
27:             return false;
28:         }
29:     }
30: }
           
  • 第 4 至 6 行 :當 

    HystrixCommandProperties.circuitBreakerForceOpen = true

     ( 預設值 :

    false

    ) 時,即斷路器強制打開,傳回 

    false

     。當該配置接入配置中心後,可以動态實作打開熔斷。為什麼會有該配置?當 HystrixCircuitBreaker 建立完成後,無法動态切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通過該配置以實作類似效果。
  • 第 8 至 10 行 :當 

    HystrixCommandProperties.circuitBreakerForceClose = true

     ( 預設值 :

    false

    ) 時,即斷路器強制關閉,傳回 

    true

     。當該配置接入配置中心後,可以動态實作關閉熔斷。為什麼會有該配置?當 HystrixCircuitBreaker 建立完成後,無法動态切換 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通過該配置以實作類似效果。
  • 第 12 至 13 行 :斷路器打開時間( 

    circuitOpened

     ) 為"空",傳回 

    true

     。
  • 第 16 至 28 行 :調用 

    #isAfterSleepWindow()

     方法,判斷是否滿足嘗試調用正常邏輯的間隔時間。當滿足,使用 CAS 方式修改斷路器狀态( 

    OPEN => HALF_OPEN

     ),進而保證有且僅有一個線程能夠嘗試調用正常邏輯。

#isAfterSleepWindow()

 方法,代碼如下 :

private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}
           
  • 在目前時間超過斷路器打開時間 

    HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds

     ( 預設值,

    5000 ms

     ),傳回 

    true

     。

4.4 #markSuccess()

當嘗試調用正常邏輯成功時,調用 

#markSuccess()

 方法,關閉斷路器。代碼如下 :

1: @Override
 2: public void markSuccess() {
 3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
 4:         // 清空 Hystrix Metrics 對請求量統計 Observable 的**統計資訊**
 5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 0
 6:         metrics.resetStream();
 7:         // 取消原有訂閱
 8:         Subscription previousSubscription = activeSubscription.get();
 9:         if (previousSubscription != null) {
10:             previousSubscription.unsubscribe();
11:         }
12:         // 發起新的訂閱
13:         Subscription newSubscription = subscribeToStream();
14:         activeSubscription.set(newSubscription);
15:         // 設定斷路器打開時間為空
16:         circuitOpened.set(-1L);
17:     }
18: }
           
  • 第 3 行 :使用 CAS 方式,修改斷路器狀态( 

    HALF_OPEN => CLOSED

     )。
  • 第 6 行 :清空 Hystrix Metrics 對請求量統計 Observable 的統計資訊。
  • 第 8 至 14 行 :取消原有訂閱,發起新的訂閱。
  • 第 16 行 :設定斷路器打開時間為"空" 。

如下兩處調用了 

#markNonSuccess()

 方法 :

  • markEmits

  • markOnCompleted

4.5 #markNonSuccess()

當嘗試調用正常邏輯失敗時,調用 

#markNonSuccess()

 方法,重新打開斷路器。代碼如下 :

1: @Override
2: public void markNonSuccess() {
3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
5:         circuitOpened.set(System.currentTimeMillis());
6:     }
7: }
           
  • 第 3 行 :使用 CAS 方式,修改斷路器狀态( 

    HALF_OPEN => OPEN

     )。
  • 第 5 行 :設定設定斷路器打開時間為目前時間。這樣,

    #attemptExecution()

     過一段時間,可以再次嘗試執行正常邏輯。

如下兩處調用了 

#markNonSuccess()

 方法 :

  • handleFallback

  • unsubscribeCommandCleanup

4.6 #allowRequest()

#allowRequest()

 和 

#attemptExecution()

 方法,方法目的基本類似,差别在于當斷路器滿足嘗試關閉條件時,前者不會将斷路器不會修改狀态( 

CLOSE => HALF-OPEN

 ),而後者會。點選 連結 檢視代碼實作。

4.7 #isOpen()

#isOpen()

 方法,比較簡單,點選 連結 檢視代碼實作。

繼續閱讀