1. Overview
This article covers the Hystrix circuit breaker, HystrixCircuitBreaker.
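HystrixCircuitBreaker has three states:
- CLOSED
- OPEN
- HALF_OPEN
When the breaker is OPEN, the dependency is considered unhealthy: command execution skips the normal logic and calls the fallback logic directly.
The state transitions of HystrixCircuitBreaker are as follows (the line colors refer to the original state diagram):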
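- Red line: initially the breaker is CLOSED and the dependency is healthy. The breaker changes from CLOSED to OPEN when both of the following conditions hold:
  - Within the statistical window (configurable, HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms), the total number of requests reaches the volume threshold (configurable, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20).
  - The share of failed requests among all requests reaches the error threshold (configurable, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%).
- Green line: while the breaker is OPEN, if at command execution the current time is later than the time the breaker opened by more than the sleep window (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds = 5000 ms), the breaker changes to HALF_OPEN and a trial call of the normal logic is made. Depending on whether that call succeeds, the breaker is closed or re-opened (blue line).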
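The two CLOSED => OPEN conditions above can be sketched as a small predicate. This is only an illustration, not Hystrix code: the class TripRule and its method shouldOpen are hypothetical names, and truncating the error percentage to an integer is an assumption.

```java
// Hypothetical helper (not part of Hystrix): mirrors the two CLOSED => OPEN conditions.
final class TripRule {
    private final int requestVolumeThreshold;   // e.g. 20 requests per statistical window
    private final int errorThresholdPercentage; // e.g. 50 (percent)

    TripRule(int requestVolumeThreshold, int errorThresholdPercentage) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
    }

    /** Returns true when both conditions hold for the counts of one statistical window. */
    boolean shouldOpen(long totalRequests, long errorRequests) {
        if (totalRequests == 0 || totalRequests < requestVolumeThreshold) {
            return false; // too little traffic to judge
        }
        // Assumption: the percentage is truncated to an int before comparing.
        int errorPercentage = (int) ((double) errorRequests / totalRequests * 100);
        return errorPercentage >= errorThresholdPercentage;
    }

    public static void main(String[] args) {
        TripRule rule = new TripRule(20, 50);
        System.out.println(rule.shouldOpen(19, 19)); // below the volume threshold
        System.out.println(rule.shouldOpen(20, 10)); // enough volume, half of the calls failed
    }
}
```

Checking the volume first means a handful of failures on a quiet dependency cannot trip the breaker on their own.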
2. HystrixCircuitBreaker
com.netflix.hystrix.HystrixCircuitBreaker is the Hystrix circuit breaker interface. It is defined as follows:
public interface HystrixCircuitBreaker {

    /**
     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does
     * not modify any internal state, and takes into account the half-open logic which allows some requests through
     * after the circuit has been opened
     *
     * @return boolean whether a request should be permitted
     */
    boolean allowRequest();

    /**
     * Whether the circuit is currently open (tripped).
     *
     * @return boolean state of circuit breaker
     */
    boolean isOpen();

    /**
     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markSuccess();

    /**
     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markNonSuccess();

    /**
     * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
     * state.
     */
    boolean attemptExecution();
}
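- #allowRequest() and #attemptExecution() serve much the same purpose. The difference shows when the breaker is eligible for a trial call: the former does not change the breaker's state, while the latter does (OPEN => HALF_OPEN).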
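HystrixCircuitBreaker has two implementations:
- NoOpCircuitBreaker: an empty implementation, used when the circuit breaker feature is disabled.
- HystrixCircuitBreakerImpl: the full circuit breaker implementation.
The HystrixCircuitBreaker is initialized when an AbstractCommand is created. The code is as follows: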
/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    /**
     * Circuit breaker
     */
    protected final HystrixCircuitBreaker circuitBreaker;

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        // ... unrelated code omitted

        // Initialize the circuit breaker
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);

        // ... unrelated code omitted
    }

    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // get the default implementation of HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }
}
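- When HystrixCommandProperties.circuitBreakerEnabled = true, the circuit breaker feature is on. Unless a breaker was passed in through the constructor, a HystrixCircuitBreakerImpl object is obtained from the Factory. See "3. HystrixCircuitBreaker.Factory" for details.
- When HystrixCommandProperties.circuitBreakerEnabled = false, the circuit breaker feature is off and a NoOpCircuitBreaker object is created. NoOpCircuitBreaker is trivially simple; see its source for the implementation.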
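To make the selection logic concrete, here is a minimal self-contained sketch. The names Breaker, NoOpBreaker and BreakerSelection are hypothetical stand-ins, not Hystrix classes, and the no-op behaviour shown (always permit, never open) is an assumption about what an "empty" breaker does.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the five-method breaker interface.
interface Breaker {
    boolean allowRequest();
    boolean isOpen();
    void markSuccess();
    void markNonSuccess();
    boolean attemptExecution();
}

// Assumed shape of an "empty" breaker: always permits, never opens, ignores feedback.
final class NoOpBreaker implements Breaker {
    @Override public boolean allowRequest() { return true; }
    @Override public boolean isOpen() { return false; }
    @Override public void markSuccess() { }
    @Override public void markNonSuccess() { }
    @Override public boolean attemptExecution() { return true; }
}

final class BreakerSelection {
    /** Same three-way decision as the walkthrough: disabled, injected, or default. */
    static Breaker init(boolean enabled, Breaker fromConstructor, Supplier<Breaker> defaultFactory) {
        if (!enabled) {
            return new NoOpBreaker();
        }
        return fromConstructor != null ? fromConstructor : defaultFactory.get();
    }

    public static void main(String[] args) {
        Breaker disabled = init(false, null, NoOpBreaker::new);
        System.out.println(disabled.attemptExecution());
    }
}
```

Because the command always holds some breaker object, the execution path never needs a null check or an "is the feature enabled" branch.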
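3. HystrixCircuitBreaker.Factory
com.netflix.hystrix.HystrixCircuitBreaker.Factory is the HystrixCircuitBreaker factory. It serves two purposes:
- Creating HystrixCircuitBreaker objects; currently it only creates HystrixCircuitBreakerImpl.
- Acting as a HystrixCircuitBreaker container: keyed by HystrixCommandKey, it maintains a mapping to a single HystrixCircuitBreaker instance per key. The code is as follows: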
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
The code as a whole is very clear; see the Factory source for the details.
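A per-key singleton container on top of ConcurrentHashMap can be sketched as follows. This is an illustration only: BreakerRegistry and its getInstance signature are hypothetical, and the get-then-putIfAbsent sequence is one common way to do it, assumed here rather than quoted from the Factory.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry (not the Hystrix Factory): one instance per key, safe under races.
final class BreakerRegistry<T> {
    private final ConcurrentHashMap<String, T> byKey = new ConcurrentHashMap<>();

    T getInstance(String key, Supplier<T> creator) {
        // Fast path: the instance for this key already exists.
        T existing = byKey.get(key);
        if (existing != null) {
            return existing;
        }
        // Slow path: create a candidate, but keep whichever instance was stored first.
        T created = creator.get();
        T raced = byKey.putIfAbsent(key, created);
        return raced != null ? raced : created;
    }

    public static void main(String[] args) {
        BreakerRegistry<Object> registry = new BreakerRegistry<>();
        Object first = registry.getInstance("GetUserCommand", Object::new);
        Object second = registry.getInstance("GetUserCommand", Object::new);
        System.out.println(first == second);
    }
}
```

With this approach two racing threads may both construct a candidate, but only one is ever published, so every caller observes the same instance for a given key.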
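4. HystrixCircuitBreakerImpl
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl is the full circuit breaker implementation.
Let's go through the implementation of HystrixCircuitBreakerImpl method by method.
4.1 Constructor
The constructor code is as follows: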
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {

    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        CLOSED, OPEN, HALF_OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }
}
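- Status: an enum of the breaker's three states.
- status: the breaker's current state.
- circuitOpened: the time at which the breaker opened, i.e. when its state changed to OPEN.
- activeSubscription: the subscription to the Hystrix Metrics request-statistics Observable. See "4.2 #subscribeToStream()" for details.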
4.2 #subscribeToStream()
The #subscribeToStream() method subscribes to the Hystrix Metrics request-statistics Observable. The code is as follows:
 1: private Subscription subscribeToStream() {
 2:     /*
 3:      * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
 4:      */
 5:     return metrics.getHealthCountsStream()
 6:             .observe()
 7:             .subscribe(new Subscriber<HealthCounts>() {
 8:                 @Override
 9:                 public void onCompleted() {
10:
11:                 }
12:
13:                 @Override
14:                 public void onError(Throwable e) {
15:
16:                 }
17:
18:                 @Override
19:                 public void onNext(HealthCounts hc) {
20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿: added for debugging
21:                     // check if we are past the statisticalWindowVolumeThreshold
22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
23:                         // we are not past the minimum volume threshold for the stat window,
24:                         // so no change to circuit status.
25:                         // if it was CLOSED, it stays CLOSED
26:                         // if it was half-open, we need to wait for a successful command execution
27:                         // if it was open, we need to wait for sleep window to elapse
28:                     } else {
29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
30:                             //we are not past the minimum error threshold for the stat window,
31:                             // so no change to circuit status.
32:                             // if it was CLOSED, it stays CLOSED
33:                             // if it was half-open, we need to wait for a successful command execution
34:                             // if it was open, we need to wait for sleep window to elapse
35:                         } else {
36:                             // our failure rate is too high, we need to set the state to OPEN
37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
38:                                 circuitOpened.set(System.currentTimeMillis());
39:                             }
40:                         }
41:                     }
42:                 }
43:             });
44: }
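- Lines 5 to 7: subscribe to the Hystrix Metrics request-statistics Observable. This Observable is built on the RxJava Window operator.
  FROM the Chinese translation of the ReactiveX documentation, "Window":
  Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time.
  - Put simply, #onNext() is called repeatedly at a fixed interval, and each call recomputes the breaker's state.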
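- Line 22: checks whether the total number of requests within the statistical window (configurable, HystrixCommandProperties.default_metricsRollingStatisticalWindow = 10000 ms) has reached the volume threshold (configurable, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold = 20).
  - Note that only requests inside the window are counted; anything older than the window is excluded. For example, N requests issued at 00:00 are no longer counted at 00:11.
- Line 29: checks whether the share of failed requests among all requests has reached the error threshold (configurable, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage = 50%).
- Lines 37 to 39: the conditions for opening the breaker are met, so the state is changed with a CAS (CLOSED => OPEN) and the open time (circuitOpened) is recorded.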
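- [Supplement] Lines 5 to 7: kept out of the walkthrough above so as not to overload it. The Hystrix Metrics request-statistics Observable uses two RxJava Window operators:
  - Observable#window(timespan, unit): emits an Observable window at a fixed interval (configurable, HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds = 500 ms). Each such window becomes one bucket. See the BucketedCounterStream constructor for the call site.
  - Observable#window(count, skip): opens a new window every skip items, each window holding count items. Here the items are buckets, skip is 1, and count is the number of buckets in the statistical window (10000 ms / 500 ms = 20 by default, which happens to equal the default circuitBreakerRequestVolumeThreshold but is a separate setting). Why overlapping windows? The answer is in line 22: the request total must cover the whole statistical window, and reaching the volume threshold within that window is one of the conditions for opening the breaker. See the BucketedRollingCounterStream constructor for the call site.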
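The method currently has two call sites:
- "4.1 Constructor": when HystrixCircuitBreakerImpl is created, it subscribes to the Hystrix Metrics request-statistics Observable and, at a fixed interval, computes whether the breaker should open (CLOSED => OPEN).
- "4.4 #markSuccess()": resets the statistics of the Hystrix Metrics request-statistics Observable, cancels the existing subscription, and starts a new one.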
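The effect of an overlapping window over buckets, i.e. window(count, skip) with skip = 1 followed by a sum per window, can be imitated in plain Java without RxJava. The sketch below is an approximation under assumptions: SlidingWindowSum and rollingSums are hypothetical names, and buckets before the start of the stream are treated as empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: rolling sum over the most recent `count` buckets, advancing one bucket at a time.
final class SlidingWindowSum {

    /** For each arriving bucket, returns the total of that bucket and the (count - 1) buckets before it. */
    static List<Long> rollingSums(long[] bucketCounts, int count) {
        Deque<Long> window = new ArrayDeque<>();
        List<Long> sums = new ArrayList<>();
        long sum = 0;
        for (long bucket : bucketCounts) {
            window.addLast(bucket);
            sum += bucket;
            if (window.size() > count) {
                sum -= window.removeFirst(); // the oldest bucket falls out of the window
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // 30 requests in the first bucket, then silence; window of 2 buckets.
        System.out.println(rollingSums(new long[] {30, 0, 0}, 2));
    }
}
```

Once the busy bucket slides out of the window its requests stop counting, which is why a burst at 00:00 no longer contributes to the total at 00:11.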
4.3 #attemptExecution()
The following code shows how AbstractCommand#applyHystrixSemantics(_cmd) calls HystrixCircuitBreakerImpl#attemptExecution:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // ... unrelated code omitted

    /* determine if we're allowed to execute */
    if (circuitBreaker.attemptExecution()) {
        // run the [normal logic]
    } else {
        // run the [fallback logic]
    }
}
- HystrixCircuitBreakerImpl#attemptExecution decides whether the normal logic may be executed.
The code of #attemptExecution is as follows:
 1: @Override
 2: public boolean attemptExecution() {
 3:     // Forced open
 4:     if (properties.circuitBreakerForceOpen().get()) {
 5:         return false;
 6:     }
 7:     // Forced closed
 8:     if (properties.circuitBreakerForceClosed().get()) {
 9:         return true;
10:     }
11:     // Open time is empty
12:     if (circuitOpened.get() == -1) {
13:         return true;
14:     } else {
15:         // The sleep window before a trial call has elapsed
16:         if (isAfterSleepWindow()) {
17:             //only the first request after sleep window should execute
18:             //if the executing command succeeds, the status will transition to CLOSED
19:             //if the executing command fails, the status will transition to OPEN
20:             //if the executing command gets unsubscribed, the status will transition to OPEN
21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
22:                 return true;
23:             } else {
24:                 return false;
25:             }
26:         } else {
27:             return false;
28:         }
29:     }
30: }
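- Lines 4 to 6: when HystrixCommandProperties.circuitBreakerForceOpen = true (default: false), the breaker is forced open and the method returns false. Once this property is wired to a configuration center, the breaker can be opened dynamically. Why does this property exist? After a HystrixCircuitBreaker has been created, it cannot be switched dynamically between NoOpCircuitBreaker and HystrixCircuitBreakerImpl; this property achieves a similar effect.
- Lines 8 to 10: when HystrixCommandProperties.circuitBreakerForceClosed = true (default: false), the breaker is forced closed and the method returns true. Once this property is wired to a configuration center, the breaker can be closed dynamically. It exists for the same reason as circuitBreakerForceOpen.
- Lines 12 to 13: the open time (circuitOpened) is "empty" (-1), meaning the breaker is not open, so the method returns true.
- Lines 16 to 28: calls #isAfterSleepWindow() to check whether the sleep window before a trial call of the normal logic has elapsed. If it has, the state is changed with a CAS (OPEN => HALF_OPEN), which guarantees that exactly one thread gets to try the normal logic.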
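The "exactly one thread" guarantee comes from compareAndSet itself and can be observed in isolation. The following sketch is not Hystrix code: HalfOpenRace and countWinners are hypothetical names, and the thread pool and latch exist only to make several threads race for the same transition.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: many threads race for OPEN => HALF_OPEN, only one CAS can succeed.
final class HalfOpenRace {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    static int countWinners(int threads) {
        AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    winners.incrementAndGet(); // this thread would run the trial call
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(8));
    }
}
```

Every losing thread sees the state already at HALF_OPEN, so its CAS fails and, in the breaker, it would take the fallback path.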
The code of #isAfterSleepWindow() is as follows:
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}
- Returns true once the current time is later than the breaker's open time plus HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds (default: 5000 ms).
4.4 #markSuccess()
When the trial call of the normal logic succeeds, #markSuccess() is called to close the breaker. The code is as follows:
 1: @Override
 2: public void markSuccess() {
 3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
 4:         // Reset the statistics of the Hystrix Metrics request-statistics Observable
 5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 0
 6:         metrics.resetStream();
 7:         // Cancel the existing subscription
 8:         Subscription previousSubscription = activeSubscription.get();
 9:         if (previousSubscription != null) {
10:             previousSubscription.unsubscribe();
11:         }
12:         // Start a new subscription
13:         Subscription newSubscription = subscribeToStream();
14:         activeSubscription.set(newSubscription);
15:         // Set the breaker's open time back to empty
16:         circuitOpened.set(-1L);
17:     }
18: }
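- Line 3: changes the breaker's state with a CAS (HALF_OPEN => CLOSED).
- Line 6: resets the statistics of the Hystrix Metrics request-statistics Observable.
- Lines 8 to 14: cancels the existing subscription and starts a new one.
- Line 16: sets the breaker's open time back to "empty".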
#markSuccess() is called from the following two places:
- markEmits
- markOnCompleted
4.5 #markNonSuccess()
When the trial call of the normal logic fails, #markNonSuccess() is called to re-open the breaker. The code is as follows:
1: @Override
2: public void markNonSuccess() {
3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
5:         circuitOpened.set(System.currentTimeMillis());
6:     }
7: }
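- Line 3: changes the breaker's state with a CAS (HALF_OPEN => OPEN).
- Line 5: sets the breaker's open time to the current time, so that #attemptExecution() can try the normal logic again after another sleep window.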
#markNonSuccess() is called from the following two places:
- handleFallback
- unsubscribeCommandCleanup
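Putting #attemptExecution(), #markSuccess() and #markNonSuccess() together, the full cycle can be exercised with a stripped-down model. This is a sketch under assumptions, not the Hystrix implementation: MiniBreaker is a hypothetical class, the clock is injected so the example is deterministic, trip() stands in for the metrics subscription, and the force-open and force-closed properties are left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the state machine described in this section.
final class MiniBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1); // -1 means "not open"
    private final long sleepWindowMs;
    private final LongSupplier clock; // assumption: injected instead of System.currentTimeMillis()

    MiniBreaker(long sleepWindowMs, LongSupplier clock) {
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    /** Stand-in for the metrics subscription deciding that the failure rate is too high. */
    void trip() {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    boolean attemptExecution() {
        if (circuitOpened.get() == -1) {
            return true; // not open: run the normal logic
        }
        if (clock.getAsLong() > circuitOpened.get() + sleepWindowMs) {
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN); // one trial call only
        }
        return false;
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L);
        }
    }

    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong()); // restart the sleep window
        }
    }

    Status status() {
        return status.get();
    }

    public static void main(String[] args) {
        long[] now = {0L};
        MiniBreaker breaker = new MiniBreaker(5000, () -> now[0]);
        breaker.trip();
        now[0] = 5001;
        System.out.println(breaker.attemptExecution() + " " + breaker.status());
    }
}
```

A failed trial call restarts the sleep window from the moment of failure, so the next trial happens a full window later rather than immediately.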
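4.6 #allowRequest()
#allowRequest() and #attemptExecution() serve much the same purpose. The difference shows when the breaker is eligible for a trial call: the former does not change the breaker's state, while the latter does (OPEN => HALF_OPEN). See the source for the implementation.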
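The difference between a read-only check and a state-changing one can be illustrated with a small model. It is a sketch only: PeekVersusAttempt is a hypothetical class, the sleep-window result is fixed by a constructor flag instead of being derived from a clock, and the real method bodies may differ in detail.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model (not Hystrix code) contrasting a read-only check with a state-changing one.
final class PeekVersusAttempt {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    final AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
    private final boolean afterSleepWindow; // assumption: fixed here for simplicity

    PeekVersusAttempt(boolean afterSleepWindow) {
        this.afterSleepWindow = afterSleepWindow;
    }

    /** Read-only: reports whether a request would be let through; never changes the state. */
    boolean allowRequest() {
        Status current = status.get();
        if (current == Status.CLOSED) {
            return true;
        }
        if (current == Status.HALF_OPEN) {
            return false; // a trial call is already in flight
        }
        return afterSleepWindow;
    }

    /** State-changing: the caller that wins the CAS moves OPEN => HALF_OPEN and may run the trial call. */
    boolean attemptExecution() {
        if (status.get() == Status.CLOSED) {
            return true;
        }
        return afterSleepWindow && status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    public static void main(String[] args) {
        PeekVersusAttempt breaker = new PeekVersusAttempt(true);
        System.out.println(breaker.allowRequest() + " " + breaker.status.get());
        System.out.println(breaker.attemptExecution() + " " + breaker.status.get());
    }
}
```

Calling the read-only check any number of times leaves the state untouched, whereas only the first state-changing call after the sleep window succeeds.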
4.7 #isOpen()
The #isOpen() method is fairly simple; see the source for the implementation.