SpringCloud Hystrix原理分析
-
- 前置說明
- hystrix中用到的rxjava方法(會的請跳過)
- hystrix 執行過程原理分析
- hystrix熔斷器資料統計 --擴充知識
- 總結
前置說明
本文源碼基于
hystrix1.5.18
hystrix中用到的rxjava方法(會的請跳過)
首先rxjava是響應式流的程式設計模型, 所有的角色分為Observable(觀察對象), Observer(觀察者, Subscription(訂閱), Subject(又是觀察對象又是觀察者), 所有的操作分為來源操作, 中間操作, 訂閱操作
- hystrix使用的來源操作有, 例如
- defer - 這個操作就是用于延遲加載觀察對象, 尤其是在觀察對象來源于網絡時
- hystrix使用的中間操作 例如
- map/flatMap - 用來對每個資料對象進行轉換 ;
- doOnxxx - 這個操作用于在對應的狀态中進行資料觀察, 不能做出修改, 比如
- doOnNext, 那麼就是在onNext的狀态下擷取流經的資料
- window - 這是一個視窗函數, 有兩種用途, 一種根據次數進行視窗分組, 一種根據時間進行建立口分組, 那麼hystrix中将兩者結合實作滑動視窗的資料統計. 具體是将完成的事件按照時間分區(假設每100ms), 然後再按照每10個分組, 那麼就能繼續每秒的請求數量統計了
- share - 使得流成為共享流, 即每次訂閱都不會重新建立來源, 而是在原有的流上一起消費
- onBackpressureDrop - 被壓的一種政策, 丢棄政策
- startWith - 添加一些資料到作為流最開始發送的資料
- onErrorResumeNext - 在發送onError事件是, 進行流的替換
- subscribeOn - 切換發生訂閱時及其之後執行的所線上程
- hystrix使用的訂閱操作 這個沒啥可以介紹的, 就是觸發整個流程的, 進行最後的資料消費
舉例
@Test
public void test1(){
//建立一個信号發送器
Observable<String> map = Observable.defer(()-> //defer主要是延遲作用
Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
subscriber.onStart();
subscriber.onNext(1);
subscriber.onError(new RuntimeException());
}))
.startWith(0) //設定最開始的資料是0
.doOnError(e-> System.err.println("error:"+e.getLocalizedMessage())) //出現異常信号時列印
.onErrorResumeNext(Observable.just(1, 2, 3)) //發送異常時 重新發送1,2,3
.doOnNext(e->System.out.println("next:"+e)) // 每次next信号都列印一下
.map(Object::toString) //轉成字元串
.subscribe(System.out::println); //訂閱并列印
// subject 既可以作為可觀察者, 又可以作為觀察者
PublishSubject<String> objectPublishSubject = PublishSubject.create();
map.subscribe(objectPublishSubject);
objectPublishSubject.subscribe(System.out::println, e -> System.err.println(e.getLocalizedMessage()));
}
hystrix 執行過程原理分析
接下來開始分析hystrix源碼, 首先我們需要找到入口函數, 是以先看我給出的demo
public class HelloServiceCommand extends HystrixCommand<String> {
private RestTemplate restTemplate;
protected HelloServiceCommand(String commandGroupKey, RestTemplate restTemplate) {
super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
this.restTemplate = restTemplate;
}
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName());
return restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody() ;
}
@Override
protected String getFallback() {
return "error";
}
@Override
protected String getCacheKey() {
return "hello";
}
}
這個就是原生的hystrix的使用, 是以我們可以看到run就是我們本身要執行的可能會失敗或者逾時的業務代碼, 然後getFallback是降級處理, 之後我們調用
command.queue().get()
就能擷取到結果.
那麼我們快速進入源代碼中, 會省略很多無關的代碼
首先是AbstractCommand, 這裡最終要的就是
toObservable
方法, 我們會看到很多匿名内部類, 我們隻講重要的匿名類
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// 這個匿名内部類主要是在流 完成時的清掃工作, 以及發送完成的事件
final Action0 terminateCommandCleanup = xxx
// 這個匿名内部類主要是在流 取消時的清掃工作, 以及發送完成的事件
final Action0 unsubscribeCommandCleanup =xxx
// 這個匿名類相當重要, 裡面包含了熔斷邏輯判斷, 隔離邏輯判斷, 以及業務執行邏輯
final Func0<Observable<R>> applyHystrixSemantics =xxx
//... 跳過一大堆鈎子方法的代碼來到最後的return
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// ...省略無關代碼
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
if (requestCacheEnabled) { //是否開啟緩存
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
// 如果緩存命中 則直接傳回
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 沒有命中緩存則執行原來的hystrix處理鍊
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics) //這個匿名内部類上面有提到的是執行原有業務邏輯用的
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 放入緩存
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}
下面具體分析一下 applyHystrixSemantics
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// ...
// 判斷斷路器是否開啟
if (circuitBreaker.allowRequest()) {
//...
// 嘗試擷取信号量, 在信号量隔離模式下這裡的實作類是TryableSemaphoreActual, 否則是TryableSemaphoreNoOp
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd) //下面具體分析
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else { // 超過并發數 直接降級處理
return handleSemaphoreRejectionViaFallback();
}
} else { // 斷路器已經開啟 直接降價處理
return handleShortCircuitViaFallback();
}
}
// 繼續分析一下executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 這個匿名内部類 主要在下面的onErrorResumeNext, 也就是發送異常信号時傳回一個錯誤的新流
final Func1<Throwable, Observable<R>> handleFallback = xx
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) { //是否開啟逾時判斷
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd); //下面具體分析
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
// 執行隔離政策
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 是否線程池隔離模式
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
...
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //執行使用者的代碼
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
// ... 省略其他的doOnxxx
// 設定訂閱時的執行線程池, 同時也是實作線程艙壁隔離的地方, 這裡可以了解為把每次流的處理都會交給線程池執行.,如果線程池滿了,那麼就會觸發任務拒絕
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// ...
return getUserExecutionObservable(_cmd); //執行使用者代碼
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(run()); //這裡終于調用我們自己重寫的run了
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
executionThread.set(Thread.currentThread());
}
});
}
到此的話基本的執行流程就走完了, 其實所有的邏輯基本都在一個類裡面, 都是通過rxjava進行穿起來. 我基本跳過了所有不重要的分支, 直走主幹.
hystrix熔斷器資料統計 --擴充知識
首先要HystrixCircuitBreaker.isOpen()是用來判斷是否符合熔斷, AbstractCommand#handleCommandEnd()是用來發送完成的事件。 那麼hystrix 是如何接受事件然後生成資料給熔斷器進行判斷呢?
這裡直接給出流的源頭和流經的地方
根據圖的流向, 我們可以判斷出HystrixCircuitBreaker.isOpen()最終是通過BucketedCounterStream的counterSubject來擷取滑動視窗的技術, 而AbstractCommand#handleCommandEnd()是發送給HystrixCommandCompletionStream的writeOnlySubject來完成事件的通知
最後給出一個基于rxjava的實作滑動視窗計算的demo, 了解這個demo之後, 再結合上面的資料流轉就可以明白hystrix計數對的原理了。
@Test
public void testWindow(){
Flux.interval(Duration.ofMillis(300))
.window(Duration.ofMillis(500L))
.flatMap(e -> e.count()) //計算每個500ms内的數量
.window(2)
.flatMap(e -> e.reduce((a, b) -> a + b)) 、、計算每2個視窗内的數量
.subscribe(e -> System.out.println(new Date()+":"+e));
}
總結
看完上面後, 下面是我梳理的hystrix 的執行流程, 看看是否和你的閱讀結果一緻
建立Command -> 調用execute方法 -> 判斷是否命中緩存 -> 判斷是否熔斷 -> 判斷是否超過隔離數量 -> 執行代碼 -> 是否執行逾時 -> 傳回執行結果/ 傳回降級結果
為啥hystrix官方不維護了?
首先是netflix自己是覺得功能比較完善了, 那麼沒必要繼續疊代什麼新的功能. 但是我發現hystrix給予rxjava1.x的版本, 實際上現在rxjava都已經3.x 了, 而各個版本之間的差異較大, 如果hystrix要繼續疊代, 那麼還需要将rxjava版本逐漸疊代上去, 這個就過于麻煩, 可能會有很多問題存在.