天天看點

【你好Hystrix】八:Hystrix執行流程分析-toObservable前言HystrixObservabletoObservable總結

前言

我們這一節的内容就是對Hystrix的請求流程做一個分析,把前面的内容串聯起來。

HystrixObservable

該接口提供了兩個方法,這兩個方法大緻提供了一緻的功能,都是傳回一個

Observable

對象。

//提供饑餓模式的Observable 立馬回執行 HystrixCommand#queue()/execute()指令
 Observable<R> observe();
//提供lazy/defer延遲模式的Observable執行個體,隻有在訂閱了Observable之後,才惰性的開始執行指令
//所有執行方法的基石 一切都源于它
 Observable<R> toObservable();
           

toObservable

AbstractCommand

繼承

HystrixObservable

是以自然也就實作了

HystrixObservable

的兩個方法。

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
 //   ...由于代碼太長是以省略了變量的定義.....
    //下面這個Func0表達式是整個執行的核心 這裡隻是定義
    final Func0<Observable<R>> applyHystrixSemantics = () -> {
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        return applyHystrixSemantics(_cmd);
    };
   
   //構造一個Observable
    return Observable.defer(() -> {
        /**
         * commandState 初始狀态是NOT_STARTED 将commandState改成OBSERVABLE_CHAIN_CREATED 設定失敗就抛出異常
         */
        if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
            IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
            throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
        }
        //記錄指令開始執行時間戳
        commandStartTimestamp = System.currentTimeMillis();
        if (properties.requestLogEnabled().get()) {
            if (currentRequestLog != null) {
                currentRequestLog.addExecutedCommand(_cmd);
            }
        }
        //可以通過hystrix.command.default.requestCache.enabled來配置請求緩存
        //這個可以對請求結果按照一定的政策緩存 具體的邏輯可以在子類中定義
        final boolean requestCacheEnabled = isRequestCachingEnabled();
        final String cacheKey = getCacheKey();
        //如果開啟緩存(開啟的條件不僅要配置 而且子類要重寫getCacheKey方法)
        if (requestCacheEnabled) {
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
            //如果緩存不為空就傳回緩存的值
            if (fromCache != null) {
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            }
        }
        //下面這個表達式是Hystrix執行的核心的 
        //applyHystrixSemantics在前面定義的一個Func0(傳回一個Observable) 
        //wrapWithAllOnNextHooks 執行鈎子方法 onComplete 和 onEmit 這些方法是在
        //HystrixCommandExecutionHook中定義的。
        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);
        //到這裡 hystrixObservable就是包裝之後的流對象了 下面的邏輯就是對
        //hystrixObservable進行包裝緩存 為了下次命中緩存直接傳回
        Observable<R> afterCache;
        if (requestCacheEnabled && cacheKey != null) {
        //這裡就會通過 HystrixCachedObservable 來包裝結果 包裝成一個
        //HystrixCommandResponseFromCache 這部分内容我們在 緩存的地方有介紹
            HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
            if (fromCache != null) {
                toCache.unsubscribe();
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            } else {
                afterCache = toCache.toObservable();
            }
        } else {
            afterCache = hystrixObservable;
        }
        //這裡的doOnTerminate、doOnUnsubscribe、doOnCompleted在前面都有定義 
        //主要是對各種狀态的更改 以及鈎子方法的應用 主要的邏輯還是在applyHystrixSemantics中
        //到這裡的Observable已經是構造好了的對象
        return afterCache
                .doOnTerminate(terminateCommandCleanup) 
                .doOnUnsubscribe(unsubscribeCommandCleanup)
                .doOnCompleted(fireOnCompletedHook);
    });
}
           

上面的方法的核心邏輯在

applyHystrixSemantics

方法裡面。

applyHystrixSemantics

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        //執行指令開始執行的鈎子方法 可能有人會問 前面綁定了那麼多的鈎子方法 這裡怎麼才開始
        //start 因為前面綁定但是并沒有執行。 當有訂閱者訂閱 這裡才是開始執行的代碼邏輯
        executionHook.onStart(_cmd);
        /**
         * 嘗試執行 如果斷路器是打開的狀态 通過這個操作可以将其變為半開的狀态
         * 如果斷路器打開 這裡直接進入else  進入 fallback
         */
        if (circuitBreaker.attemptExecution()) {
            /**
             * 如果是信号量隔離  傳回TryableSemaphoreActual 根據設定的并發量來判斷是否
             * 能執行 如果不能執行 進入fallback
             * 如果是線程池隔離 傳回TryableSemaphoreNoOp  直接傳回true沒有任何操作
             * 
             */
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            //如果是線程池隔離這裡永遠是 true
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    /**
                     * executeCommandAndObserve 方法是核心方法
                     */
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                //信号量執行的時候并發太大直接回退
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            //熔斷開啟直接回退
            return handleShortCircuitViaFallback();
        }
    }
           

繼續跟進到

executeCommandAndObserve

方法中:

executeCommandAndObserve

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();


        //主要是來對HystrixCommand和HystrixObservableCommand記錄的事件是不同的
        final Action1<R> markEmits = new Action1<R>() {
            @Override
            public void call(R r) {
            //HystrixCommand傳回的是false 是以不會記錄onNext産生的事件流 
            //HystrixObservableCommand則相反
                if (shouldOutputOnNextEvents()) {
                    executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                    eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                }
             //HystrixCommand 對commandIsScalar的實作傳回的是true  
             //是以在執行完onNext之後 斷路器就會記錄成功的狀态 發送成功的事件流
             //而HystrixObservableCommand則相反 因為onNext方法執行完成之後 
             //需要等到onComplete才會記錄成功狀态 發送成功事件流  
                if (commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        //這個變量就是 onComplete需要執行的 是以這裡的判斷和markEmits是相反的 主要是為了
        //相容HystrixObservableCommand和HystrixCommand對成功行為的判斷标準
        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

       //執行失敗的邏輯定義 
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                //熔斷器标記失敗
                circuitBreaker.markNonSuccess();
                //根據異常類型來進入具體的fallback邏輯
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };

//這個操作也是為了父子線程公用一份HystrixRequestContext
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        Observable<R> execution;
        /**
         * 如果逾時開啟  使用HystrixObservableTimeoutOperator來對Observable做逾時處理
         * 是以不管是信号量隔離還是線程池隔離都會走該邏輯進行逾時控制
         */
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
           

上面的方法主要是定義執行成功的處理邏輯和執行失敗的處理邏輯以及逾時的處理邏輯。

總結

大緻的流程主要是上面所述的三個方法

toObservable

applyHystrixSemantics

executeCommandAndObserve

。在分析源碼的時候緊靠着三個方法就能把整個流程理順,并且能把之前的知識點聯系起來。

【你好Hystrix】八:Hystrix執行流程分析-toObservable前言HystrixObservabletoObservable總結