天天看点

【你好Hystrix】七:Hystrix超时源码分析-HystrixObservableTimeoutOperator前言ScheduledExecutorTimerReferenceTimerListenerHystrixTimerHystrixObservableTimeoutOperatordemo

目录

  • 前言
  • ScheduledExecutor
  • TimerReference
  • TimerListener
  • HystrixTimer
  • HystrixObservableTimeoutOperator
    • AbstractCommand#executeCommandAndObserve
    • HystrixObservableTimeoutOperator
  • demo

前言

Hystrix

的超时我们平时接触的比较多,那

Hystrix

是如何实现的?线程隔离和信号量隔离实现的方式是一样的吗?这一篇文章我们从源码的角度来探讨

Hystrix

对超时的实现。

ScheduledExecutor

Hystrix

调度器,是一个静态内部类。内部维护了一个

ScheduledThreadPoolExecutor

,看到这里其实我们也不难猜出了

Hystrix

的超时其实就是使用一个线程来定时执行任务监听是否超时。

static class ScheduledExecutor {
      //这个线程池是关键 执行一个定时任务 
	  volatile ScheduledThreadPoolExecutor executor;
	  private volatile boolean initialized;
	  //实例化线程池默认的线程大小是cpu的核心数量
	  //当然可以通过hystrix.timer.threadpool.default.coreSize来指定线程数量
	  //实例化完成之后更改initialized的状态
	  public void initialize() {
	      HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
	      int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();
	      ThreadFactory threadFactory = null;
	      if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
	          threadFactory = new ThreadFactory() {
	              final AtomicInteger counter = new AtomicInteger();
	              @Override
	              public Thread newThread(Runnable r) {
	                  Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
	                  thread.setDaemon(true);
	                  return thread;
	              }
	          };
	      } else {
	          threadFactory = PlatformSpecific.getAppEngineThreadFactory();
	      }
	      executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
	      initialized = true;
	  }
	  public ScheduledThreadPoolExecutor getThreadPool() {
	      return executor;
	  }
	  public boolean isInitialized() {
	      return initialized;
	  }
	}
           
  • 监听超时的实际上是一组线程完成

    ScheduledThreadPoolExecutor

    核心线程数默认是CPU的核数
  • ScheduledExecutor

    其实就是对线程池的包装 我们可以简单的认为它就是一个线程
  • hystrix.timer.threadpool.default.coreSize

    可以通过该配置来调整超时线程池的大小

TimerReference

这个类就更简单了,上面

ScheduledExecutor

是对线程池的封装 这个类是对执行结果

Future

的封装. 但是注意它是一个软引用。这里多说一句 软引用的回收条件是 在发现OOM之前 JVM会尝试对软引用进行回收。

private static class TimerReference extends SoftReference<TimerListener> {
   private final ScheduledFuture<?> f;
   TimerReference(TimerListener referent, ScheduledFuture<?> f) {
       super(referent);
       this.f = f;
   }
   @Override
   public void clear() {
       super.clear();
       f.cancel(false);
   }
}
           

主要注意

clear

方法 在该方法中取消了任务,如果任务正在执行 不打断。

TimerListener

定时任务监听器,提供了两个方法

  • tick()

    超时执行的方法
  • getIntervalTimeInMilliseconds()

    获取定时任务执行的时间(超时时间)

HystrixTimer

这个类可以说是

Hystrix

实现超时的核心,像一个超时容器,上面所有的超时组件都是在这个类中定义。

//它以单例的形式存在
private static HystrixTimer INSTANCE = new HystrixTimer();
//执行器ScheduledExecutor 我们上面说过
    AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();
  //对外只提供一个getInstance方法来获取上面的实例  
    public static HystrixTimer getInstance() {
        return INSTANCE;
    }
    public static void reset() {
        ScheduledExecutor ex = INSTANCE.executor.getAndSet(null);
        if (ex != null && ex.getThreadPool() != null) {
            ex.getThreadPool().shutdownNow();
        }
    }
   //这个方法很重要添加一个 超时监听器 
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        //初始化一个任务 在任务内部执行 监听器的tick方法,所以主逻辑的内容可以放在这个tick方法中
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        //定时执行一个任务 间隔时间从主逻辑中传入的TimerListener中获取 默认是超时时间 
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        //返回一个Timer
        return new TimerReference(listener, f);
    }
    //如果调度器没有初始化 执行初始化方法 初始化方法上面已经说过
     protected void startThreadIfNeeded() {
        while (executor.get() == null || ! executor.get().isInitialized()) {
            if (executor.compareAndSet(null, new ScheduledExecutor())) {
                executor.get().initialize();
            }
        }
    }
           

上面主要的逻辑是在

addTimerListener

方法中,这个方法提交了一个定时执行的任务 执行的内容就是

TimerListener

tick

方法。

HystrixObservableTimeoutOperator

我们上面把

Hystrix

超时的工具都介绍了一遍。我们也知道

Hystrix

在执行

HystrixCommand.run

方法的时候 其实是把逻辑封装成了一个

Observable

然后不停的变换

Observable

来达到目的。而

HystrixObservableTimeoutOperator

就是一个对Observable变换的逻辑。

AbstractCommand#executeCommandAndObserve

在命令的执行的过程中我们可以发现有这样的几行代码,如果开启超时就调用处理超时的逻辑。

lift(new HystrixObservableTimeoutOperator(_cmd))

我们简单的理解

lift

就是对

Observable

进行转换使其具备更高级的功能。而具体的逻辑就是在

HystrixObservableTimeoutOperator

中来实现。

if (properties.executionTimeoutEnabled().get()) {
      execution = executeCommandWithSpecifiedIsolation(_cmd)
              .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
  } else {
      execution = executeCommandWithSpecifiedIsolation(_cmd);
  }
           

HystrixObservableTimeoutOperator

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
        final AbstractCommand<R> originalCommand;
        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }
        //我们只需要知道实现 call方法就能拿到订阅者
        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            child.add(s);
            final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
            TimerListener listener = new TimerListener() {
              //超时执行的逻辑
                @Override
                public void tick() {
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    //发送超时时间
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                        //取消订阅
                        s.unsubscribe();
                  //如果超时 抛出 HystrixTimeoutException异常
                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }
                //从配置获取超时时间
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };
            //向HystrixTimer中添加监听器
            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            originalCommand.timeoutTimer.set(tl);
            //定义Parent 和child关联起来 只有没有超时的时候才会执行child的 onNext onCompleted onError等方法
            Subscriber<R> parent = new Subscriber<R>() {
                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        tl.clear();
                        child.onCompleted();
                    }
                }
                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        tl.clear();
                        child.onError(e);
                    }
                }
                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }
                private boolean isNotTimedOut() {
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }
            };
            s.add(parent);
            return parent;
        }

    }

           

demo

隔离策略为SEMAPHORE ,超时时间9s

public class TimeoutCommand extends HystrixCommand<String> {

  private String name;

  protected TimeoutCommand(String name) {

    super(HystrixCommandGroupKey.Factory.asKey(name));
    this.name = name;
  }

  protected TimeoutCommand(Setter setter , String name) {
    super(setter);
    this.name = name;
  }

  @Override
  protected String run() throws Exception {
    TimeUnit.SECONDS.sleep(10);
    return this.name;
  }
}

 @Test
  public void fun2() throws InterruptedException {
    HystrixCommand.Setter commandSetter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("coredy"))
                                       .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                       .withExecutionTimeoutInMilliseconds(9000)
                                       .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE));
    TimeoutCommand command = new TimeoutCommand(commandSetter , "coredy");
    command.toObservable().subscribe(item ->{
      System.out.println(Thread.currentThread().getName());
      System.out.println(item);
    });
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  }

           

继续阅读