1. 概述
本文主要分享 Hystrix 指令執行(二)之執行隔離政策。
建議 :對 RxJava 已經有一定的了解的基礎上閱讀本文。
Hystrix 提供兩種執行隔離政策( ExecutionIsolationStrategy ) :
-
:信号量,指令在調用線程執行。在《Hystrix 源碼解析 —— 指令執行(一)之正常執行邏輯》「3. TryableSemaphore」 已經詳細解析。SEMAPHORE
-
:線程池,指令線上程池執行。在《Hystrix 源碼解析 —— 指令執行(一)之正常執行邏輯》「5. #executeCommandWithSpecifiedIsolation(…)」 的THREAD
方法中,調用#executeCommandWithSpecifiedIsolation(...)
方法,指定在 RxJava Scheduler 執行。Observable#subscribeOn(Scheduler)
- 如果你暫時不了解 Scheduler ,可以閱讀 《RxJava 源碼解析 —— Scheduler》 。
- 如果你暫時不了解
,可以閱讀 《RxJava 源碼解析 —— Observable#subscribeOn(Scheduler)》 。Observable#subscribeOn(Scheduler)
兩種方式的優缺點比較,推薦閱讀 《【翻譯】Hystrix文檔-實作原理》「依賴隔離」。
2. HystrixThreadPoolProperties
com.netflix.hystrix.HystrixThreadPoolProperties
,Hystrix 線程池屬性配置抽象類,點選 連結 檢視,已添加中文注釋說明。
com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault
,Hystrix 線程池配置實作類,點選 連結 檢視。實際上沒什麼内容,官方如是說 :
Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)
3. HystrixThreadPoolKey
com.netflix.hystrix.HystrixThreadPoolKey
,Hystrix 線程池辨別接口。
FROM HystrixThreadPoolKey 接口注釋
A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.
This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.
- 直白的說 ,希望通過相同的
( 辨別 ) 獲得同 HystrixThreadPoolKey 對象。通過在内部維持一個name
與 HystrixThreadPoolKey 對象的映射,以達到枚舉的效果。name
HystrixThreadPoolKey 代碼如下 :
1: public interface HystrixThreadPoolKey extends HystrixKey {
2: class Factory {
3: private Factory() {
4: }
5:
6: // used to intern instances so we don't keep re-creating them millions of times for the same key
7: private static final InternMap<String, HystrixThreadPoolKey> intern
8: = new InternMap<String, HystrixThreadPoolKey>(
9: new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10: @Override
11: public HystrixThreadPoolKey create(String key) {
12: return new HystrixThreadPoolKeyDefault(key);
13: }
14: });
15:
16: public static HystrixThreadPoolKey asKey(String name) {
17: return intern.interned(name);
18: }
19:
20: private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {
21: public HystrixThreadPoolKeyDefault(String name) {
22: super(name);
23: }
24: }
25:
26: /* package-private */ static int getThreadPoolCount() {
27: return intern.size();
28: }
29: }
- HystrixThreadPoolKey 實作
接口,點選 連結 檢視。該接口定義的com.netflix.hystrix.HystrixKey
方法,即是上文我們所說的辨別( Key )。#name()
-
屬性,intern
與 HystrixThreadPoolKey 對象的映射,以達到枚舉的效果。name
-
,點選 連結 檢視帶中文注釋的代碼。com.netflix.hystrix.util.InternMap
-
-
方法,從#asKey(name)
獲得 HystrixThreadPoolKey 對象。intern
-
方法,獲得 HystrixThreadPoolKey 數量。#getThreadPoolCount()
在 AbstractCommand 構造方法裡,初始化指令的
threadPoolKey
屬性,代碼如下 :
protected final HystrixThreadPoolKey threadPoolKey;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略無關代碼
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
}
- 調用
方法,建立最終的#initThreadPoolKey(...)
屬性。代碼如下 :threadPoolKey
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else { // threadPoolKeyOverride 可覆寫屬性
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
-
- 優先級 :
>threadPoolKeyOverride
>threadPoolKey
groupKey
- 優先級 :
4. HystrixConcurrencyStrategy
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy
,Hystrix 并發政策抽象類。
HystrixConcurrencyStrategy#getThreadPool(...)
方法,代碼如下 :
1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
2: final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
3:
4: final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
5: final int dynamicCoreSize = threadPoolProperties.coreSize().get();
6: final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
7: final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
8:
9: final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10:
11: if (allowMaximumSizeToDivergeFromCoreSize) {
12: final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13: if (dynamicCoreSize > dynamicMaximumSize) {
14: logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15: dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
16: dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
17: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18: } else {
19: return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20: }
21: } else {
22: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23: }
24: }
- 第 2 行 :調用
方法,獲得 ThreadFactory 。點選 連結 檢視方法代碼。#getThreadFactory(...)
-
方法,無需細看,适用于 Google App Engine 場景。PlatformSpecific#getAppEngineThreadFactory()
-
- 第 4 至 7 行 :「2. HystrixThreadPoolProperties」 有詳細解析。
- 第 9 行 :調用
方法,獲得線程池的阻塞隊列。點選 連結 檢視方法代碼。#getBlockingQueue()
- 當
時( 預設值 :maxQueueSize <= 0
) 時,使用 SynchronousQueue 。超過線程池的-1
時,送出任務被拒絕。maximumPoolSize
- 《Java并發包中的同步隊列SynchronousQueue實作原理》
- 當
時,使用 LinkedBlockingQueue 。超過線程池的SynchronousQueue > 0
時,任務被拒絕。超過線程池的maximumPoolSize
+ 線程池隊列的maximumPoolSize
時,送出任務被阻塞等待。maxQueueSize
- 《Java阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue實作原理分析》
- 推薦 :《聊聊并發(三)——JAVA線程池的分析和使用》
- 推薦 :《聊聊并發(七)——Java中的阻塞隊列》
- 《Java阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue實作原理分析》
- 當
- 第 11 至 23 行 :建立 ThreadPoolExecutor 。看起來代碼比較多,根據
的情況,計算線程池的allowMaximumSizeToDivergeFromCoreSize
屬性。計算的方式和maximumPoolSize
方法是一緻的。HystrixThreadPoolProperties#actualMaximumSize()
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault
,Hystrix 并發政策實作類。代碼如下( 基本沒做啥 ) :
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {
/**
* 單例
*/
private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();
public static HystrixConcurrencyStrategy getInstance() {
return INSTANCE;
}
private HystrixConcurrencyStrategyDefault() {
}
}
在 AbstractCommand 構造方法裡,初始化指令的
threadPoolKey
屬性,代碼如下 :
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略無關代碼
// 初始化 并發政策
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
}
- HystrixPlugins ,Hystrix 插件體系,https://github.com/Netflix/Hystrix/wiki/Plugins 有詳細解析。
- 調用
獲得 HystrixConcurrencyStrategy 對象。預設情況下,使用 HystrixConcurrencyStrategyDefault 。當然你也可以參考 Hystrix 插件體系,實作自定義的 HystrixConcurrencyStrategy 實作,以達到覆寫HystrixPlugins#getConcurrencyStrategy()
,#getThreadPool()
等方法。點選 連結 檢視該方法代碼。#getBlockingQueue()
5. HystrixThreadPool
com.netflix.hystrix.HystrixThreadPool
,Hystrix 線程池接口。當 Hystrix 指令使用
THREAD
執行隔離政策時,
HystrixCommand#run()
方法線上程池執行。點選 連結 檢視。HystrixThreadPool 定義接口如下 :
-
:獲得 ExecutorService 。#getExecutor()
-
/#getScheduler()
:獲得 RxJava Scheduler 。#getScheduler(Func0<Boolean>)
-
:線程池隊列是否有空餘。#isQueueSpaceAvailable()
-
/#markThreadExecution()
/#markThreadCompletion()
:TODO 【2002】【metrics】#markThreadRejection()
5.1 HystrixThreadPoolDefault
com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault
,Hystrix 線程池實作類。
構造方法,代碼如下 :
1: private final HystrixThreadPoolProperties properties;
2: private final BlockingQueue<Runnable> queue;
3: private final ThreadPoolExecutor threadPool;
4: private final HystrixThreadPoolMetrics metrics;
5: private final int queueSize;
6:
7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
8: // 初始化 HystrixThreadPoolProperties
9: this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10: // 獲得 HystrixConcurrencyStrategy
11: HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12: // 隊列大小
13: this.queueSize = properties.maxQueueSize().get();
14:
15: // TODO 【2002】【metrics】
16: this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17: concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor
18: properties);
19:
20: // 獲得 ThreadPoolExecutor
21: this.threadPool = this.metrics.getThreadPool();
22: this.queue = this.threadPool.getQueue(); // 隊列
23:
24: // TODO 【2002】【metrics】
25: /* strategy: HystrixMetricsPublisherThreadPool */
26: HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }
- 第 9 行 :初始化 HystrixThreadPoolProperties 。
- 第 11 行 :初始化 HystrixConcurrencyStrategy 。
- 第 13 行 :初始化
。queueSize
- 第 16 至 18 行 :TODO 【2002】【metrics】
- 第 17 行 :調用
方法,初始化 ThreadPoolExecutor 。HystrixConcurrencyStrategy#getThreadPool(...)
- 第 17 行 :調用
- 第 21 行 :獲得 ThreadPoolExecutor 。
- 第 22 行 :獲得 ThreadPoolExecutor 的隊列。
- 第 26 行 :TODO 【2002】【metrics】
#getExecutor()
方法,代碼如下 :
@Override
public ThreadPoolExecutor getExecutor() {
touchConfig();
return threadPool;
}
- 調用
方法,動态調整#touchConfig()
的threadPool
/coreSize
/maximumSize
參數。點選 連結 檢視該方法。keepAliveTime
#getScheduler()
/
#getScheduler(Func0<Boolean>)
方法,代碼如下 :
@Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return true;
}
});
}
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
- HystrixContextScheduler 和
都在 「6. HystrixContextScheduler」 詳細解析。shouldInterruptThread
#isQueueSpaceAvailable()
方法,代碼如下 :
@Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
- 由于線程池的隊列大小不能動态調整,該方法的實作通過
屬性控制。HystrixThreadPoolProperties.queueSizeRejectionThreshold
- 注意
屬性,決定了線程池的隊列類型。queueSize
-
時,queueSize <= 0
都傳回#isQueueSpaceAvailable()
的原因是,線程池使用 SynchronousQueue 作為隊列,不支援新任務排隊,任務超過線程池的true
時,新任務被拒絕。maximumPoolSize
-
時,queueSize > 0
根據情況#isQueueSpaceAvailable()
/true
的原因是,線程池使用 LinkedBlockingQueue 作為隊列,支援一定數量的阻塞排隊,但是這個數量無法調整。通過false
方法的判斷,動态調整。另外,初始配置的#isQueueSpaceAvailable()
要相對大,否則即使queueSize
配置的大于queueSizeRejectionThreshold
,實際送出任務到線程池,也會被拒絕。queueSize
-
5.2 Factory
com.netflix.hystrix.HystrixThreadPool.Factory
,HystrixThreadPool 工廠類,不僅限于 HystrixThreadPool 的建立,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。
threadPools
屬性,維護建立的 HystrixThreadPool 對應的映射,代碼如下 :
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
- Key 為
,每個 HystrixThreadPoolKey 對應一個 HystrixThreadPool 對象。HystrixThreadPoolKey#name()
#getInstance(...)
方法,獲得 HystrixThreadPool 對象,代碼如下 :
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
- 根據
先從threadPoolKey
擷取已建立的 HystrixThreadPool ;擷取不到,建立對應的 HystrixThreadPool 傳回,并添加到threadPool
。threadPool
#shutdown()
/
#shutdown(timeout, unit)
方法,比較易懂,點選 連結 檢視。
5.3 初始化
在 AbstractCommand 構造方法裡,初始化指令的
threadPool
屬性,代碼如下 :
protected final HystrixThreadPool threadPool;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
// ... 省略其他代碼
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// 初始化 threadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
}
- 調用
方法,獲得 HystrixThreadPool ,點選 連結 檢視。#initThreadPool(...)
6. HystrixScheduler
Hystrix 實作了自定義的 RxJava Scheduler ,整體類圖如下 :
- HystrixContextScheduler ( 實作 RxJava Scheduler 抽象類 ),内嵌類型為 ThreadPoolScheduler ( 實作 RxJava Scheduler 抽象類 )的
屬性。actualScheduler
- HystrixContextWorker ( 實作 RxJava Worker 抽象類 ),内嵌類型為 ThreadPoolWorker ( 實作 RxJava Worker 抽象類 )的
屬性。worker
6.1 HystrixContextScheduler
構造方法,代碼如下 :
public class HystrixContextScheduler extends Scheduler {
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;
private final HystrixThreadPool threadPool;
// ... 省略無關代碼
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
}
-
屬性,類型為 ThreadPoolScheduler 。actualScheduler
#createWorker()
方法,代碼如下 :
@Override
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
- 使用
建立 ThreadPoolWorker ,傳參給 HystrixContextSchedulerWorker 。actualScheduler
6.2 HystrixContextSchedulerWorker
構造方法,代碼如下 :
private class HystrixContextSchedulerWorker extends Worker {
private final Worker worker;
// ... 省略無關代碼
private HystrixContextSchedulerWorker(Worker actualWorker) {
this.worker = actualWorker;
}
}
-
屬性,類型為 ThreadPoolWorker 。worker
#schedule(Action0)
方法,代碼如下 :
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
- 調用
方法,判斷線程池隊列是否有空餘。這個就是 HystrixContextScheduler 的實際用途。ThreadPool#isQueueSpaceAvailable()
#unsubscribe()
/
#isUnsubscribed()
方法,使用
worker
判斷,點選 連結檢視。
6.3 ThreadPoolScheduler
ThreadPoolScheduler 比較簡單,點選 連結 檢視。
6.4 ThreadPoolWorker
構造方法,代碼如下 :
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
// ... 省略無關代碼
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
}
-
屬性,訂閱資訊。subscription
#schedule(Action0)
方法,代碼如下 :
1: @Override
2: public Subscription schedule(final Action0 action) {
3: // 未訂閱,傳回
4: if (subscription.isUnsubscribed()) {
5: // don't schedule, we are unsubscribed
6: return Subscriptions.unsubscribed();
7: }
8:
9: // 建立 ScheduledAction
10: // This is internal RxJava API but it is too useful.
11: ScheduledAction sa = new ScheduledAction(action);
12:
13: // 添加到 訂閱
14: subscription.add(sa);
15: sa.addParent(subscription);
16:
17: // 送出 任務
18: ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19: FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20: sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21:
22: return sa;
23: }
- 第 4 至 7 行 :未訂閱,傳回。
- 第 11 行 : 建立 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 詳細解析。
- 第 14 至 15 行 :添加到訂閱(
)。subscription
- 第 18 至 20 行 :使用
,送出任務,并建立 FutureCompleterWithConfigurableInterrupt 添加到訂閱(threadPool
)。sa
- 第 22 行 :傳回訂閱(
)。整體訂閱關系如下 :sa
#unsubscribe()
/
#isUnsubscribed()
方法,使用
subscription
判斷,點選 連結檢視。
6.5 FutureCompleterWithConfigurableInterrupt
com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt
,實作類似
rx.internal.schedulers.ScheduledAction.FutureCompleter
,在它的基礎上,支援配置
FutureTask#cancel(Boolean)
是否可打斷運作(
mayInterruptIfRunning
)。
構造方法,代碼如下 :
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
// ... 省略無關代碼
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
}
當指令執行逾時,或是主動取消指令執行時,調用
#unsubscribe()
方法,取消執行。
當指令執行逾時,或是主動取消指令執行時,調用
#unsubscribe()
方法,取消執行。
當指令執行逾時,或是主動取消指令執行時,調用
#unsubscribe()
方法,取消執行。
#unsubscribe()
方法,代碼如下 :
@Override
public void unsubscribe() {
// 從 線程池 移除 任務
executor.remove(f);
// 根據 shouldInterruptThread 配置,是否強制取消
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
- 根據
方法,判斷是否強制取消。shouldInterruptThread
-
對應的方法,實作代碼如下 :shouldInterruptThread
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
- 當
時,指令可執行逾時。當指令可執行逾時時,強制取消。executionIsolationThreadInterruptOnTimeout = true
- 當使用
傳回的 Future ,可以使用HystrixCommand.queue()
取消指令執行。從Future#cancel(Boolean)
對應的方法可以看到,如果此時不滿足指令執行逾時的條件,指令執行取消的方式是非強制的。此時當shouldInterruptThread
時,并且調用executionIsolationThreadInterruptOnFutureCancel = true
傳遞Future#cancel(Boolean)
,強制取消指令執行。mayInterruptIfRunning = true
- 模拟測試用例 :
CommandHelloWorld#testAsynchronous3()
-
:點選 連結 檢視HystrixCommand#queue()
方法。Future#cancel(Boolean)
- 模拟測試用例 :