天天看點

從源碼看hystrix的工作原理

Hystrix是Netflix開源的一個限流熔斷的項目、主要有以下功能:

  • 隔離(線程池隔離和信号量隔離):限制調用分布式服務的資源使用,某一個調用的服務出現問題不會影響其他服務調用。
  • 優雅的降級機制:逾時降級、資源不足時(線程或信号量)降級,降級後可以配合降級接口傳回托底資料。
  • 融斷:當失敗率達到閥值自動觸發降級(如因網絡故障/逾時造成的失敗率高),熔斷器觸發的快速失敗會進行快速恢複。
  • 緩存:提供了請求緩存、請求合并實作。支援實時監控、報警、控制(修改配置)

下面是他的工作流程:

從源碼看hystrix的工作原理

Hystrix主要有4種調用方式:

  • toObservable() 方法 :未做訂閱,隻是傳回一個Observable 。
  • observe() 方法 :調用 #toObservable() 方法,并向 Observable 注冊 rx.subjects.ReplaySubject 發起訂閱。
  • queue() 方法 :調用 #toObservable() 方法的基礎上,調用:Observable#toBlocking() 和 BlockingObservable#toFuture() 傳回 Future 對象
  • execute() 方法 :調用 #queue() 方法的基礎上,調用 Future#get() 方法,同步傳回 #run() 的執行結果。

主要的執行邏輯:

1.每次調用建立一個新的HystrixCommand,把依賴調用封裝在run()方法中.

2.執行execute()/queue做同步或異步調用.

3.判斷熔斷器(circuit-breaker)是否打開,如果打開跳到步驟8,進行降級政策,如果關閉進入步驟.

4.判斷線程池/隊列/信号量是否跑滿,如果跑滿進入降級步驟8,否則繼續後續步驟.

5.調用HystrixCommand的run方法.運作依賴邏輯

依賴邏輯調用逾時,進入步驟8.

6.判斷邏輯是否調用成功。傳回成功調用結果;調用出錯,進入步驟8.

7.計算熔斷器狀态,所有的運作狀态(成功, 失敗, 拒絕,逾時)上報給熔斷器,用于統計進而判斷熔斷器狀态.

8.getFallback()降級邏輯。以下四種情況将觸發getFallback調用:

  • run()方法抛出非HystrixBadRequestException異常。
  • run()方法調用逾時
  • 熔斷器開啟攔截調用
  • 線程池/隊列/信号量是否跑滿
  • 沒有實作getFallback的Command将直接抛出異常,fallback降級邏輯調用成功直接傳回,降級邏輯調用失敗抛出異常.

9.傳回執行成功結果

下面結合源碼看一下:

在execute中調用queue的get方法,同步擷取傳回結果

public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
}
           

在queue中是調用toObservable()方法

public Future<R> queue() {
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        //省略一些無關的代碼
};
           

在toObservale中先判斷是否在緩存中有,如果有的話則從緩存中取,沒有的話則進入applyHystrixSemantics

public Observable<R> toObservable() {
        //省略....
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                    //TODO make a new error type for this
                    throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                }

                commandStartTimestamp = System.currentTimeMillis();

                if (properties.requestLogEnabled().get()) {
                    // log this command execution regardless of what happened
                    if (currentRequestLog != null) {
                        currentRequestLog.addExecutedCommand(_cmd);
                    }
                }

                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();
                //先從緩存中擷取如果有的話直接傳回
                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    //裡面訂閱了,是以開始執行hystrixObservable
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
           

在applyHystrixSemantics中先判斷是否開啟熔斷器,如果開啟直接走失敗邏輯,接着嘗試擷取信号量執行個體,如果擷取不到,則也失敗(hystrix的隔離模式有信号量和線程池2種,這裡如果沒開啟信号量模式,這裡的TryableSemaphore tryAcquire預設傳回的都是true),接着在return 中進入executeCommandAndObserve

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        //判斷是否開啟熔斷器
        if (circuitBreaker.attemptExecution()) {
            //擷取信号量執行個體
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            //singleSemaphoreRelease可能在doOnTerminate、doOnUnsubscribe中重複調用
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            //嘗試擷取信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                //拒絕邏輯
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            //直接走失敗的邏輯
            return handleShortCircuitViaFallback();
        }
    }
           

在executeCommandAndObserve中進行一些逾時,及失敗的邏輯處理之後,進入executeCommandWithSpecifiedIsolation

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        //省略...
        //失敗處理邏輯
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };
        //判斷是否開啟逾時設定
        if (properties.executionTimeoutEnabled().get()) {
           //list添加逾時操作
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
           

在executeCommandWithSpecifiedIsolation,先判斷是否進行線程隔離,及一些狀态變化之後,進入getUserExecutionObservable

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        //線程隔離
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            //當觀察者訂閱時,才建立Observable,并且針對每個觀察者建立都是一個新的Observable。Observable是傳回的
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    //狀态校驗
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    //統計标記指令
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        /**
                         * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                         */
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.empty();
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                    }
                    //if it was unsubscribed, then other cleanup handled it
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        //if it was never started and was cancelled, then no need to clean up
                    }
                    //if it was terminal, then other cleanup handled it
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // semaphore isolated
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (Throwable ex) {
                        //If the above hooks throw, then use that as the result of the run method
                        return Observable.error(ex);
                    }
                }
            });
        }
    }
           

在getUserExecutionObservable和getExecutionObservable中,主要是封裝使用者定義的run方法

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            //擷取使用者定義邏輯的observable
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }

final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    //包裝定義的run方法
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }
           

繼續閱讀