天天看點

【你好Hystrix】七:Hystrix逾時源碼分析-HystrixObservableTimeoutOperator前言ScheduledExecutorTimerReferenceTimerListenerHystrixTimerHystrixObservableTimeoutOperatordemo

目錄

  • 前言
  • ScheduledExecutor
  • TimerReference
  • TimerListener
  • HystrixTimer
  • HystrixObservableTimeoutOperator
    • AbstractCommand#executeCommandAndObserve
    • HystrixObservableTimeoutOperator
  • demo

前言

Hystrix

的逾時我們平時接觸的比較多,那

Hystrix

是如何實作的?線程隔離和信号量隔離實作的方式是一樣的嗎?這一篇文章我們從源碼的角度來探讨

Hystrix

對逾時的實作。

ScheduledExecutor

Hystrix

排程器,是一個靜态内部類。内部維護了一個

ScheduledThreadPoolExecutor

,看到這裡其實我們也不難猜出了

Hystrix

的逾時其實就是使用一個線程來定時執行任務監聽是否逾時。

static class ScheduledExecutor {
      //這個線程池是關鍵 執行一個定時任務 
	  volatile ScheduledThreadPoolExecutor executor;
	  private volatile boolean initialized;
	  //執行個體化線程池預設的線程大小是cpu的核心數量
	  //當然可以通過hystrix.timer.threadpool.default.coreSize來指定線程數量
	  //執行個體化完成之後更改initialized的狀态
	  public void initialize() {
	      HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
	      int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();
	      ThreadFactory threadFactory = null;
	      if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
	          threadFactory = new ThreadFactory() {
	              final AtomicInteger counter = new AtomicInteger();
	              @Override
	              public Thread newThread(Runnable r) {
	                  Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
	                  thread.setDaemon(true);
	                  return thread;
	              }
	          };
	      } else {
	          threadFactory = PlatformSpecific.getAppEngineThreadFactory();
	      }
	      executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
	      initialized = true;
	  }
	  public ScheduledThreadPoolExecutor getThreadPool() {
	      return executor;
	  }
	  public boolean isInitialized() {
	      return initialized;
	  }
	}
           
  • 監聽逾時的實際上是一組線程完成

    ScheduledThreadPoolExecutor

    核心線程數預設是CPU的核數
  • ScheduledExecutor

    其實就是對線程池的包裝 我們可以簡單的認為它就是一個線程
  • hystrix.timer.threadpool.default.coreSize

    可以通過該配置來調整逾時線程池的大小

TimerReference

這個類就更簡單了,上面

ScheduledExecutor

是對線程池的封裝 這個類是對執行結果

Future

的封裝. 但是注意它是一個軟引用。這裡多說一句 軟引用的回收條件是 在發現OOM之前 JVM會嘗試對軟引用進行回收。

private static class TimerReference extends SoftReference<TimerListener> {
   private final ScheduledFuture<?> f;
   TimerReference(TimerListener referent, ScheduledFuture<?> f) {
       super(referent);
       this.f = f;
   }
   @Override
   public void clear() {
       super.clear();
       f.cancel(false);
   }
}
           

主要注意

clear

方法 在該方法中取消了任務,如果任務正在執行 不打斷。

TimerListener

定時任務監聽器,提供了兩個方法

  • tick()

    逾時執行的方法
  • getIntervalTimeInMilliseconds()

    擷取定時任務執行的時間(逾時時間)

HystrixTimer

這個類可以說是

Hystrix

實作逾時的核心,像一個逾時容器,上面所有的逾時元件都是在這個類中定義。

//它以單例的形式存在
private static HystrixTimer INSTANCE = new HystrixTimer();
//執行器ScheduledExecutor 我們上面說過
    AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();
  //對外隻提供一個getInstance方法來擷取上面的執行個體  
    public static HystrixTimer getInstance() {
        return INSTANCE;
    }
    public static void reset() {
        ScheduledExecutor ex = INSTANCE.executor.getAndSet(null);
        if (ex != null && ex.getThreadPool() != null) {
            ex.getThreadPool().shutdownNow();
        }
    }
   //這個方法很重要添加一個 逾時監聽器 
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        //初始化一個任務 在任務内部執行 監聽器的tick方法,是以主邏輯的内容可以放在這個tick方法中
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        //定時執行一個任務 間隔時間從主邏輯中傳入的TimerListener中擷取 預設是逾時時間 
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        //傳回一個Timer
        return new TimerReference(listener, f);
    }
    //如果排程器沒有初始化 執行初始化方法 初始化方法上面已經說過
     protected void startThreadIfNeeded() {
        while (executor.get() == null || ! executor.get().isInitialized()) {
            if (executor.compareAndSet(null, new ScheduledExecutor())) {
                executor.get().initialize();
            }
        }
    }
           

上面主要的邏輯是在

addTimerListener

方法中,這個方法送出了一個定時執行的任務 執行的内容就是

TimerListener

tick

方法。

HystrixObservableTimeoutOperator

我們上面把

Hystrix

逾時的工具都介紹了一遍。我們也知道

Hystrix

在執行

HystrixCommand.run

方法的時候 其實是把邏輯封裝成了一個

Observable

然後不停的變換

Observable

來達到目的。而

HystrixObservableTimeoutOperator

就是一個對Observable變換的邏輯。

AbstractCommand#executeCommandAndObserve

在指令的執行的過程中我們可以發現有這樣的幾行代碼,如果開啟逾時就調用處理逾時的邏輯。

lift(new HystrixObservableTimeoutOperator(_cmd))

我們簡單的了解

lift

就是對

Observable

進行轉換使其具備更進階的功能。而具體的邏輯就是在

HystrixObservableTimeoutOperator

中來實作。

if (properties.executionTimeoutEnabled().get()) {
      execution = executeCommandWithSpecifiedIsolation(_cmd)
              .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
  } else {
      execution = executeCommandWithSpecifiedIsolation(_cmd);
  }
           

HystrixObservableTimeoutOperator

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
        final AbstractCommand<R> originalCommand;
        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }
        //我們隻需要知道實作 call方法就能拿到訂閱者
        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            child.add(s);
            final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
            TimerListener listener = new TimerListener() {
              //逾時執行的邏輯
                @Override
                public void tick() {
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    //發送逾時時間
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                        //取消訂閱
                        s.unsubscribe();
                  //如果逾時 抛出 HystrixTimeoutException異常
                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }
                //從配置擷取逾時時間
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };
            //向HystrixTimer中添加監聽器
            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            originalCommand.timeoutTimer.set(tl);
            //定義Parent 和child關聯起來 隻有沒有逾時的時候才會執行child的 onNext onCompleted onError等方法
            Subscriber<R> parent = new Subscriber<R>() {
                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        tl.clear();
                        child.onCompleted();
                    }
                }
                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        tl.clear();
                        child.onError(e);
                    }
                }
                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }
                private boolean isNotTimedOut() {
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }
            };
            s.add(parent);
            return parent;
        }

    }

           

demo

隔離政策為SEMAPHORE ,逾時時間9s

public class TimeoutCommand extends HystrixCommand<String> {

  private String name;

  protected TimeoutCommand(String name) {

    super(HystrixCommandGroupKey.Factory.asKey(name));
    this.name = name;
  }

  protected TimeoutCommand(Setter setter , String name) {
    super(setter);
    this.name = name;
  }

  @Override
  protected String run() throws Exception {
    TimeUnit.SECONDS.sleep(10);
    return this.name;
  }
}

 @Test
  public void fun2() throws InterruptedException {
    HystrixCommand.Setter commandSetter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("coredy"))
                                       .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                       .withExecutionTimeoutInMilliseconds(9000)
                                       .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE));
    TimeoutCommand command = new TimeoutCommand(commandSetter , "coredy");
    command.toObservable().subscribe(item ->{
      System.out.println(Thread.currentThread().getName());
      System.out.println(item);
    });
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  }

           

繼續閱讀