天天看點

Hystrix 源碼解析 —— 指令執行(二)之執行隔離政策1. 概述2. HystrixThreadPoolProperties3. HystrixThreadPoolKey4. HystrixConcurrencyStrategy5. HystrixThreadPool6. HystrixScheduler

1. 概述

本文主要分享 Hystrix 指令執行(二)之執行隔離政策。

建議 :對 RxJava 已經有一定的了解的基礎上閱讀本文。

Hystrix 提供兩種執行隔離政策( ExecutionIsolationStrategy ) :

  • SEMAPHORE

     :信号量,指令在調用線程執行。在《Hystrix 源碼解析 —— 指令執行(一)之正常執行邏輯》「3. TryableSemaphore」 已經詳細解析。
  • THREAD

     :線程池,指令線上程池執行。在《Hystrix 源碼解析 —— 指令執行(一)之正常執行邏輯》「5. #executeCommandWithSpecifiedIsolation(…)」 的 

    #executeCommandWithSpecifiedIsolation(...)

     方法中,調用 

    Observable#subscribeOn(Scheduler)

     方法,指定在 RxJava Scheduler 執行。
    • 如果你暫時不了解 Scheduler ,可以閱讀 《RxJava 源碼解析 —— Scheduler》 。
    • 如果你暫時不了解 

      Observable#subscribeOn(Scheduler)

       ,可以閱讀 《RxJava 源碼解析 —— Observable#subscribeOn(Scheduler)》 。

兩種方式的優缺點比較,推薦閱讀 《【翻譯】Hystrix文檔-實作原理》「依賴隔離」。

2. HystrixThreadPoolProperties

com.netflix.hystrix.HystrixThreadPoolProperties

 ,Hystrix 線程池屬性配置抽象類,點選 連結 檢視,已添加中文注釋說明。

com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault

 ,Hystrix 線程池配置實作類,點選 連結 檢視。實際上沒什麼内容,官方如是說 :

Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)

3. HystrixThreadPoolKey

com.netflix.hystrix.HystrixThreadPoolKey

 ,Hystrix 線程池辨別接口。

FROM HystrixThreadPoolKey 接口注釋

A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.

This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.

  • 直白的說 ,希望通過相同的 

    name

     ( 辨別 ) 獲得同 HystrixThreadPoolKey 對象。通過在内部維持一個 

    name

     與 HystrixThreadPoolKey 對象的映射,以達到枚舉的效果。

HystrixThreadPoolKey 代碼如下 :

1: public interface HystrixThreadPoolKey extends HystrixKey {
 2:     class Factory {
 3:         private Factory() {
 4:         }
 5: 
 6:         // used to intern instances so we don't keep re-creating them millions of times for the same key
 7:         private static final InternMap<String, HystrixThreadPoolKey> intern
 8:                 = new InternMap<String, HystrixThreadPoolKey>(
 9:                 new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10:                     @Override
11:                     public HystrixThreadPoolKey create(String key) {
12:                         return new HystrixThreadPoolKeyDefault(key);
13:                     }
14:                 });
15: 
16:         public static HystrixThreadPoolKey asKey(String name) {
17:            return intern.interned(name);
18:         }
19: 
20:         private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {
21:             public HystrixThreadPoolKeyDefault(String name) {
22:                 super(name);
23:             }
24:         }
25: 
26:         /* package-private */ static int getThreadPoolCount() {
27:             return intern.size();
28:         }
29:     }
           
  • HystrixThreadPoolKey 實作 

    com.netflix.hystrix.HystrixKey

     接口,點選 連結 檢視。該接口定義的 

    #name()

     方法,即是上文我們所說的辨別( Key )。
  • intern

     屬性,

    name

     與 HystrixThreadPoolKey 對象的映射,以達到枚舉的效果。
    • com.netflix.hystrix.util.InternMap

       ,點選 連結 檢視帶中文注釋的代碼。
  • #asKey(name)

     方法,從 

    intern

     獲得 HystrixThreadPoolKey 對象。
  • #getThreadPoolCount()

     方法,獲得 HystrixThreadPoolKey 數量。

在 AbstractCommand 構造方法裡,初始化指令的 

threadPoolKey

 屬性,代碼如下 :

protected final HystrixThreadPoolKey threadPoolKey;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
       HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
       HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
       HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

   // ... 省略無關代碼

   this.commandGroup = initGroupKey(group);
   this.commandKey = initCommandKey(key, getClass());
   this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
   // 初始化 threadPoolKey
   this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());

}
           
  • 調用 

    #initThreadPoolKey(...)

     方法,建立最終的 

    threadPoolKey

     屬性。代碼如下 : 
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
   if (threadPoolKeyOverride == null) {
       // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
       if (threadPoolKey == null) {
           /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
           return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
       } else {
           return threadPoolKey;
       }
   } else { // threadPoolKeyOverride 可覆寫屬性
       // we have a property defining the thread-pool so use it instead
       return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
   }
}
           
    • 優先級 :

      threadPoolKeyOverride

       > 

      threadPoolKey

       > 

      groupKey

4. HystrixConcurrencyStrategy

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy

 ,Hystrix 并發政策抽象類。

HystrixConcurrencyStrategy#getThreadPool(...)

 方法,代碼如下 :

1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
 2:     final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
 3: 
 4:     final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
 5:     final int dynamicCoreSize = threadPoolProperties.coreSize().get();
 6:     final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
 7:     final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
 8: 
 9:     final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10: 
11:     if (allowMaximumSizeToDivergeFromCoreSize) {
12:         final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13:         if (dynamicCoreSize > dynamicMaximumSize) {
14:             logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15:                     dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
16:                     dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
17:             return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18:         } else {
19:             return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20:         }
21:     } else {
22:         return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23:     }
24: }
           
  • 第 2 行 :調用 

    #getThreadFactory(...)

     方法,獲得 ThreadFactory 。點選 連結 檢視方法代碼。
    • PlatformSpecific#getAppEngineThreadFactory()

       方法,無需細看,适用于 Google App Engine 場景。
  • 第 4 至 7 行 :「2. HystrixThreadPoolProperties」 有詳細解析。
  • 第 9 行 :調用 

    #getBlockingQueue()

     方法,獲得線程池的阻塞隊列。點選 連結 檢視方法代碼。
    • 當 

      maxQueueSize <= 0

       時( 預設值 :

      -1

       ) 時,使用 SynchronousQueue 。超過線程池的 

      maximumPoolSize

       時,送出任務被拒絕。
      • 《Java并發包中的同步隊列SynchronousQueue實作原理》
    • 當 

      SynchronousQueue > 0

       時,使用 LinkedBlockingQueue 。超過線程池的 

      maximumPoolSize

       時,任務被拒絕。超過線程池的 

      maximumPoolSize

       + 線程池隊列的 

      maxQueueSize

       時,送出任務被阻塞等待。
      • 《Java阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue實作原理分析》
        • 推薦 :《聊聊并發(三)——JAVA線程池的分析和使用》
        • 推薦 :《聊聊并發(七)——Java中的阻塞隊列》
  • 第 11 至 23 行 :建立 ThreadPoolExecutor 。看起來代碼比較多,根據 

    allowMaximumSizeToDivergeFromCoreSize

     的情況,計算線程池的 

    maximumPoolSize

     屬性。計算的方式和 

    HystrixThreadPoolProperties#actualMaximumSize()

     方法是一緻的。

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault

 ,Hystrix 并發政策實作類。代碼如下( 基本沒做啥 ) :

public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {

    /**
     * 單例
     */
    private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();

    public static HystrixConcurrencyStrategy getInstance() {
        return INSTANCE;
    }

    private HystrixConcurrencyStrategyDefault() {
    }

}
           

在 AbstractCommand 構造方法裡,初始化指令的 

threadPoolKey

 屬性,代碼如下 :

protected final HystrixConcurrencyStrategy concurrencyStrategy;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
       HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
       HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
       HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    // ... 省略無關代碼
   
    // 初始化 并發政策
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
   
}
           
  • HystrixPlugins ,Hystrix 插件體系,https://github.com/Netflix/Hystrix/wiki/Plugins 有詳細解析。
  • 調用 

    HystrixPlugins#getConcurrencyStrategy()

     獲得 HystrixConcurrencyStrategy 對象。預設情況下,使用 HystrixConcurrencyStrategyDefault 。當然你也可以參考 Hystrix 插件體系,實作自定義的 HystrixConcurrencyStrategy 實作,以達到覆寫 

    #getThreadPool()

    #getBlockingQueue()

     等方法。點選 連結 檢視該方法代碼。

5. HystrixThreadPool

com.netflix.hystrix.HystrixThreadPool

 ,Hystrix 線程池接口。當 Hystrix 指令使用 

THREAD

 執行隔離政策時,

HystrixCommand#run()

 方法線上程池執行。點選 連結 檢視。HystrixThreadPool 定義接口如下 :

  • #getExecutor()

     :獲得 ExecutorService 。
  • #getScheduler()

     / 

    #getScheduler(Func0<Boolean>)

     :獲得 RxJava Scheduler 。
  • #isQueueSpaceAvailable()

     :線程池隊列是否有空餘。
  • #markThreadExecution()

     / 

    #markThreadCompletion()

     / 

    #markThreadRejection()

     :TODO 【2002】【metrics】

5.1 HystrixThreadPoolDefault

com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault

 ,Hystrix 線程池實作類。

構造方法,代碼如下 :

1: private final HystrixThreadPoolProperties properties;
 2: private final BlockingQueue<Runnable> queue;
 3: private final ThreadPoolExecutor threadPool;
 4: private final HystrixThreadPoolMetrics metrics;
 5: private final int queueSize;
 6: 
 7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
 8:     // 初始化 HystrixThreadPoolProperties
 9:     this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10:     // 獲得 HystrixConcurrencyStrategy
11:     HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12:     // 隊列大小
13:     this.queueSize = properties.maxQueueSize().get();
14: 
15:     // TODO 【2002】【metrics】
16:     this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17:             concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor
18:             properties);
19: 
20:     // 獲得 ThreadPoolExecutor
21:     this.threadPool = this.metrics.getThreadPool();
22:     this.queue = this.threadPool.getQueue(); // 隊列
23: 
24:     // TODO 【2002】【metrics】
25:     /* strategy: HystrixMetricsPublisherThreadPool */
26:     HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }
           
  • 第 9 行 :初始化 HystrixThreadPoolProperties 。
  • 第 11 行 :初始化 HystrixConcurrencyStrategy 。
  • 第 13 行 :初始化 

    queueSize

     。
  • 第 16 至 18 行 :TODO 【2002】【metrics】
    • 第 17 行 :調用 

      HystrixConcurrencyStrategy#getThreadPool(...)

       方法,初始化 ThreadPoolExecutor 。
  • 第 21 行 :獲得 ThreadPoolExecutor 。
  • 第 22 行 :獲得 ThreadPoolExecutor 的隊列。
  • 第 26 行 :TODO 【2002】【metrics】

#getExecutor()

 方法,代碼如下 :

@Override
public ThreadPoolExecutor getExecutor() {
    touchConfig();
    return threadPool;
}
           
  • 調用 

    #touchConfig()

     方法,動态調整 

    threadPool

     的 

    coreSize

     / 

    maximumSize

     / 

    keepAliveTime

     參數。點選 連結 檢視該方法。

#getScheduler()

 / 

#getScheduler(Func0<Boolean>)

 方法,代碼如下 :

@Override
public Scheduler getScheduler() {
    //by default, interrupt underlying threads on timeout
    return getScheduler(new Func0<Boolean>() {
        @Override
        public Boolean call() {
            return true;
        }
    });
}

@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
           
  • HystrixContextScheduler 和 

    shouldInterruptThread

     都在 「6. HystrixContextScheduler」 詳細解析。

#isQueueSpaceAvailable()

 方法,代碼如下 :

@Override
public boolean isQueueSpaceAvailable() {
    if (queueSize <= 0) {
        // we don't have a queue so we won't look for space but instead
        // let the thread-pool reject or not
        return true;
    } else {
        return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
    }
}
           
  • 由于線程池的隊列大小不能動态調整,該方法的實作通過 

    HystrixThreadPoolProperties.queueSizeRejectionThreshold

     屬性控制。
  • 注意 

    queueSize

     屬性,決定了線程池的隊列類型。
    • queueSize <= 0

       時,

      #isQueueSpaceAvailable()

       都傳回 

      true

       的原因是,線程池使用 SynchronousQueue 作為隊列,不支援新任務排隊,任務超過線程池的 

      maximumPoolSize

       時,新任務被拒絕。
    • queueSize > 0

       時,

      #isQueueSpaceAvailable()

       根據情況

      true

      /

      false

       的原因是,線程池使用 LinkedBlockingQueue 作為隊列,支援一定數量的阻塞排隊,但是這個數量無法調整。通過 

      #isQueueSpaceAvailable()

       方法的判斷,動态調整。另外,初始配置的 

      queueSize

       要相對大,否則即使 

      queueSizeRejectionThreshold

       配置的大于 

      queueSize

       ,實際送出任務到線程池,也會被拒絕。

5.2 Factory

com.netflix.hystrix.HystrixThreadPool.Factory

 ,HystrixThreadPool 工廠類,不僅限于 HystrixThreadPool 的建立,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。

threadPools

 屬性,維護建立的 HystrixThreadPool 對應的映射,代碼如下 :

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
           
  • Key 為 

    HystrixThreadPoolKey#name()

     ,每個 HystrixThreadPoolKey 對應一個 HystrixThreadPool 對象。

#getInstance(...)

 方法,獲得 HystrixThreadPool 對象,代碼如下 :

/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
         return previouslyCached;
     }

     // if we get here this is the first time so we need to initialize
     synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
       }
     }
     return threadPools.get(key);
}
           
  • 根據 

    threadPoolKey

     先從 

    threadPool

     擷取已建立的 HystrixThreadPool ;擷取不到,建立對應的 HystrixThreadPool 傳回,并添加到 

    threadPool

     。

#shutdown()

 / 

#shutdown(timeout, unit)

 方法,比較易懂,點選 連結 檢視。

5.3 初始化

在 AbstractCommand 構造方法裡,初始化指令的 

threadPool

 屬性,代碼如下 :

protected final HystrixThreadPool threadPool;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
       HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
       HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
       HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    // ... 省略其他代碼

    // 初始化 threadPoolKey
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    // 初始化 threadPool
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

}
           
  • 調用 

    #initThreadPool(...)

     方法,獲得 HystrixThreadPool ,點選 連結 檢視。

6. HystrixScheduler

Hystrix 實作了自定義的 RxJava Scheduler ,整體類圖如下 :

Hystrix 源碼解析 —— 指令執行(二)之執行隔離政策1. 概述2. HystrixThreadPoolProperties3. HystrixThreadPoolKey4. HystrixConcurrencyStrategy5. HystrixThreadPool6. HystrixScheduler
  • HystrixContextScheduler ( 實作 RxJava Scheduler 抽象類 ),内嵌類型為 ThreadPoolScheduler ( 實作 RxJava Scheduler 抽象類 )的 

    actualScheduler

     屬性。
  • HystrixContextWorker ( 實作 RxJava Worker 抽象類 ),内嵌類型為 ThreadPoolWorker ( 實作 RxJava Worker 抽象類 )的 

    worker

     屬性。

6.1 HystrixContextScheduler

構造方法,代碼如下 :

public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;
    
    // ... 省略無關代碼
    
    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }
}
           
  • actualScheduler

     屬性,類型為 ThreadPoolScheduler 。

#createWorker()

 方法,代碼如下 :

@Override
public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
           
  • 使用 

    actualScheduler

     建立 ThreadPoolWorker ,傳參給 HystrixContextSchedulerWorker 。

6.2 HystrixContextSchedulerWorker

構造方法,代碼如下 :

private class HystrixContextSchedulerWorker extends Worker {

    private final Worker worker;

    // ... 省略無關代碼

    private HystrixContextSchedulerWorker(Worker actualWorker) {
        this.worker = actualWorker;
    }
   
}
           
  • worker

     屬性,類型為 ThreadPoolWorker 。

#schedule(Action0)

 方法,代碼如下 :

@Override
public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
           
  • 調用 

    ThreadPool#isQueueSpaceAvailable()

     方法,判斷線程池隊列是否有空餘。這個就是 HystrixContextScheduler 的實際用途。

#unsubscribe()

 / 

#isUnsubscribed()

 方法,使用 

worker

 判斷,點選 連結檢視。

6.3 ThreadPoolScheduler

ThreadPoolScheduler 比較簡單,點選 連結 檢視。

6.4 ThreadPoolWorker

構造方法,代碼如下 :

private static class ThreadPoolWorker extends Worker {

    private final HystrixThreadPool threadPool;
    private final CompositeSubscription subscription = new CompositeSubscription();
    private final Func0<Boolean> shouldInterruptThread;

    // ... 省略無關代碼

    public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.threadPool = threadPool;
        this.shouldInterruptThread = shouldInterruptThread;
    }
}
           
  • subscription

     屬性,訂閱資訊。

#schedule(Action0)

 方法,代碼如下 :

1: @Override
 2: public Subscription schedule(final Action0 action) {
 3:     // 未訂閱,傳回
 4:     if (subscription.isUnsubscribed()) {
 5:         // don't schedule, we are unsubscribed
 6:         return Subscriptions.unsubscribed();
 7:     }
 8: 
 9:     // 建立 ScheduledAction
10:     // This is internal RxJava API but it is too useful.
11:     ScheduledAction sa = new ScheduledAction(action);
12: 
13:     // 添加到 訂閱
14:     subscription.add(sa);
15:     sa.addParent(subscription);
16: 
17:     // 送出 任務
18:     ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19:     FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20:     sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21: 
22:     return sa;
23: }
           
  • 第 4 至 7 行 :未訂閱,傳回。
  • 第 11 行 : 建立 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 詳細解析。
  • 第 14 至 15 行 :添加到訂閱( 

    subscription

     )。
  • 第 18 至 20 行 :使用 

    threadPool

     ,送出任務,并建立 FutureCompleterWithConfigurableInterrupt 添加到訂閱( 

    sa

     )。
  • 第 22 行 :傳回訂閱( 

    sa

     )。整體訂閱關系如下 :
    Hystrix 源碼解析 —— 指令執行(二)之執行隔離政策1. 概述2. HystrixThreadPoolProperties3. HystrixThreadPoolKey4. HystrixConcurrencyStrategy5. HystrixThreadPool6. HystrixScheduler

#unsubscribe()

 / 

#isUnsubscribed()

 方法,使用 

subscription

 判斷,點選 連結檢視。

6.5 FutureCompleterWithConfigurableInterrupt

com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt

 ,實作類似 

rx.internal.schedulers.ScheduledAction.FutureCompleter

 ,在它的基礎上,支援配置 

FutureTask#cancel(Boolean)

 是否可打斷運作( 

mayInterruptIfRunning

 )。

構造方法,代碼如下 :

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    // ... 省略無關代碼

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }
}
           

當指令執行逾時,或是主動取消指令執行時,調用 

#unsubscribe()

 方法,取消執行。

當指令執行逾時,或是主動取消指令執行時,調用 

#unsubscribe()

 方法,取消執行。

當指令執行逾時,或是主動取消指令執行時,調用 

#unsubscribe()

 方法,取消執行。

#unsubscribe()

 方法,代碼如下 :

@Override
public void unsubscribe() {
    // 從 線程池 移除 任務
    executor.remove(f);
    // 根據 shouldInterruptThread 配置,是否強制取消
    if (shouldInterruptThread.call()) {
        f.cancel(true);
    } else {
        f.cancel(false);
    }
}
           
  • 根據 

    shouldInterruptThread

     方法,判斷是否強制取消。
  • shouldInterruptThread

     對應的方法,實作代碼如下 :
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
    @Override
    public Boolean call() {
        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
    }
}));
           
  • 當 

    executionIsolationThreadInterruptOnTimeout = true

     時,指令可執行逾時。當指令可執行逾時時,強制取消。
  • 當使用 

    HystrixCommand.queue()

     傳回的 Future ,可以使用 

    Future#cancel(Boolean)

     取消指令執行。從 

    shouldInterruptThread

     對應的方法可以看到,如果此時不滿足指令執行逾時的條件,指令執行取消的方式是非強制的。此時當 

    executionIsolationThreadInterruptOnFutureCancel = true

     時,并且調用 

    Future#cancel(Boolean)

     傳遞 

    mayInterruptIfRunning = true

     ,強制取消指令執行。
    • 模拟測試用例 :

      CommandHelloWorld#testAsynchronous3()

    • HystrixCommand#queue()

       :點選 連結 檢視 

      Future#cancel(Boolean)

       方法。 

繼續閱讀