天天看點

SpringCloud Hystrix原理分析

SpringCloud Hystrix原理分析

    • 前置說明
    • hystrix中用到的rxjava方法(會的請跳過)
    • hystrix 執行過程原理分析
    • hystrix熔斷器資料統計 --擴充知識
    • 總結

前置說明

本文源碼基于

hystrix1.5.18

hystrix中用到的rxjava方法(會的請跳過)

首先rxjava是響應式流的程式設計模型, 所有的角色分為Observable(觀察對象), Observer(觀察者, Subscription(訂閱), Subject(又是觀察對象又是觀察者), 所有的操作分為來源操作, 中間操作, 訂閱操作

  • hystrix使用的來源操作有, 例如
    • defer - 這個操作就是用于延遲加載觀察對象, 尤其是在觀察對象來源于網絡時
  • hystrix使用的中間操作 例如
    • map/flatMap - 用來對每個資料對象進行轉換 ;
    • doOnxxx - 這個操作用于在對應的狀态中進行資料觀察, 不能做出修改, 比如
      • doOnNext, 那麼就是在onNext的狀态下擷取流經的資料
    • window - 這是一個視窗函數, 有兩種用途, 一種根據次數進行視窗分組, 一種根據時間進行建立口分組, 那麼hystrix中将兩者結合實作滑動視窗的資料統計. 具體是将完成的事件按照時間分區(假設每100ms), 然後再按照每10個分組, 那麼就能繼續每秒的請求數量統計了
    • share - 使得流成為共享流, 即每次訂閱都不會重新建立來源, 而是在原有的流上一起消費
    • onBackpressureDrop - 被壓的一種政策, 丢棄政策
    • startWith - 添加一些資料到作為流最開始發送的資料
    • onErrorResumeNext - 在發送onError事件是, 進行流的替換
    • subscribeOn - 切換發生訂閱時及其之後執行的所線上程
  • hystrix使用的訂閱操作 這個沒啥可以介紹的, 就是觸發整個流程的, 進行最後的資料消費

舉例

@Test
    public void test1(){
      //建立一個信号發送器
     Observable<String> map = Observable.defer(()-> //defer主要是延遲作用
                Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
            subscriber.onStart();
            subscriber.onNext(1);
            subscriber.onError(new RuntimeException());

        }))
                .startWith(0) //設定最開始的資料是0
                .doOnError(e-> System.err.println("error:"+e.getLocalizedMessage())) //出現異常信号時列印
                .onErrorResumeNext(Observable.just(1, 2, 3)) //發送異常時 重新發送1,2,3
                .doOnNext(e->System.out.println("next:"+e)) // 每次next信号都列印一下
                .map(Object::toString) //轉成字元串
                .subscribe(System.out::println); //訂閱并列印

 
 // subject 既可以作為可觀察者, 又可以作為觀察者
        PublishSubject<String> objectPublishSubject = PublishSubject.create();
        map.subscribe(objectPublishSubject);
        objectPublishSubject.subscribe(System.out::println, e -> System.err.println(e.getLocalizedMessage()));

    }
           

hystrix 執行過程原理分析

接下來開始分析hystrix源碼, 首先我們需要找到入口函數, 是以先看我給出的demo

public class HelloServiceCommand extends HystrixCommand<String> {

    private RestTemplate restTemplate;

    protected HelloServiceCommand(String commandGroupKey, RestTemplate restTemplate) {
        super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
        this.restTemplate = restTemplate;
    }

    @Override
    protected String run() throws Exception {
        System.out.println(Thread.currentThread().getName());
        return restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody()  ;
    }

    @Override
    protected String getFallback() {
        return "error";
    }

    @Override
    protected String getCacheKey() {
        return "hello";
    }
}
           

這個就是原生的hystrix的使用, 是以我們可以看到run就是我們本身要執行的可能會失敗或者逾時的業務代碼, 然後getFallback是降級處理, 之後我們調用

command.queue().get()

就能擷取到結果.

那麼我們快速進入源代碼中, 會省略很多無關的代碼

首先是AbstractCommand, 這裡最終要的就是

toObservable

方法, 我們會看到很多匿名内部類, 我們隻講重要的匿名類

public Observable<R> toObservable() {
     	final AbstractCommand<R> _cmd = this;
	// 這個匿名内部類主要是在流 完成時的清掃工作, 以及發送完成的事件
	 	final Action0 terminateCommandCleanup = xxx
	// 這個匿名内部類主要是在流 取消時的清掃工作, 以及發送完成的事件	
		final Action0 unsubscribeCommandCleanup =xxx
	// 這個匿名類相當重要, 裡面包含了熔斷邏輯判斷, 隔離邏輯判斷, 以及業務執行邏輯
		final Func0<Observable<R>> applyHystrixSemantics =xxx
	//... 跳過一大堆鈎子方法的代碼來到最後的return
		return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ...省略無關代碼
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();
                if (requestCacheEnabled) { //是否開啟緩存
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        // 如果緩存命中 則直接傳回
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                	
                // 沒有命中緩存則執行原來的hystrix處理鍊
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics) //這個匿名内部類上面有提到的是執行原有業務邏輯用的
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放入緩存
                if (requestCacheEnabled && cacheKey != null) {
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {  
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache 
                        .doOnTerminate(terminateCommandCleanup)   
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
	}
           

下面具體分析一下 applyHystrixSemantics

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

	private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // ...
        // 判斷斷路器是否開啟
        if (circuitBreaker.allowRequest()) {
            //...
            // 嘗試擷取信号量, 在信号量隔離模式下這裡的實作類是TryableSemaphoreActual, 否則是TryableSemaphoreNoOp
            if (executionSemaphore.tryAcquire()) {
                try {
              
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd) //下面具體分析
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else { // 超過并發數 直接降級處理
                return handleSemaphoreRejectionViaFallback();
            }
        } else { // 斷路器已經開啟 直接降價處理
            return handleShortCircuitViaFallback();
        }
    }

	// 繼續分析一下executeCommandAndObserve
 	private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

		// 這個匿名内部類 主要在下面的onErrorResumeNext, 也就是發送異常信号時傳回一個錯誤的新流
		final Func1<Throwable, Observable<R>> handleFallback = xx

		 Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) { //是否開啟逾時判斷
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd); //下面具體分析
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);

	}
	
	// 執行隔離政策
 	private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
	 // 是否線程池隔離模式
 		if (properties.executionIsolationStrategy().get() == 		ExecutionIsolationStrategy.THREAD) {
 	 		return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    ...
                  
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd); //執行使用者的代碼
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
				// ... 省略其他的doOnxxx
				 // 設定訂閱時的執行線程池, 同時也是實作線程艙壁隔離的地方, 這裡可以了解為把每次流的處理都會交給線程池執行.,如果線程池滿了,那麼就會觸發任務拒絕
                 }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
		}} else {
		 return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                  // ...
                        return getUserExecutionObservable(_cmd); //執行使用者代碼
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            });
        }

	}

   private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;
        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            userObservable = Observable.error(ex);
        }
        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }


 	final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run()); //這裡終于調用我們自己重寫的run了
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                executionThread.set(Thread.currentThread());
            }
        });
    }
           

到此的話基本的執行流程就走完了, 其實所有的邏輯基本都在一個類裡面, 都是通過rxjava進行穿起來. 我基本跳過了所有不重要的分支, 直走主幹.

hystrix熔斷器資料統計 --擴充知識

首先要HystrixCircuitBreaker.isOpen()是用來判斷是否符合熔斷, AbstractCommand#handleCommandEnd()是用來發送完成的事件。 那麼hystrix 是如何接受事件然後生成資料給熔斷器進行判斷呢?

這裡直接給出流的源頭和流經的地方

SpringCloud Hystrix原理分析

根據圖的流向, 我們可以判斷出HystrixCircuitBreaker.isOpen()最終是通過BucketedCounterStream的counterSubject來擷取滑動視窗的技術, 而AbstractCommand#handleCommandEnd()是發送給HystrixCommandCompletionStream的writeOnlySubject來完成事件的通知

最後給出一個基于rxjava的實作滑動視窗計算的demo, 了解這個demo之後, 再結合上面的資料流轉就可以明白hystrix計數對的原理了。

@Test
    public void testWindow(){
        Flux.interval(Duration.ofMillis(300))
                .window(Duration.ofMillis(500L))
                .flatMap(e -> e.count()) //計算每個500ms内的數量
                .window(2)
                .flatMap(e -> e.reduce((a, b) -> a + b)) 、、計算每2個視窗内的數量
                .subscribe(e -> System.out.println(new Date()+":"+e));
    }

           

總結

看完上面後, 下面是我梳理的hystrix 的執行流程, 看看是否和你的閱讀結果一緻

建立Command -> 調用execute方法 -> 判斷是否命中緩存 -> 判斷是否熔斷 -> 判斷是否超過隔離數量 -> 執行代碼 -> 是否執行逾時 -> 傳回執行結果/ 傳回降級結果

為啥hystrix官方不維護了?

首先是netflix自己是覺得功能比較完善了, 那麼沒必要繼續疊代什麼新的功能. 但是我發現hystrix給予rxjava1.x的版本, 實際上現在rxjava都已經3.x 了, 而各個版本之間的差異較大, 如果hystrix要繼續疊代, 那麼還需要将rxjava版本逐漸疊代上去, 這個就過于麻煩, 可能會有很多問題存在.

繼續閱讀