03. Microservice Fault Tolerance: Spring Cloud Hystrix (Greenwich.SR2)

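1 The Avalanche Effect

In a distributed system, microservices call and depend on one another. When one microservice fails, the services that call it cannot get a result and may wait indefinitely. Even if a timeout is configured, the calling thread stays blocked for the seconds it takes to reach that timeout. Under high concurrency, more and more threads end up blocked and fewer and fewer remain available, until the system crashes. Because microservices depend on each other, the crash of service A brings down service B, and the crash of service B brings down services C and D. This cascading failure is the avalanche effect.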
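The thread exhaustion described above can be reproduced with plain JDK classes. The following is a minimal sketch, not Hystrix code: the class name `ThreadExhaustionSketch`, the method `newRequestStarves`, and the pool size are hypothetical and chosen only for illustration. A small fixed pool stands in for a service's request threads, and a latch stands in for a downstream dependency that never answers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ThreadExhaustionSketch {

    /**
     * Returns true if a new, otherwise healthy request times out because
     * every worker thread is blocked on a dependency that does not answer.
     */
    static boolean newRequestStarves(int workers, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch dependencyRecovers = new CountDownLatch(1);
        try {
            // Occupy every worker with a call to the unresponsive dependency.
            for (int i = 0; i < workers; i++) {
                pool.submit(() -> {
                    dependencyRecovers.await();
                    return "late";
                });
            }
            // This request needs no remote call, but it has no thread to run on.
            Future<String> healthy = pool.submit(() -> "ok");
            try {
                healthy.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            dependencyRecovers.countDown(); // release the blocked workers
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("starved: " + newRequestStarves(2, 200)); // prints "starved: true"
    }
}
```

The healthy request fails only because its thread pool is shared with the calls to the failing dependency, which is the situation that isolation and fail-fast behavior are meant to prevent.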

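2 What Is Spring Cloud Hystrix?

Hystrix is a latency and fault tolerance library open-sourced by Netflix. It provides a circuit breaker, isolates access to remote systems, and prevents cascading failures.

Hystrix wraps calls to a service. When the error rate of a service exceeds a threshold, Hystrix considers the service to be failing and opens the circuit breaker, which stops further calls to that service.

When a request fails, times out, or is rejected because the circuit breaker is open, the fallback logic runs. The fallback is supplied by the developer and typically returns a default value.

After the circuit breaker has been open for a while, it enters the half-open state: a limited number of requests (in Hystrix, a single trial request) run the normal logic while the other requests keep failing fast. If the trial request reaches the microservice and gets a result, the circuit breaker closes; otherwise it stays open. This is the circuit breaker's self-healing mechanism.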
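The closed, open, and half-open states can be sketched as a small state machine. This is a simplified illustration under stated assumptions, not the Hystrix implementation: the class `CircuitBreakerSketch` is hypothetical, it opens after a number of consecutive failures rather than on an error percentage over a rolling window, and it does not limit how many callers pass through while half-open. The clock is injected so the behavior can be checked without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;   // consecutive failures before opening (simplification)
    private final long sleepWindowMillis; // how long to stay open before allowing a trial call
    private final LongSupplier clock;     // injected time source, in milliseconds
    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    CircuitBreakerSketch(int failureThreshold, long sleepWindowMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** Current state; an open breaker becomes half-open once the sleep window has elapsed. */
    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= sleepWindowMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    /** Runs the action, or the fallback if the breaker is open or the action throws. */
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state() == State.OPEN) {
            return fallback.get(); // fail fast without touching the remote service
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback.get();
        }
    }

    private synchronized void onSuccess() {
        failures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        failures++;
        // A failed trial call in HALF_OPEN reopens the breaker immediately.
        if (state == State.HALF_OPEN || failures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, 1000, () -> now[0]);
        Supplier<String> failing = () -> { throw new IllegalStateException("provider down"); };
        breaker.call(failing, () -> "error");
        breaker.call(failing, () -> "error");
        System.out.println(breaker.state());                            // OPEN
        now[0] = 1000;                                                  // sleep window elapsed
        System.out.println(breaker.state());                            // HALF_OPEN
        System.out.println(breaker.call(() -> "hello", () -> "error")); // hello
        System.out.println(breaker.state());                            // CLOSED
    }
}
```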

3 Quick Start

3.1 Prerequisites

Prepare the service architecture shown in the figure below:

[Figure: service architecture diagram]

3.2 Adding Hystrix

3.2.1 Add the dependency to the ribbon-consumer project

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
           

3.2.2 Annotate the main class

Add @EnableHystrix or @EnableCircuitBreaker to the main class to enable the circuit breaker:

@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class RibbonConsumerApplication {

    @Bean
    @LoadBalanced
    RestTemplate restTemplate(){
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(RibbonConsumerApplication.class, args);
    }

}
           

Note: the main class can also be annotated with @SpringCloudApplication, which is defined as follows:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker
public @interface SpringCloudApplication {

}
           

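It bundles the three annotations used above, which shows that a standard Spring Cloud application includes service discovery and a circuit breaker.

3.2.3 Write HelloController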

@RestController
public class HelloController {

    @Autowired
    private HelloService helloService;

    @GetMapping(value = "/consumerHello")
    public String hello(){

        return helloService.hello();
    }
}
           

3.2.4 Write HelloService

The @HystrixCommand annotation specifies the fallback method:

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String hello(){

        return restTemplate.getForEntity("http://eureka-provider/hello",String.class).getBody();
    }

    public String helloFallback() {

        return "error";
    }

}
           

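3.2.5 Verification

Start the services and visit http://localhost:1201/consumerHello; the response is hello. Then stop a service provider and send the request again; this time the fallback result error is returned.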

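4 Source Code Analysis

As described above, using Hystrix takes two steps:

1. Add the @EnableHystrix annotation (or @EnableCircuitBreaker, as in the example) to the startup class.

2. Add the @HystrixCommand annotation to the method and specify its fallback method.

4.1 The @EnableHystrix annotation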

/**
 * Convenience annotation for clients to enable Hystrix circuit breakers (specifically).
 * Use this (optionally) in case you want discovery and know for sure that it is Hystrix
 * you want. All it does is turn on circuit breakers and let the autoconfiguration find
 * the Hystrix classes if they are available (i.e. you need Hystrix on the classpath as
 * well).
 *
 * @author Dave Syer
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {

}

           

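This annotation enables Hystrix. It does so by being meta-annotated with @EnableCircuitBreaker.

In the same package as this annotation there are two configuration classes: HystrixAutoConfiguration and HystrixCircuitBreakerConfiguration.

4.2 HystrixAutoConfiguration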

/**
 * Auto configuration for Hystrix.
 *
 * @author Christian Dupuis
 * @author Dave Syer
 */
@Configuration
@ConditionalOnClass({ Hystrix.class, HealthIndicator.class,
		HealthIndicatorAutoConfiguration.class })
@AutoConfigureAfter({ HealthIndicatorAutoConfiguration.class })
public class HystrixAutoConfiguration {

	@Bean
	@ConditionalOnEnabledHealthIndicator("hystrix")
	public HystrixHealthIndicator hystrixHealthIndicator() {
		return new HystrixHealthIndicator();
	}
	....
}
           

As the code shows, HystrixAutoConfiguration mainly configures the Hystrix health indicator.

4.3 HystrixCircuitBreakerConfiguration

@Configuration
public class HystrixCircuitBreakerConfiguration {

	@Bean
	public HystrixCommandAspect hystrixCommandAspect() {
		return new HystrixCommandAspect();
	}

	@Bean
	public HystrixShutdownHook hystrixShutdownHook() {
		return new HystrixShutdownHook();
	}

	@Bean
	public HasFeatures hystrixFeature() {
		return HasFeatures
				.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
	}

	/**
	 * {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
	 * {@link ApplicationContext} shuts down.
	 */
	private class HystrixShutdownHook implements DisposableBean {

		@Override
		public void destroy() throws Exception {
			// Just call Hystrix to reset thread pool etc.
			Hystrix.reset();
		}

	}

}

           

This configuration class registers a HystrixCommandAspect bean.

4.4 HystrixCommandAspect

/**
 * AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
 */
@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    ...
}
           

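The aspect defines two pointcuts, so it uses AOP to extend the behavior of methods annotated with either HystrixCommand or HystrixCollapser.

A method annotated with @HystrixCommand therefore passes through this aspect, whose @Around(…) advice intercepts every call to such a method.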
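The idea of wrapping an annotated method in "around" logic can be illustrated without AspectJ. The sketch below is an analogy built on a JDK dynamic proxy, not how HystrixCommandAspect is implemented: the annotation `WithFallback`, the interface `Greeter`, and the class `FailingGreeter` are hypothetical stand-ins for @HystrixCommand and the HelloService from the quick start.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class AroundAdviceSketch {

    /** Hypothetical stand-in for @HystrixCommand(fallbackMethod = ...). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface WithFallback {
        String fallbackMethod();
    }

    interface Greeter {
        String hello();
    }

    static class FailingGreeter implements Greeter {
        @WithFallback(fallbackMethod = "helloFallback")
        @Override
        public String hello() {
            throw new IllegalStateException("provider unavailable");
        }

        public String helloFallback() {
            return "error";
        }
    }

    /** Returns a proxy that runs every call "around" a try/catch with fallback lookup. */
    static Greeter wrap(Greeter target) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[] {Greeter.class},
                (proxy, method, args) -> {
                    // The annotation lives on the implementation method, not on the interface.
                    Method impl = target.getClass()
                            .getMethod(method.getName(), method.getParameterTypes());
                    WithFallback annotation = impl.getAnnotation(WithFallback.class);
                    try {
                        return impl.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (annotation == null) {
                            throw e.getCause(); // not annotated: propagate the original failure
                        }
                        Method fallback = target.getClass()
                                .getMethod(annotation.fallbackMethod(), method.getParameterTypes());
                        return fallback.invoke(target, args);
                    }
                });
    }

    public static void main(String[] args) {
        Greeter greeter = wrap(new FailingGreeter());
        System.out.println(greeter.hello()); // prints "error"
    }
}
```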

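The methodsAnnotatedWithHystrixCommand method first obtains the intercepted Method and then checks whether it carries both @HystrixCommand and @HystrixCollapser; if it does, an exception is thrown.

The MetaHolder is created through MetaHolderFactory's create method. MetaHolderFactory has two subclasses, CollapserMetaHolderFactory and CommandMetaHolderFactory, and the call ends up in the create method of one of them. The focus here is on create in CommandMetaHolderFactory: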

private static class CollapserMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            ....
        }
    }

private static class CommandMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
            ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
            MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
            if (isCompileWeaving()) {
                builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
            }
            return builder.defaultCommandKey(method.getName())
                            .hystrixCommand(hystrixCommand)
                            .observableExecutionMode(hystrixCommand.observableExecutionMode())
                            .executionType(executionType)
                            .observable(ExecutionType.OBSERVABLE == executionType)
                            .build();
        }
    }
           

The metaHolderBuilder method:

MetaHolder.Builder metaHolderBuilder(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            MetaHolder.Builder builder = MetaHolder.builder()
                    .args(args).method(method).obj(obj).proxyObj(proxy)
                    .joinPoint(joinPoint);

            setFallbackMethod(builder, obj.getClass(), method);
            builder = setDefaultProperties(builder, obj.getClass(), joinPoint);
            return builder;
        }
           

The setFallbackMethod method:

private static MetaHolder.Builder setFallbackMethod(MetaHolder.Builder builder, Class<?> declaringClass, Method commandMethod) {
        FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(declaringClass, commandMethod);
        if (fallbackMethod.isPresent()) {
            fallbackMethod.validateReturnType(commandMethod);
            builder
                    .fallbackMethod(fallbackMethod.getMethod())
                    .fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
        }
        return builder;
    }
           

The fallback method is therefore resolved while the MetaHolder is being built.
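Resolving a fallback by name and checking that its return type matches the command method can be sketched with plain reflection. This is a simplified illustration, not javanica's MethodProvider or FallbackMethod: the class `FallbackLookupSketch`, its methods, and the nested `HelloService` are hypothetical, and the check only compares raw return types and looks for a fallback with the same parameter list as the command.

```java
import java.lang.reflect.Method;
import java.util.Optional;

class FallbackLookupSketch {

    /** Looks up a declared method, turning the checked exception into an unchecked one. */
    static Method method(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            return type.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + name, e);
        }
    }

    /**
     * Resolves the fallback for a command method.
     * Returns empty if no fallback name is configured, and rejects a fallback
     * whose return type cannot stand in for the command's return type.
     */
    static Optional<Method> findFallback(Class<?> type, Method command, String fallbackName) {
        if (fallbackName == null || fallbackName.isEmpty()) {
            return Optional.empty();
        }
        // Assumption: the fallback takes the same parameters as the command.
        Method fallback = method(type, fallbackName, command.getParameterTypes());
        if (!command.getReturnType().isAssignableFrom(fallback.getReturnType())) {
            throw new IllegalArgumentException(
                    "fallback " + fallbackName + " returns " + fallback.getReturnType().getName()
                            + " but the command returns " + command.getReturnType().getName());
        }
        return Optional.of(fallback);
    }

    /** Hypothetical service used only to exercise the lookup. */
    static class HelloService {
        public String hello() { return "hello"; }
        public String helloFallback() { return "error"; }
        public int badFallback() { return -1; }
    }

    public static void main(String[] args) {
        Method command = method(HelloService.class, "hello");
        System.out.println(findFallback(HelloService.class, command, "helloFallback")
                .map(Method::getName)
                .orElse("none")); // prints "helloFallback"
    }
}
```

Validating at lookup time means a misconfigured fallback is reported when the command is built rather than when the first failure occurs.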

Once the MetaHolder has been created, a HystrixInvokable is created from it:

HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

HystrixCommandFactory looks like this:

public class HystrixCommandFactory {

    private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();

    private HystrixCommandFactory() {

    }

    public static HystrixCommandFactory getInstance() {
        return INSTANCE;
    }

    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }

   ...
}
           

For an ordinary @HystrixCommand method, this factory creates the GenericCommand instance that later actually runs the command. Execution then continues in CommandExecutor.execute:

/**
 * Invokes necessary method of {@link HystrixExecutable} or {@link HystrixObservable} for specified execution type:
 * <p/>
 * {@link ExecutionType#SYNCHRONOUS} -> {@link com.netflix.hystrix.HystrixExecutable#execute()}
 * <p/>
 * {@link ExecutionType#ASYNCHRONOUS} -> {@link com.netflix.hystrix.HystrixExecutable#queue()}
 * <p/>
 * {@link ExecutionType#OBSERVABLE} -> depends on specify observable execution mode:
 * {@link ObservableExecutionMode#EAGER} - {@link HystrixObservable#observe()},
 * {@link ObservableExecutionMode#LAZY} -  {@link HystrixObservable#toObservable()}.
 */
public class CommandExecutor {

    /**
     * Calls a method of {@link HystrixExecutable} in accordance with specified execution type.
     *
     * @param invokable  {@link HystrixInvokable}
     * @param metaHolder {@link MetaHolder}
     * @return the result of invocation of specific method.
     * @throws RuntimeException
     */
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

   ....

}
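The dispatch in CommandExecutor.execute depends on an execution type that CommandMetaHolderFactory derives from the method's return type via ExecutionType.getExecutionType(method.getReturnType()). The following sketch shows that idea with JDK types only. It is a simplification under stated assumptions: the class `ExecutionTypeSketch` is hypothetical, the OBSERVABLE case is left out, and the rule that a Future return type means asynchronous execution is an assumption about the mapping rather than a copy of the library code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class ExecutionTypeSketch {

    enum ExecutionType { SYNCHRONOUS, ASYNCHRONOUS }

    /** Assumption: a method that returns a Future wants asynchronous execution. */
    static ExecutionType typeOf(Class<?> returnType) {
        return Future.class.isAssignableFrom(returnType)
                ? ExecutionType.ASYNCHRONOUS
                : ExecutionType.SYNCHRONOUS;
    }

    /** Blocks for the result when synchronous, hands back the Future when asynchronous. */
    static Object execute(Supplier<String> work, ExecutionType type) {
        CompletableFuture<String> queued = CompletableFuture.supplyAsync(work);
        switch (type) {
            case SYNCHRONOUS:
                return queued.join(); // wait for the value
            case ASYNCHRONOUS:
                return queued;        // caller decides when to wait
            default:
                throw new IllegalArgumentException("unsupported execution type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(typeOf(String.class));                              // SYNCHRONOUS
        System.out.println(typeOf(Future.class));                              // ASYNCHRONOUS
        System.out.println(execute(() -> "hello", ExecutionType.SYNCHRONOUS)); // hello
    }
}
```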

           

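CommandExecutor.execute distinguishes synchronous, asynchronous, and observable execution. For the synchronous case, look at the execute() method of the command: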

/**
     * Used for synchronous execution of command.
     * 
     * @return R
     *         Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
     * @throws HystrixRuntimeException
     *             if a failure occurs and a fallback cannot be retrieved
     * @throws HystrixBadRequestException
     *             if invalid arguments or state were used representing a user failure, not a system failure
     * @throws IllegalStateException
     *             if invoked more than once
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
           

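The Javadoc of execute() describes the return value: the result of run(), or, if the command fails, the result of the fallback obtained through getFallback(). Because the instance is a GenericCommand, look at its getFallback() method: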

/**
     * The fallback is performed whenever a command execution fails.
     * Also a fallback method will be invoked within separate command in the case if fallback method was annotated with
     * HystrixCommand annotation, otherwise current implementation throws RuntimeException and leaves the caller to deal with it
     * (see {@link super#getFallback()}).
     * The getFallback() is always processed synchronously.
     * Since getFallback() can throw only runtime exceptions thus any exceptions are thrown within getFallback() method
     * are wrapped in {@link FallbackInvocationException}.
     * A caller gets {@link com.netflix.hystrix.exception.HystrixRuntimeException}
     * and should call getCause to get original exception that was thrown in getFallback().
     *
     * @return result of invocation of fallback method or RuntimeException
     */
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
           

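At this point the overall flow is clear: HystrixCommandAspect intercepts the call; if the request succeeds, the result of the wrapped method is returned, and if it fails, the fallback logic runs.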
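The flow summarized above can be condensed into a small command abstraction. This is a minimal sketch of the pattern, not Hystrix's HystrixCommand: the class `CommandSketch` is hypothetical, it has no circuit breaker, timeout, or thread pool isolation, and it only shows that execute() is queue().get() and that a failure in run() is answered by getFallback().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

abstract class CommandSketch<R> {

    /** The protected call, for example a request to a remote service. */
    protected abstract R run() throws Exception;

    /** Default behavior when no fallback is provided: fail. */
    protected R getFallback() {
        throw new UnsupportedOperationException("no fallback available");
    }

    /** Asynchronous execution: run() on another thread, fallback if it throws. */
    public Future<R> queue() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return run();
            } catch (Exception e) {
                return getFallback();
            }
        });
    }

    /** Synchronous execution is the asynchronous one plus a blocking get(). */
    public R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CommandSketch<String> failing = new CommandSketch<String>() {
            @Override
            protected String run() {
                throw new IllegalStateException("provider down");
            }

            @Override
            protected String getFallback() {
                return "error";
            }
        };
        System.out.println(failing.execute()); // prints "error"
    }
}
```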