目录
- 前言
- ScheduledExecutor
- TimerReference
- TimerListener
- HystrixTimer
- HystrixObservableTimeoutOperator
-
- AbstractCommand#executeCommandAndObserve
- HystrixObservableTimeoutOperator
- demo
前言
Hystrix
的超时我们平时接触的比较多,那
Hystrix
是如何实现的?线程隔离和信号量隔离实现的方式是一样的吗?这一篇文章我们从源码的角度来探讨
Hystrix
对超时的实现。
ScheduledExecutor
Hystrix
调度器,是一个静态内部类。内部维护了一个
ScheduledThreadPoolExecutor
,看到这里其实我们也不难猜出了
Hystrix
的超时其实就是使用一个线程来定时执行任务监听是否超时。
static class ScheduledExecutor {
//这个线程池是关键 执行一个定时任务
volatile ScheduledThreadPoolExecutor executor;
private volatile boolean initialized;
//实例化线程池默认的线程大小是cpu的核心数量
//当然可以通过hystrix.timer.threadpool.default.coreSize来指定线程数量
//实例化完成之后更改initialized的状态
public void initialize() {
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();
ThreadFactory threadFactory = null;
if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
} else {
threadFactory = PlatformSpecific.getAppEngineThreadFactory();
}
executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
initialized = true;
}
public ScheduledThreadPoolExecutor getThreadPool() {
return executor;
}
public boolean isInitialized() {
return initialized;
}
}
- 监听超时的实际上是一组线程完成
核心线程数默认是CPU的核数ScheduledThreadPoolExecutor
-
其实就是对线程池的包装 我们可以简单的认为它就是一个线程ScheduledExecutor
-
可以通过该配置来调整超时线程池的大小hystrix.timer.threadpool.default.coreSize
TimerReference
这个类就更简单了,上面
ScheduledExecutor
是对线程池的封装 这个类是对执行结果
Future
的封装. 但是注意它是一个软引用。这里多说一句 软引用的回收条件是 在发现OOM之前 JVM会尝试对软引用进行回收。
private static class TimerReference extends SoftReference<TimerListener> {
private final ScheduledFuture<?> f;
TimerReference(TimerListener referent, ScheduledFuture<?> f) {
super(referent);
this.f = f;
}
@Override
public void clear() {
super.clear();
f.cancel(false);
}
}
主要注意
clear
方法 在该方法中取消了任务,如果任务正在执行 不打断。
TimerListener
定时任务监听器,提供了两个方法
-
超时执行的方法tick()
-
获取定时任务执行的时间(超时时间)getIntervalTimeInMilliseconds()
HystrixTimer
这个类可以说是
Hystrix
实现超时的核心,像一个超时容器,上面所有的超时组件都是在这个类中定义。
//它以单例的形式存在
private static HystrixTimer INSTANCE = new HystrixTimer();
//执行器ScheduledExecutor 我们上面说过
AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();
//对外只提供一个getInstance方法来获取上面的实例
public static HystrixTimer getInstance() {
return INSTANCE;
}
public static void reset() {
ScheduledExecutor ex = INSTANCE.executor.getAndSet(null);
if (ex != null && ex.getThreadPool() != null) {
ex.getThreadPool().shutdownNow();
}
}
//这个方法很重要添加一个 超时监听器
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
//初始化一个任务 在任务内部执行 监听器的tick方法,所以主逻辑的内容可以放在这个tick方法中
Runnable r = new Runnable() {
@Override
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
//定时执行一个任务 间隔时间从主逻辑中传入的TimerListener中获取 默认是超时时间
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
//返回一个Timer
return new TimerReference(listener, f);
}
//如果调度器没有初始化 执行初始化方法 初始化方法上面已经说过
protected void startThreadIfNeeded() {
while (executor.get() == null || ! executor.get().isInitialized()) {
if (executor.compareAndSet(null, new ScheduledExecutor())) {
executor.get().initialize();
}
}
}
上面主要的逻辑是在
addTimerListener
方法中,这个方法提交了一个定时执行的任务 执行的内容就是
TimerListener
的
tick
方法。
HystrixObservableTimeoutOperator
我们上面把
Hystrix
超时的工具都介绍了一遍。我们也知道
Hystrix
在执行
HystrixCommand.run
方法的时候 其实是把逻辑封装成了一个
Observable
然后不停的变换
Observable
来达到目的。而
HystrixObservableTimeoutOperator
就是一个对Observable变换的逻辑。
AbstractCommand#executeCommandAndObserve
在命令的执行的过程中我们可以发现有这样的几行代码,如果开启超时就调用处理超时的逻辑。
lift(new HystrixObservableTimeoutOperator(_cmd))
我们简单的理解
lift
就是对
Observable
进行转换使其具备更高级的功能。而具体的逻辑就是在
HystrixObservableTimeoutOperator
中来实现。
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
HystrixObservableTimeoutOperator
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
//我们只需要知道实现 call方法就能拿到订阅者
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
child.add(s);
final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
TimerListener listener = new TimerListener() {
//超时执行的逻辑
@Override
public void tick() {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
//发送超时时间
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
//取消订阅
s.unsubscribe();
//如果超时 抛出 HystrixTimeoutException异常
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
}
}
//从配置获取超时时间
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
//向HystrixTimer中添加监听器
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
originalCommand.timeoutTimer.set(tl);
//定义Parent 和child关联起来 只有没有超时的时候才会执行child的 onNext onCompleted onError等方法
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
private boolean isNotTimedOut() {
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}
};
s.add(parent);
return parent;
}
}
demo
隔离策略为SEMAPHORE ,超时时间9s
public class TimeoutCommand extends HystrixCommand<String> {
private String name;
protected TimeoutCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey(name));
this.name = name;
}
protected TimeoutCommand(Setter setter , String name) {
super(setter);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.SECONDS.sleep(10);
return this.name;
}
}
@Test
public void fun2() throws InterruptedException {
HystrixCommand.Setter commandSetter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("coredy"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(9000)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE));
TimeoutCommand command = new TimeoutCommand(commandSetter , "coredy");
command.toObservable().subscribe(item ->{
System.out.println(Thread.currentThread().getName());
System.out.println(item);
});
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}