目錄
- 前言
- ScheduledExecutor
- TimerReference
- TimerListener
- HystrixTimer
- HystrixObservableTimeoutOperator
-
- AbstractCommand#executeCommandAndObserve
- HystrixObservableTimeoutOperator
- demo
前言
Hystrix
的逾時我們平時接觸的比較多,那
Hystrix
是如何實作的?線程隔離和信号量隔離實作的方式是一樣的嗎?這一篇文章我們從源碼的角度來探讨
Hystrix
對逾時的實作。
ScheduledExecutor
Hystrix
排程器,是一個靜态内部類。内部維護了一個
ScheduledThreadPoolExecutor
,看到這裡其實我們也不難猜出了
Hystrix
的逾時其實就是使用一個線程來定時執行任務監聽是否逾時。
static class ScheduledExecutor {
//這個線程池是關鍵 執行一個定時任務
volatile ScheduledThreadPoolExecutor executor;
private volatile boolean initialized;
//執行個體化線程池預設的線程大小是cpu的核心數量
//當然可以通過hystrix.timer.threadpool.default.coreSize來指定線程數量
//執行個體化完成之後更改initialized的狀态
public void initialize() {
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();
ThreadFactory threadFactory = null;
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
} else {
threadFactory = PlatformSpecific.getAppEngineThreadFactory();
}
executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
initialized = true;
}
public ScheduledThreadPoolExecutor getThreadPool() {
return executor;
}
public boolean isInitialized() {
return initialized;
}
}
- 監聽逾時的實際上是一組線程完成
核心線程數預設是CPU的核數ScheduledThreadPoolExecutor
-
其實就是對線程池的包裝 我們可以簡單的認為它就是一個線程ScheduledExecutor
-
可以通過該配置來調整逾時線程池的大小hystrix.timer.threadpool.default.coreSize
TimerReference
這個類就更簡單了,上面
ScheduledExecutor
是對線程池的封裝 這個類是對執行結果
Future
的封裝. 但是注意它是一個軟引用。這裡多說一句 軟引用的回收條件是 在發現OOM之前 JVM會嘗試對軟引用進行回收。
private static class TimerReference extends SoftReference<TimerListener> {
private final ScheduledFuture<?> f;
TimerReference(TimerListener referent, ScheduledFuture<?> f) {
super(referent);
this.f = f;
}
@Override
public void clear() {
super.clear();
f.cancel(false);
}
}
主要注意
clear
方法 在該方法中取消了任務,如果任務正在執行 不打斷。
TimerListener
定時任務監聽器,提供了兩個方法
-
逾時執行的方法tick()
-
擷取定時任務執行的時間(逾時時間)getIntervalTimeInMilliseconds()
HystrixTimer
這個類可以說是
Hystrix
實作逾時的核心,像一個逾時容器,上面所有的逾時元件都是在這個類中定義。
//它以單例的形式存在
private static HystrixTimer INSTANCE = new HystrixTimer();
//執行器ScheduledExecutor 我們上面說過
AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();
//對外隻提供一個getInstance方法來擷取上面的執行個體
public static HystrixTimer getInstance() {
return INSTANCE;
}
public static void reset() {
ScheduledExecutor ex = INSTANCE.executor.getAndSet(null);
if (ex != null && ex.getThreadPool() != null) {
ex.getThreadPool().shutdownNow();
}
}
//這個方法很重要添加一個 逾時監聽器
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
//初始化一個任務 在任務内部執行 監聽器的tick方法,是以主邏輯的内容可以放在這個tick方法中
Runnable r = new Runnable() {
@Override
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
//定時執行一個任務 間隔時間從主邏輯中傳入的TimerListener中擷取 預設是逾時時間
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
//傳回一個Timer
return new TimerReference(listener, f);
}
//如果排程器沒有初始化 執行初始化方法 初始化方法上面已經說過
protected void startThreadIfNeeded() {
while (executor.get() == null || ! executor.get().isInitialized()) {
if (executor.compareAndSet(null, new ScheduledExecutor())) {
executor.get().initialize();
}
}
}
上面主要的邏輯是在
addTimerListener
方法中,這個方法送出了一個定時執行的任務 執行的内容就是
TimerListener
的
tick
方法。
HystrixObservableTimeoutOperator
我們上面把
Hystrix
逾時的工具都介紹了一遍。我們也知道
Hystrix
在執行
HystrixCommand.run
方法的時候 其實是把邏輯封裝成了一個
Observable
然後不停的變換
Observable
來達到目的。而
HystrixObservableTimeoutOperator
就是一個對Observable變換的邏輯。
AbstractCommand#executeCommandAndObserve
在指令的執行的過程中我們可以發現有這樣的幾行代碼,如果開啟逾時就調用處理逾時的邏輯。
lift(new HystrixObservableTimeoutOperator(_cmd))
我們簡單的了解
lift
就是對
Observable
進行轉換使其具備更進階的功能。而具體的邏輯就是在
HystrixObservableTimeoutOperator
中來實作。
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
HystrixObservableTimeoutOperator
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
//我們隻需要知道實作 call方法就能拿到訂閱者
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
TimerListener listener = new TimerListener() {
//逾時執行的邏輯
@Override
public void tick() {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
//發送逾時時間
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
//取消訂閱
s.unsubscribe();
//如果逾時 抛出 HystrixTimeoutException異常
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
}
}
//從配置擷取逾時時間
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
//向HystrixTimer中添加監聽器
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
originalCommand.timeoutTimer.set(tl);
//定義Parent 和child關聯起來 隻有沒有逾時的時候才會執行child的 onNext onCompleted onError等方法
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
private boolean isNotTimedOut() {
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
s.add(parent);
return parent;
}
}
demo
隔離政策為SEMAPHORE ,逾時時間9s
public class TimeoutCommand extends HystrixCommand<String> {
private String name;
protected TimeoutCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey(name));
this.name = name;
}
protected TimeoutCommand(Setter setter , String name) {
super(setter);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.SECONDS.sleep(10);
return this.name;
}
}
@Test
public void fun2() throws InterruptedException {
HystrixCommand.Setter commandSetter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("coredy"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(9000)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE));
TimeoutCommand command = new TimeoutCommand(commandSetter , "coredy");
command.toObservable().subscribe(item ->{
System.out.println(Thread.currentThread().getName());
System.out.println(item);
});
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}