天天看点

SpringCloud Hystrix原理分析

SpringCloud Hystrix原理分析

    • 前置说明
    • hystrix中用到的rxjava方法(会的请跳过)
    • hystrix 执行过程原理分析
    • hystrix熔断器数据统计 --扩展知识
    • 总结

前置说明

本文源码基于

hystrix1.5.18

hystrix中用到的rxjava方法(会的请跳过)

首先rxjava是响应式流的编程模型, 所有的角色分为Observable(观察对象), Observer(观察者, Subscription(订阅), Subject(又是观察对象又是观察者), 所有的操作分为来源操作, 中间操作, 订阅操作

  • hystrix使用的来源操作有, 例如
    • defer - 这个操作就是用于延迟加载观察对象, 尤其是在观察对象来源于网络时
  • hystrix使用的中间操作 例如
    • map/flatMap - 用来对每个数据对象进行转换 ;
    • doOnxxx - 这个操作用于在对应的状态中进行数据观察, 不能做出修改, 比如
      • doOnNext, 那么就是在onNext的状态下获取流经的数据
    • window - 这是一个窗口函数, 有两种用途, 一种根据次数进行窗口分组, 一种根据时间进行创建口分组, 那么hystrix中将两者结合实现滑动窗口的数据统计. 具体是将完成的事件按照时间分区(假设每100ms), 然后再按照每10个分组, 那么就能继续每秒的请求数量统计了
    • share - 使得流成为共享流, 即每次订阅都不会重新创建来源, 而是在原有的流上一起消费
    • onBackpressureDrop - 被压的一种策略, 丢弃策略
    • startWith - 添加一些数据到作为流最开始发送的数据
    • onErrorResumeNext - 在发送onError事件是, 进行流的替换
    • subscribeOn - 切换发生订阅时及其之后执行的所在线程
  • hystrix使用的订阅操作 这个没啥可以介绍的, 就是触发整个流程的, 进行最后的数据消费

举例

@Test
    public void test1(){
      //创建一个信号发送器
     Observable<String> map = Observable.defer(()-> //defer主要是延迟作用
                Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
            subscriber.onStart();
            subscriber.onNext(1);
            subscriber.onError(new RuntimeException());

        }))
                .startWith(0) //设置最开始的数据是0
                .doOnError(e-> System.err.println("error:"+e.getLocalizedMessage())) //出现异常信号时打印
                .onErrorResumeNext(Observable.just(1, 2, 3)) //发送异常时 重新发送1,2,3
                .doOnNext(e->System.out.println("next:"+e)) // 每次next信号都打印一下
                .map(Object::toString) //转成字符串
                .subscribe(System.out::println); //订阅并打印

 
 // subject 既可以作为可观察者, 又可以作为观察者
        PublishSubject<String> objectPublishSubject = PublishSubject.create();
        map.subscribe(objectPublishSubject);
        objectPublishSubject.subscribe(System.out::println, e -> System.err.println(e.getLocalizedMessage()));

    }
           

hystrix 执行过程原理分析

接下来开始分析hystrix源码, 首先我们需要找到入口函数, 所以先看我给出的demo

public class HelloServiceCommand extends HystrixCommand<String> {

    private RestTemplate restTemplate;

    protected HelloServiceCommand(String commandGroupKey, RestTemplate restTemplate) {
        super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
        this.restTemplate = restTemplate;
    }

    @Override
    protected String run() throws Exception {
        System.out.println(Thread.currentThread().getName());
        return restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody()  ;
    }

    @Override
    protected String getFallback() {
        return "error";
    }

    @Override
    protected String getCacheKey() {
        return "hello";
    }
}
           

这个就是原生的hystrix的使用, 所以我们可以看到run就是我们本身要执行的可能会失败或者超时的业务代码, 然后getFallback是降级处理, 之后我们调用

command.queue().get()

就能获取到结果.

那么我们快速进入源代码中, 会省略很多无关的代码

首先是AbstractCommand, 这里最终要的就是

toObservable

方法, 我们会看到很多匿名内部类, 我们只讲重要的匿名类

public Observable<R> toObservable() {
     	final AbstractCommand<R> _cmd = this;
	// 这个匿名内部类主要是在流 完成时的清扫工作, 以及发送完成的事件
	 	final Action0 terminateCommandCleanup = xxx
	// 这个匿名内部类主要是在流 取消时的清扫工作, 以及发送完成的事件	
		final Action0 unsubscribeCommandCleanup =xxx
	// 这个匿名类相当重要, 里面包含了熔断逻辑判断, 隔离逻辑判断, 以及业务执行逻辑
		final Func0<Observable<R>> applyHystrixSemantics =xxx
	//... 跳过一大堆钩子方法的代码来到最后的return
		return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ...省略无关代码
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();
                if (requestCacheEnabled) { //是否开启缓存
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        // 如果缓存命中 则直接返回
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                	
                // 没有命中缓存则执行原来的hystrix处理链
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics) //这个匿名内部类上面有提到的是执行原有业务逻辑用的
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放入缓存
                if (requestCacheEnabled && cacheKey != null) {
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {  
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache 
                        .doOnTerminate(terminateCommandCleanup)   
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
	}
           

下面具体分析一下 applyHystrixSemantics

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

	private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // ...
        // 判断断路器是否开启
        if (circuitBreaker.allowRequest()) {
            //...
            // 尝试获取信号量, 在信号量隔离模式下这里的实现类是TryableSemaphoreActual, 否则是TryableSemaphoreNoOp
            if (executionSemaphore.tryAcquire()) {
                try {
              
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd) //下面具体分析
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else { // 超过并发数 直接降级处理
                return handleSemaphoreRejectionViaFallback();
            }
        } else { // 断路器已经开启 直接降价处理
            return handleShortCircuitViaFallback();
        }
    }

	// 继续分析一下executeCommandAndObserve
 	private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

		// 这个匿名内部类 主要在下面的onErrorResumeNext, 也就是发送异常信号时返回一个错误的新流
		final Func1<Throwable, Observable<R>> handleFallback = xx

		 Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) { //是否开启超时判断
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd); //下面具体分析
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);

	}
	
	// 执行隔离策略
 	private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
	 // 是否线程池隔离模式
 		if (properties.executionIsolationStrategy().get() == 		ExecutionIsolationStrategy.THREAD) {
 	 		return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    ...
                  
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd); //执行用户的代码
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
				// ... 省略其他的doOnxxx
				 // 设置订阅时的执行线程池, 同时也是实现线程舱壁隔离的地方, 这里可以理解为把每次流的处理都会交给线程池执行.,如果线程池满了,那么就会触发任务拒绝
                 }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
		}} else {
		 return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                  // ...
                        return getUserExecutionObservable(_cmd); //执行用户代码
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            });
        }

	}

   private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;
        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            userObservable = Observable.error(ex);
        }
        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }


 	final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run()); //这里终于调用我们自己重写的run了
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                executionThread.set(Thread.currentThread());
            }
        });
    }
           

到此的话基本的执行流程就走完了, 其实所有的逻辑基本都在一个类里面, 都是通过rxjava进行穿起来. 我基本跳过了所有不重要的分支, 直走主干.

hystrix熔断器数据统计 --扩展知识

首先要HystrixCircuitBreaker.isOpen()是用来判断是否符合熔断, AbstractCommand#handleCommandEnd()是用来发送完成的事件。 那么hystrix 是如何接受事件然后生成数据给熔断器进行判断呢?

这里直接给出流的源头和流经的地方

SpringCloud Hystrix原理分析

根据图的流向, 我们可以判断出HystrixCircuitBreaker.isOpen()最终是通过BucketedCounterStream的counterSubject来获取滑动窗口的技术, 而AbstractCommand#handleCommandEnd()是发送给HystrixCommandCompletionStream的writeOnlySubject来完成事件的通知

最后给出一个基于rxjava的实现滑动窗口计算的demo, 理解这个demo之后, 再结合上面的数据流转就可以明白hystrix计数对的原理了。

@Test
    public void testWindow(){
        Flux.interval(Duration.ofMillis(300))
                .window(Duration.ofMillis(500L))
                .flatMap(e -> e.count()) //计算每个500ms内的数量
                .window(2)
                .flatMap(e -> e.reduce((a, b) -> a + b)) 、、计算每2个窗口内的数量
                .subscribe(e -> System.out.println(new Date()+":"+e));
    }

           

总结

看完上面后, 下面是我梳理的hystrix 的执行流程, 看看是否和你的阅读结果一致

创建Command -> 调用execute方法 -> 判断是否命中缓存 -> 判断是否熔断 -> 判断是否超过隔离数量 -> 执行代码 -> 是否执行超时 -> 返回执行结果/ 返回降级结果

为啥hystrix官方不维护了?

首先是netflix自己是觉得功能比较完善了, 那么没必要继续迭代什么新的功能. 但是我发现hystrix给予rxjava1.x的版本, 实际上现在rxjava都已经3.x 了, 而各个版本之间的差异较大, 如果hystrix要继续迭代, 那么还需要将rxjava版本逐渐迭代上去, 这个就过于麻烦, 可能会有很多问题存在.

继续阅读