
[Spring Cloud Technical Series] Hystrix Technical Analysis: How Failover Works (with Source Code)

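Background

For some non-core operations, such as saving a log record and sending an asynchronous message after an inventory increase or decrease (the exact steps depend on the business flow), an MQ outage can cause the interface to time out. It is therefore worth introducing service degradation and service isolation for these non-core operations.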

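About Hystrix

Official documentation

Hystrix is a fault-tolerance framework open-sourced by Netflix. It addresses the problem of a failing external dependency dragging down the business system, or even triggering a cascading (avalanche) failure.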

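Why is Hystrix needed?

  • Medium and large distributed systems usually have many dependencies (HTTP, Hessian, Netty, Dubbo, and so on). Under highly concurrent access, the stability of these dependencies has a large effect on the system, yet dependencies have many uncontrollable problems: slow network connections, busy resources, temporary unavailability, services going offline, and so on.
  • When a dependency blocks, the threads in most server thread pools end up blocked (BLOCK) as well, which affects the stability of the whole online service. An application in a complex distributed architecture has many dependencies, and each of them will inevitably fail at some point. If highly concurrent dependency calls fail without any isolation, the current application service is at risk of being dragged down.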
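For example, take a system that depends on 30 SOA services, each with 99.99% availability.

99.99% to the 30th power ≈ 99.7%

A 0.3% failure rate means that 100 million requests produce about 300,000 failures.

In terms of time, that is roughly 2 hours of service instability per month.

As the number of service dependencies grows, overall availability decays exponentially, so the probability of instability climbs quickly.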
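The arithmetic above can be reproduced with a few lines of Java. This is only a back-of-the-envelope sketch: it assumes the 30 dependencies fail independently and that a month has 720 hours, and the class and method names are made up for illustration.

```java
/** Back-of-the-envelope availability arithmetic for a chain of dependencies. */
final class CompositeAvailability {

    /** Probability that all calls succeed, assuming the dependencies fail independently. */
    static double composite(double perServiceAvailability, int dependencies) {
        return Math.pow(perServiceAvailability, dependencies);
    }

    /** Expected number of failed requests out of the given request count. */
    static long expectedFailures(double availability, long requests) {
        return Math.round((1.0 - availability) * requests);
    }

    /** Expected unstable hours in a period of the given length. */
    static double unstableHours(double availability, double hoursInPeriod) {
        return (1.0 - availability) * hoursInPeriod;
    }

    public static void main(String[] args) {
        double availability = composite(0.9999, 30);
        System.out.printf("composite availability = %.4f%%%n", availability * 100);
        System.out.println("failures per 100M requests = "
                + expectedFailures(availability, 100_000_000L));
        System.out.printf("unstable hours per 30-day month = %.2f%n",
                unstableHours(availability, 720));
    }
}
```

The failure count comes out just under 300,000 and the monthly instability slightly above 2 hours, which matches the rounded figures in the text.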

Solution: isolate the dependencies.

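Hystrix design philosophy

To know how to use Hystrix, you first need to understand its core design idea. Hystrix is based on the command pattern; a UML diagram gives an intuitive first look at this design pattern.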
[Figure: UML diagram of the command pattern]
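  • As can be seen, Command is an intermediate layer added between Receiver and Invoker; Command encapsulates the Receiver.
  • An API can be either an Invoker or a Receiver. Wrapping such APIs (for example remote interface calls, database queries, and other operations that may introduce latency) in a subclass of the core Hystrix class HystrixCommand gives them resilience protection.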
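To make the roles concrete, here is a minimal command-pattern sketch in plain Java. It does not use the Hystrix library: SimpleCommand, InventoryClient, and QueryStockCommand are hypothetical names, and the real HystrixCommand adds isolation, timeouts, and circuit breaking on top of this basic shape.

```java
/** Receiver: the API that actually does the work (hypothetical example). */
final class InventoryClient {
    String queryStock(String sku) {
        // An empty SKU simulates a failing remote call.
        if (sku.isEmpty()) {
            throw new IllegalStateException("remote call failed");
        }
        return "stock-of-" + sku;
    }
}

/** Command: wraps one call to the receiver together with its degraded alternative. */
abstract class SimpleCommand<R> {
    protected abstract R run() throws Exception;

    protected abstract R getFallback();

    /** Entry point used by the invoker: try run(), fall back on any failure. */
    public final R execute() {
        try {
            return run();
        } catch (Exception e) {
            return getFallback();
        }
    }
}

/** Concrete command that encapsulates a single receiver call. */
final class QueryStockCommand extends SimpleCommand<String> {
    private final InventoryClient client;
    private final String sku;

    QueryStockCommand(InventoryClient client, String sku) {
        this.client = client;
        this.sku = sku;
    }

    @Override
    protected String run() {
        return client.queryStock(sku);
    }

    @Override
    protected String getFallback() {
        return "stock-unknown";
    }
}
```

The invoker only ever calls execute(), so the protection logic can change without touching either the caller or the receiver.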

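How Hystrix isolates dependencies

  1. Hystrix uses the command pattern: HystrixCommand (the Command) wraps the dependency call logic, and each command executes in a separate thread or under a semaphore permit.
  2. The timeout for a dependency call is configurable; it is generally enough to set it slightly above the 99.5th-percentile response time. When a call times out, Hystrix returns immediately or executes the fallback logic.
  3. Each dependency gets its own small thread pool (or semaphore). If the thread pool is full, the call is rejected immediately; by default there is no queueing, which speeds up failure detection.
  4. A dependency call has one of these outcomes: success, failure (an exception is thrown), timeout, thread-pool rejection, or short-circuit. When a request fails (exception, rejection, timeout, or short-circuit), the fallback (degradation) logic runs.
  5. A circuit breaker component is provided. It can trip automatically or be forced manually, and it stops calls to the dependency for a period of time (the sleep window, 5 seconds by default). The default error-rate threshold is 50%; above it the breaker trips automatically.
  6. Near-real-time statistics and monitoring of dependencies are provided.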
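Points 2 and 3 of the list above can be sketched with the JDK alone. The class below is an illustration, not Hystrix code: IsolatedCall is a hypothetical name, and the sketch assumes a fixed-size pool with a SynchronousQueue so that a saturated pool rejects work immediately instead of queueing it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** One small dedicated pool per dependency, with a timeout and a fallback value. */
final class IsolatedCall {
    private final ExecutorService pool;
    private final long timeoutMillis;

    IsolatedCall(int poolSize, long timeoutMillis) {
        // SynchronousQueue holds no tasks: when every thread is busy, submit() is rejected at once.
        this.pool = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        this.timeoutMillis = timeoutMillis;
    }

    <R> R call(Callable<R> dependency, R fallback) {
        Future<R> future;
        try {
            future = pool.submit(dependency);
        } catch (RejectedExecutionException e) {
            return fallback; // pool saturated: fail fast
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so the thread is freed
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the dependency threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

The caller thread waits at most timeoutMillis, no matter how long the dependency itself hangs, which is the property the list describes.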

Hystrix execution flow

[Figure: Hystrix execution flow chart]

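Flow description:

  1. Each call constructs a HystrixCommand or HystrixObservableCommand object and wraps the dependency call in its run() method (construct() for HystrixObservableCommand).
  2. Check whether the result is cached. If it is not, execute()/queue() performs a synchronous or asynchronous call, which leads to the actual run()/construct().
  3. Check whether the circuit breaker is open. If it is open, go to the fallback logic in step 9; if it is closed, continue with step 4.
  4. Check whether the thread pool, queue, or semaphore is full. If it is, go to the fallback logic in step 9; otherwise continue.
  5. Run the dependency logic through HystrixObservableCommand.construct() or HystrixCommand.run().
  6. If the dependency call times out, go to step 9.
  7. Check whether the call succeeded.
    • 7a On success, return the result of the call.
    • 7b On error, go to step 9.
  8. Calculate circuit health: every outcome (success, failure, rejection, timeout) is reported to the circuit breaker, which uses these statistics to decide its state.
  9. Run the getFallback() degradation logic.

    a. A Command that does not implement getFallback throws an exception directly.

    b. If the fallback logic succeeds, its result is returned directly.

    c. If the fallback logic fails, an exception is thrown.

  10. Return the successful result.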

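The following four situations trigger a getFallback call:

  1. run() throws an exception other than HystrixBadRequestException.
  2. The run() call times out.
  3. The circuit breaker is open and short-circuits the call.
  4. The thread pool, queue, or semaphore is full.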
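The order of these checks can be sketched in a few lines of plain Java. This is a simplified illustration rather than Hystrix code: FlowSketch and BadRequestException are hypothetical stand-ins, the circuit state is passed in as a boolean, a Semaphore plays the role of the thread pool or semaphore limit, and the timeout case is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class FlowSketch {

    /** Hypothetical stand-in for HystrixBadRequestException: a caller error that never triggers fallback. */
    static final class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    static <R> R execute(boolean circuitOpen, Semaphore permits,
                         Callable<R> run, Supplier<R> fallback) {
        if (circuitOpen) {
            return fallback.get();   // short-circuited: run() is never invoked
        }
        if (!permits.tryAcquire()) {
            return fallback.get();   // concurrency limit reached: rejected
        }
        try {
            return run.call();       // the actual dependency logic
        } catch (BadRequestException e) {
            throw e;                 // propagated to the caller, no fallback
        } catch (Exception e) {
            return fallback.get();   // any other failure falls back
        } finally {
            permits.release();       // reached only after a successful tryAcquire()
        }
    }
}
```

Note that the bad-request case is checked before the generic catch; reversing the two would silently turn caller errors into fallbacks.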

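Circuit breaker

By default each circuit breaker maintains 10 buckets, one per second. Each bucket records the number of successes, failures, timeouts, and rejections. By default the breaker trips and short-circuits calls when the error rate exceeds 50% and more than 20 requests were made within the 10-second window.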
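A rolling window of this kind can be sketched as a ring of per-second counters. The class below is an illustration under stated assumptions, not the Hystrix implementation: RollingCircuitBreaker is a hypothetical name, time is passed in explicitly to keep the logic deterministic, and the sketch trips when the volume reaches 20 and the error rate reaches 50% (whether the boundary values themselves count is an assumption).

```java
import java.util.Arrays;

final class RollingCircuitBreaker {
    private static final int BUCKETS = 10;                  // one bucket per second
    private static final int REQUEST_VOLUME_THRESHOLD = 20; // minimum requests in the window
    private static final int ERROR_THRESHOLD_PERCENT = 50;  // error rate that trips the breaker

    private final long[] bucketSecond = new long[BUCKETS];  // which second each slot currently holds
    private final int[] total = new int[BUCKETS];
    private final int[] errors = new int[BUCKETS];

    RollingCircuitBreaker() {
        Arrays.fill(bucketSecond, -1L);
    }

    /** Records one call outcome at the given time. */
    synchronized void record(long nowMillis, boolean success) {
        long second = nowMillis / 1000;
        int slot = (int) (second % BUCKETS);
        if (bucketSecond[slot] != second) { // the slot holds an expired second: reuse it
            bucketSecond[slot] = second;
            total[slot] = 0;
            errors[slot] = 0;
        }
        total[slot]++;
        if (!success) {
            errors[slot]++;
        }
    }

    /** True when the last 10 seconds contain enough requests and too many errors. */
    synchronized boolean shouldTrip(long nowMillis) {
        long second = nowMillis / 1000;
        int sumTotal = 0;
        int sumErrors = 0;
        for (int i = 0; i < BUCKETS; i++) {
            boolean inWindow = bucketSecond[i] >= 0 && second - bucketSecond[i] < BUCKETS;
            if (inWindow) {
                sumTotal += total[i];
                sumErrors += errors[i];
            }
        }
        if (sumTotal < REQUEST_VOLUME_THRESHOLD) {
            return false; // too little traffic to judge
        }
        return sumErrors * 100 >= sumTotal * ERROR_THRESHOLD_PERCENT;
    }
}
```

The volume threshold matters as much as the percentage: without it, a single failed request on a quiet service would count as a 100% error rate.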

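Hystrix isolation analysis

Hystrix isolates dependencies with threads or semaphores, limiting each dependency's concurrency and stopping blocking from spreading.

Thread isolation

  • The thread that executes the dependency code is separate from the request thread (for example a Jetty thread), so the request thread is free to decide when to walk away (an asynchronous process).
  • The thread pool size controls concurrency; when the pool is saturated, calls are rejected early, which keeps the dependency's problems from spreading.
  • In production, avoid making thread pools too large; otherwise a large number of blocked threads may slow the server down.
A real-world case:
Netflix considers the overhead of thread isolation small enough that it does not cause significant cost or performance impact. Netflix's internal API serves 10 billion HystrixCommand dependency requests per day with thread isolation; each application has roughly 40 or more thread pools, each with about 5 to 20 threads.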

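Semaphore isolation

Semaphore isolation can also limit concurrent access and stop blocking from spreading. The biggest difference from thread isolation is that the dependency code still runs on the request thread (which must first acquire a permit from the semaphore). If the client is trusted and returns quickly, semaphore isolation can replace thread isolation to reduce overhead.

The semaphore size can be adjusted dynamically; the thread pool size cannot.

The figure below shows how thread isolation and semaphore isolation differ: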
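A minimal sketch of semaphore isolation using java.util.concurrent.Semaphore follows. SemaphoreIsolation is a hypothetical class, not part of Hystrix; the point it illustrates is that the dependency runs on the caller's own thread and that a missing permit leads to immediate rejection rather than waiting.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class SemaphoreIsolation {
    private final Semaphore permits;

    SemaphoreIsolation(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs the dependency on the calling thread, or returns the fallback when no permit is free. */
    <R> R call(Supplier<R> dependency, Supplier<R> fallback) {
        if (!permits.tryAcquire()) {   // non-blocking: reject at once
            return fallback.get();
        }
        try {
            return dependency.get();   // same thread as the caller, no hand-off
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```

Because no second thread is involved, a slow dependency cannot be abandoned by the caller; this is why the text recommends this mode only for trusted clients that return quickly.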

[Figure: comparison of thread isolation and semaphore isolation]

Fallback: the failover and degradation mechanism

Interested readers can consult the official reference documentation.

Source code analysis

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

executeCommandAndObserve

/**
     * This decorates "Hystrix" functionality around the run() Observable.
     * @return R
     */
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        //......
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable,
            Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain
                     * HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };
        //......
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }           

The Observable's onErrorResumeNext invokes handleFallback, which distinguishes between exception types and calls a different fallback handler for each.

  • RejectedExecutionException calls handleThreadPoolRejectionViaFallback
  • HystrixTimeoutException calls handleTimeoutViaFallback
  • Any other exception that is not a HystrixBadRequestException calls handleFailureViaFallback
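The dispatch order in handleFallback can be condensed into a small classifier. The sketch below is illustrative only: FailureClassifier, TimeoutSignal, and BadRequestSignal are hypothetical stand-ins for the Hystrix types, and the enum mirrors the outcomes named in the excerpt.

```java
import java.util.concurrent.RejectedExecutionException;

final class FailureClassifier {

    enum Outcome { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    /** Hypothetical stand-in for HystrixTimeoutException. */
    static final class TimeoutSignal extends RuntimeException { }

    /** Hypothetical stand-in for HystrixBadRequestException. */
    static final class BadRequestSignal extends RuntimeException { }

    /** Maps an error to an outcome, checking the specific types before the catch-all. */
    static Outcome classify(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Outcome.THREAD_POOL_REJECTED;
        }
        if (t instanceof TimeoutSignal) {
            return Outcome.TIMEOUT;
        }
        if (t instanceof BadRequestSignal) {
            return Outcome.BAD_REQUEST; // the error is emitted as-is, fallback is skipped
        }
        return Outcome.FAILURE;
    }

    /** Every outcome except a bad request goes through the fallback path. */
    static boolean triggersFallback(Outcome outcome) {
        return outcome != Outcome.BAD_REQUEST;
    }
}
```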

applyHystrixSemantics

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);
        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }           
  • applyHystrixSemantics calls handleSemaphoreRejectionViaFallback() when executionSemaphore.tryAcquire() fails.
  • applyHystrixSemantics calls handleShortCircuitViaFallback() when circuitBreaker.attemptExecution() returns false.
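One detail in applyHystrixSemantics is worth isolating: singleSemaphoreRelease is registered on both doOnTerminate and doOnUnsubscribe, so it can fire twice, and the AtomicBoolean guard ensures the permit is returned only once. A plain-Java sketch of that guard, with ReleaseOnce as a hypothetical name and java.util.concurrent.Semaphore standing in for TryableSemaphore:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Releases a semaphore permit at most once, however many times run() is called. */
final class ReleaseOnce implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        // Only the first caller flips false -> true and performs the release.
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```

Without the guard, a double release would inflate the permit count and quietly raise the concurrency limit over time.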
The ViaFallback methods
private Observable<R> handleSemaphoreRejectionViaFallback() {
        Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
        executionResult = executionResult.setExecutionException(semaphoreRejectionException);
        eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
        logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
        // retrieve a fallback or throw an exception if no fallback available
        return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
                "could not acquire a semaphore for execution", semaphoreRejectionException);
    }

    private Observable<R> handleShortCircuitViaFallback() {
        // record that we are returning a short-circuited fallback
        eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
        // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
        Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
        executionResult = executionResult.setExecutionException(shortCircuitException);
        try {
            return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                    "short-circuited", shortCircuitException);
        } catch (Exception e) {
            return Observable.error(e);
        }
    }

    private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
        eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
        threadPool.markThreadRejection();
        // use a fallback instead (or throw exception if not implemented)
        return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);
    }

    private Observable<R> handleTimeoutViaFallback() {
        return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
    }

    private Observable<R> handleFailureViaFallback(Exception underlying) {
        /**
         * All other error handling
         */
        logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);

        // report failure
        eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

        // record the exception
        executionResult = executionResult.setException(underlying);
        return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
    }           
  • handleSemaphoreRejectionViaFallback, handleShortCircuitViaFallback, handleThreadPoolRejectionViaFallback, handleTimeoutViaFallback, and handleFailureViaFallback all call getFallbackOrThrowException.
  • Their eventType values are SEMAPHORE_REJECTED, SHORT_CIRCUITED, THREAD_POOL_REJECTED, TIMEOUT, and FAILURE, respectively.
AbstractCommand.getFallbackOrThrowException
/**
     * Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions.
     * <p>
     * Fallback implementations shouldn't perform anything that can be blocking, but we protect against it anyways in case someone doesn't abide by the contract.
     * <p>
     * If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially
     * all threads to pile up and block.
     *
     * @return K
     * @throws UnsupportedOperationException
     *             if getFallback() not implemented
     * @throws HystrixRuntimeException
     *             if getFallback() fails (throws an Exception) or is rejected by the semaphore
     */
    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        // record the executionResult
        // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
        executionResult = executionResult.addEvent((int) latency, eventType);

        if (isUnrecoverable(originalException)) {
            logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);

            /* executionHook for all errors */
            Exception e = wrapWithOnErrorHook(failureType, originalException);
            return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
        } else {
            if (isRecoverableError(originalException)) {
                logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
            }

            if (properties.fallbackEnabled().get()) {
                /* fallback behavior is permitted so attempt */

                final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                    @Override
                    public void call(Notification<? super R> rNotification) {
                        setRequestContextIfNeeded(requestContext);
                    }
                };

                final Action1<R> markFallbackEmit = new Action1<R>() {
                    @Override
                    public void call(R r) {
                        if (shouldOutputOnNextEvents()) {
                            executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                        }
                    }
                };
                final Action0 markFallbackCompleted = new Action0() {
                    @Override
                    public void call() {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                        executionResult = executionResult.addEvent((int) latency, 
                        HystrixEventType.FALLBACK_SUCCESS);
                    }
                };
                final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                    @Override
                    public Observable<R> call(Throwable t) {
                        /* executionHook for all errors */
                        Exception e = wrapWithOnErrorHook(failureType, originalException);
                        Exception fe = getExceptionFromThrowable(t);

                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        Exception toEmit;

                        if (fe instanceof UnsupportedOperationException) {
                            logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
                        } else {
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
                        }

                        // NOTE: we're suppressing fallback exception here
                        if (shouldNotBeWrapped(originalException)) {
                            return Observable.error(e);
                        }

                        return Observable.error(toEmit);
                    }
                };

                final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            fallbackSemaphore.release();
                        }
                    }
                };

                Observable<R> fallbackExecutionChain;

                // acquire a permit
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }

                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                   return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }           
  • onErrorResumeNext on fallbackExecutionChain invokes handleFallbackError.
  • doOnCompleted on fallbackExecutionChain invokes markFallbackCompleted.
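The two callbacks correspond to three possible fallback outcomes in the excerpt: FALLBACK_SUCCESS on completion, FALLBACK_MISSING when the error is an UnsupportedOperationException, and FALLBACK_FAILURE for any other error. The sketch below condenses that decision into synchronous plain Java; FallbackOutcome is a hypothetical class, and it leaves out the semaphore rejection and disabled-fallback branches.

```java
import java.util.function.Supplier;

final class FallbackOutcome {

    enum Event { FALLBACK_SUCCESS, FALLBACK_MISSING, FALLBACK_FAILURE }

    /** Runs the fallback and reports which event its result maps to. */
    static Event run(Supplier<?> fallback) {
        try {
            fallback.get();
            return Event.FALLBACK_SUCCESS;
        } catch (UnsupportedOperationException e) {
            return Event.FALLBACK_MISSING; // no fallback was implemented
        } catch (RuntimeException e) {
            return Event.FALLBACK_FAILURE; // the fallback itself failed
        }
    }
}
```

In the real method both error cases end in a HystrixRuntimeException for the caller; the distinction mainly affects which event is recorded in the metrics.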
AbstractCommand.getFallbackSemaphore
/**
     * Get the TryableSemaphore this HystrixCommand should use if a fallback occurs.
     * 
     * @return TryableSemaphore
     */
    protected TryableSemaphore getFallbackSemaphore() {
        if (fallbackSemaphoreOverride == null) {
            TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // we didn't find one cache so setup
                fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));
                // assign whatever got set (this or another thread)
                return fallbackSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return fallbackSemaphoreOverride;
        }
    }           
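getFallbackSemaphore uses a look-up, putIfAbsent, look-up-again sequence so that concurrent callers for the same command key end up sharing one semaphore. A stand-alone sketch of that caching pattern, assuming a ConcurrentHashMap and using the hypothetical name PerKeySemaphores with java.util.concurrent.Semaphore in place of TryableSemaphore:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

final class PerKeySemaphores {
    private final ConcurrentHashMap<String, Semaphore> cache = new ConcurrentHashMap<>();

    /** Returns the shared semaphore for a key, creating it on first use. */
    Semaphore forKey(String key, int permits) {
        Semaphore existing = cache.get(key);
        if (existing != null) {
            return existing;
        }
        // If another thread wins the race, its semaphore stays in the map and ours is discarded.
        cache.putIfAbsent(key, new Semaphore(permits));
        // Return whatever ended up in the map (ours or the other thread's).
        return cache.get(key);
    }
}
```

The permit count passed by a losing thread is ignored, so all callers for one key should agree on the limit.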
Summary of the fallback source analysis
  • SEMAPHORE_REJECTED corresponds to handleSemaphoreRejectionViaFallback
  • SHORT_CIRCUITED corresponds to handleShortCircuitViaFallback
  • THREAD_POOL_REJECTED corresponds to handleThreadPoolRejectionViaFallback
  • TIMEOUT corresponds to handleTimeoutViaFallback
  • FAILURE corresponds to handleFailureViaFallback
  • All of these methods end by calling getFallbackOrThrowException.
