天天看點

SpringCloud - Hystrix的執行流程(上)0 Hystrix執行原理圖1 建立HystrixCommand/HystrixObservableCommand2 調用command的執行方法

0 Hystrix執行原理圖

SpringCloud - Hystrix的執行流程(上)0 Hystrix執行原理圖1 建立HystrixCommand/HystrixObservableCommand2 調用command的執行方法

1 建立HystrixCommand/HystrixObservableCommand

一個HystrixCommand或HystrixObservableCommand對象,代表對某個依賴服務發起的一次請求或者調用

構造的時候,可在構造器中傳入任何需要的參數。

HystrixCommand僅傳回一個結果的調用。

HystrixObservableCommand可能會傳回多條結果的調用。

SpringCloud - Hystrix的執行流程(上)0 Hystrix執行原理圖1 建立HystrixCommand/HystrixObservableCommand2 調用command的執行方法

直接繼承HystrixCommand并實作run方法即可。

public class GetUserAccountCommand extends HystrixCommand<UserAccount> {
    ...
    @Override
    protected UserAccount run() {
        /* 模拟執行網絡調用以檢索使用者資訊 */
        try {
            Thread.sleep((int) (Math.random() * 10) + 2);
        } catch (InterruptedException e) {
        }

        /* 5%的時間失敗來說明fallback的工作原理 */
        if (Math.random() > 0.95) {
            throw new RuntimeException("random failure processing UserAccount network response");
        }

        /* 延遲會增加5%的時間,是以有時會觸發逾時 */
        if (Math.random() > 0.95) {
            // 随機等待時間尖峰
            try {
                Thread.sleep((int) (Math.random() * 300) + 25);
            } catch (InterruptedException e) {
            }
        }

        /* 成功...使用遠端服務響應的資料建立UserAccount */
        return new UserAccount(86975, "John James", 2, true, false, true);
    }
    ...
}
      

2 調用command的執行方法

執行Command就可以發起一次對依賴服務的調用

要執行Command,需要在4個方法中選擇其中的一個

  • 前兩種是HystrixCommand獨有的哦
SpringCloud - Hystrix的執行流程(上)0 Hystrix執行原理圖1 建立HystrixCommand/HystrixObservableCommand2 調用command的執行方法

2.1 execute()

/**
     * 用于同步執行 command.
     * 
     * @return R
     *         如果command由于任何原因失敗,則執行 #run 或從 #getFallback() fallback的結果.
     *
     * @throws HystrixRuntimeException
     *             如果發生故障并且無法檢索fallback
     * @throws HystrixBadRequestException
     *             如果使用了無效的參數或狀态來表示使用者故障,而不是系統故障
     *
     * @throws IllegalStateException
     *             如果多次調用
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }      

調用後直接阻塞,直到依賴服務傳回單條結果,或抛異常

2.2 queue()

/**
     * 用于異步執行指令 command.
     *
     * 這将使該command線上程池上排隊,并在完成後傳回一個 Future 以擷取結果.
     * 注意:如果配置為不在單獨的線程中運作,則其效果與 #execute() 相同,并會阻塞.
     * 不會抛出異常,而隻是切換為同步執行,是以無需更改代碼即可 将command從運作在單獨的線程切換到調用線程.
     * (switch a command from running on a separate thread to the calling thread.)
     * 
     * @return {@code Future <R>}執行 #run() 的結果,或者如果command由于任何原因失敗,則傳回 #getFallback() 的結果.
     * @throws HystrixRuntimeException
     *             如果不存在fallback
     *             如果通過 ExecutionException#getCause() 中的{@code Future.get(), 如果不存在失敗發生的話
     *             或者如果無法将指令排隊(如,短路,線程池/信号被拒絕),則立即傳回
     * @throws HystrixBadRequestException
     *         通過 ExecutionException#getCause() 中的 Future.get() 如果使用了無效的參數或狀态來表示使用者故障而不是系統故障
     * @throws IllegalStateException
     *             如果多次調用
     */
    public Future<R> queue() {
        /*
         * 當Future.cancel(boolean)的“ mayInterrupt”标志設為true時
         * 由Observable.toBlocking().toFuture() 傳回的Future不實作執行線程的中斷
         * 是以,為了遵守Future的約定,我們必須圍繞它.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     * 這裡唯一有效的轉換是false -> true.
                     * 如果存在由該指令建立(這很奇怪,但從未禁止過)的兩個futures,例如f1和f2,
                     * 并且對f1.cancel(true)和f2.cancel(false)的調用是由不同的線程發起,
                     * 尚不清楚在檢查mayInterruptOnCancel時将使用什麼值.
                     * 處理這種情況的最一緻的方法是說,如果在中斷的情況下調用了任何cancellation,則無法撤回該中斷請求.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
            
        };

        /* 對立即抛出的錯誤狀态的特殊處理 */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // 不會僅從 queue().get() 中将這些類型從 queue() 中抛出, 因為它們是執行錯誤
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        // 這些是從 queue() 抛出的錯誤,因為它們是拒絕類型錯誤
                        throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }      

調用,傳回一個Future,後面可以通過Future擷取單條結果

2.3 observe()

訂閱一個Observable對象,Observable代表的是依賴服務傳回的結果,擷取到一個那個代表結果的Observable對象的拷貝對象

toObservable()

傳回一個Observable對象,如果我們訂閱這個對象,就會執行command并且擷取傳回結果

其中execute()和queue()僅對HystrixCommand适用

K             value   = command.execute();
Future<K>     fValue  = command.queue();
Observable<K> ohValue = command.observe();         
Observable<K> ocValue = command.toObservable();          

execute()實際上會調用queue().get()

SpringCloud - Hystrix的執行流程(上)0 Hystrix執行原理圖1 建立HystrixCommand/HystrixObservableCommand2 調用command的執行方法

在 queue() 方法中,會調用toObservable().toBlocking().toFuture()

SpringCloud - Hystrix的執行流程(上)0 Hystrix執行原理圖1 建立HystrixCommand/HystrixObservableCommand2 調用command的執行方法

即,無論是哪種執行command的方式,最終都是依賴

toObservable()

也就是說同步的HystrixCommand最終都會依賴Observable,盡管HystrixCommand是用來發射單個事件的

繼續閱讀