天天看點

03.微服務容錯:Spring Cloud Hystrix (Greenwich.SR2)

1 雪崩效應

在分布式系統中,多個微服務之間互相調用,互相依賴。那麼,當一個微服務出現故障時,調用它的其它微服務由于無法獲得結果,可能出現一直等待的情況。即使設定的逾時時間,在達到逾時時間的這幾秒内,線程也都是處于阻塞狀态。如果并發請求過高,就會導緻越來越多的線程都處于阻塞狀态,可用線程越來越少,直到系統崩潰。而不同微服務之間存在各種依賴,服務A的崩潰,導緻服務B的崩潰,服務B的崩潰導緻服務C和D的崩潰,進而出現級聯故障,這就是雪崩效應。

2 什麼是Spring Cloud Hystrix?

Hystrix是Netflix開源的一個延遲和容錯庫,提供了熔斷器的功能,隔離通路遠端系統,防止級聯失敗。

Hystrix可以包裹對服務的調用,當某服務的錯誤率超過某個門檻值時,就會認為該服務發生了故障,并且打開熔斷器,進而停止對該服務的通路。

當請求失敗、逾時或者熔斷器被打開時,會執行回退邏輯,回退邏輯可以由開發人員提供,傳回一個預設值。

熔斷器打開一段時間後,還會進入半開模式,将一部分請求執行正常邏輯,其它請求快速失敗。如果執行正常邏輯的請求,通路微服務并且成功拿到傳回結果,則熔斷器關閉,否則熔斷器繼續打開,這就是熔斷器的自我修複機制。

3 快速入門

3.1 前提

需要準備如下圖的服務架構:

03.微服務容錯:Spring Cloud Hystrix (Greenwich.SR2)

3.2 引入Hystrix

3.2.1 ribbon-consumer工程添加依賴

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-netflix-hystrix</artifactId>
  </dependency>
           

3.2.2 主類添加注解

@EnableHystrix

@EnableCircuitBreaker

開啟斷路器功能

@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class RibbonConsumerApplication {

    @Bean
    @LoadBalanced
    RestTemplate restTemplate(){
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(RibbonConsumerApplication.class, args);
    }

}
           

注: 這裡也可以使用注解

@SpringCloudApplication

來修飾主類,檢視其定義如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker
public @interface SpringCloudApplication {

}
           

包含上面三個注解,這說明一個spring cloud 标準應用包含服務發現以及斷路器。

3.2.3 編寫 HelloController

@RestController
public class HelloController {

    @Autowired
    private HelloService helloService;

    @GetMapping(value = "/consumerHello")
    public String hello(){

        return helloService.hello();
    }
}
           

3.2.4 編寫 HelloService

注解

@HystrixCommand

指定回調方法

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    @HystrixCommand(fallbackMethod = "helloFallback")
    public String hello(){

        return restTemplate.getForEntity("http://eureka-provider/hello",String.class).getBody();
    }

    public String helloFallback() {

        return "error";
    }

}
           

3.2.5 驗證

将服務啟動,通路 http://localhost:1201/consumerHello 能正常得到 hello,這時,停用一個服務提供者,再次請求,則會得到error。

4 源碼分析

上面講到使用Hystrix的方法:

1.啟動類添加@EnableHystrix注解。

2.方法上添加@HystrixCommand注解,并指定fallback的方法。

4.1 檢視注解

@EnableHystrix

/**
 * Convenience annotation for clients to enable Hystrix circuit breakers (specifically).
 * Use this (optionally) in case you want discovery and know for sure that it is Hystrix
 * you want. All it does is turn on circuit breakers and let the autoconfiguration find
 * the Hystrix classes if they are available (i.e. you need Hystrix on the classpath as
 * well).
 *
 * @author Dave Syer
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {

}

           

這個注解的功能就是開啟Hystrix。這個注解還引入了@EnableCircuitBreaker注解。

在代碼同一級目錄下,還可以看到兩個配置類:HystrixAutoConfiguration和HystrixCircuitBreakerConfiguration。

4.2 檢視

HystrixAutoConfiguration

/**
 * Auto configuration for Hystrix.
 *
 * @author Christian Dupuis
 * @author Dave Syer
 */
@Configuration
@ConditionalOnClass({ Hystrix.class, HealthIndicator.class,
		HealthIndicatorAutoConfiguration.class })
@AutoConfigureAfter({ HealthIndicatorAutoConfiguration.class })
public class HystrixAutoConfiguration {

	@Bean
	@ConditionalOnEnabledHealthIndicator("hystrix")
	public HystrixHealthIndicator hystrixHealthIndicator() {
		return new HystrixHealthIndicator();
	}
	....
}
           

從代碼中可以看到,HystrixAutoConfiguration這個配置類主要是hystrix的健康檢查的配置。

4.3 檢視

HystrixCircuitBreakerConfiguration

@Configuration
public class HystrixCircuitBreakerConfiguration {

	@Bean
	public HystrixCommandAspect hystrixCommandAspect() {
		return new HystrixCommandAspect();
	}

	@Bean
	public HystrixShutdownHook hystrixShutdownHook() {
		return new HystrixShutdownHook();
	}

	@Bean
	public HasFeatures hystrixFeature() {
		return HasFeatures
				.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
	}

	/**
	 * {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
	 * {@link ApplicationContext} shuts down.
	 */
	private class HystrixShutdownHook implements DisposableBean {

		@Override
		public void destroy() throws Exception {
			// Just call Hystrix to reset thread pool etc.
			Hystrix.reset();
		}

	}

}

           

這裡傳回了

HystrixCommandAspect

的Bean。

4.4 檢視

HystrixCommandAspect

/**
 * AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
 */
@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    ...
}
           

這個切面裡定義了兩個切點,是以,這個Aspect就是利用AOP切面對 HystrixCommand 、 HystrixCollapser 兩種注解的方法進行擴充處理。

我們在方法上添加@HystrixCommand注解,就會經過這個切面,這個切面中定義了@Around(…)攔截所有請求。

檢視方法

methodsAnnotatedWithHystrixCommand

, 這個方法中,一開始先擷取攔截的Method,然後判斷,如果方法上同時加了@HystrixCommand和@HystrixCollapser兩個注解的話,就抛異常。

在建立MetaHolder的時候,調用了MetaHolderFactory的create方法,MetaHolderFactory有兩個子類:CollapserMetaHolderFactory和CommandMetaHolderFactory,最終執行的是子類的create方法,下面主要看CommandMetaHolderFactory中的create方法:

private static class CollapserMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            ....
        }
    }

private static class CommandMetaHolderFactory extends MetaHolderFactory {
        @Override
        public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
            ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
            MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
            if (isCompileWeaving()) {
                builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
            }
            return builder.defaultCommandKey(method.getName())
                            .hystrixCommand(hystrixCommand)
                            .observableExecutionMode(hystrixCommand.observableExecutionMode())
                            .executionType(executionType)
                            .observable(ExecutionType.OBSERVABLE == executionType)
                            .build();
        }
    }
           

metaHolderBuilder 的方法:

MetaHolder.Builder metaHolderBuilder(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
            MetaHolder.Builder builder = MetaHolder.builder()
                    .args(args).method(method).obj(obj).proxyObj(proxy)
                    .joinPoint(joinPoint);

            setFallbackMethod(builder, obj.getClass(), method);
            builder = setDefaultProperties(builder, obj.getClass(), joinPoint);
            return builder;
        }
           

setFallbackMethod 方法:

private static MetaHolder.Builder setFallbackMethod(MetaHolder.Builder builder, Class<?> declaringClass, Method commandMethod) {
        FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(declaringClass, commandMethod);
        if (fallbackMethod.isPresent()) {
            fallbackMethod.validateReturnType(commandMethod);
            builder
                    .fallbackMethod(fallbackMethod.getMethod())
                    .fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
        }
        return builder;
    }
           

在建立MetaHolder的過程中,就會指定fallback方法。

建立完MetaHolder之後,就會根據MetaHolder建立HystrixInvokable:

HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

檢視 HystrixCommandFactory:

public class HystrixCommandFactory {

    private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();

    private HystrixCommandFactory() {

    }

    public static HystrixCommandFactory getInstance() {
        return INSTANCE;
    }

    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }

   ...
}
           

這段代碼裡定義了後續真正執行HystrixCommand的GenericCommand執行個體,方法最終會去執行CommandExecutor.execute方法:

/**
 * Invokes necessary method of {@link HystrixExecutable} or {@link HystrixObservable} for specified execution type:
 * <p/>
 * {@link ExecutionType#SYNCHRONOUS} -> {@link com.netflix.hystrix.HystrixExecutable#execute()}
 * <p/>
 * {@link ExecutionType#ASYNCHRONOUS} -> {@link com.netflix.hystrix.HystrixExecutable#queue()}
 * <p/>
 * {@link ExecutionType#OBSERVABLE} -> depends on specify observable execution mode:
 * {@link ObservableExecutionMode#EAGER} - {@link HystrixObservable#observe()},
 * {@link ObservableExecutionMode#LAZY} -  {@link HystrixObservable#toObservable()}.
 */
public class CommandExecutor {

    /**
     * Calls a method of {@link HystrixExecutable} in accordance with specified execution type.
     *
     * @param invokable  {@link HystrixInvokable}
     * @param metaHolder {@link MetaHolder}
     * @return the result of invocation of specific method.
     * @throws RuntimeException
     */
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

   ....

}

           

這裡會分成同步和異步的場景,進入execute方法看下:

/**
     * Used for synchronous execution of command.
     * 
     * @return R
     *         Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
     * @throws HystrixRuntimeException
     *             if a failure occurs and a fallback cannot be retrieved
     * @throws HystrixBadRequestException
     *             if invalid arguments or state were used representing a user failure, not a system failure
     * @throws IllegalStateException
     *             if invoked more than once
     */
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
           

這個方法的注釋中說明了傳回值,可以傳回請求的結果,當失敗的時候,則會通過getFallback()方法來執行一個回退操作,由于是GenericCommand執行個體,那就看下這個執行個體中的getFallback()方法:

/**
     * The fallback is performed whenever a command execution fails.
     * Also a fallback method will be invoked within separate command in the case if fallback method was annotated with
     * HystrixCommand annotation, otherwise current implementation throws RuntimeException and leaves the caller to deal with it
     * (see {@link super#getFallback()}).
     * The getFallback() is always processed synchronously.
     * Since getFallback() can throw only runtime exceptions thus any exceptions are thrown within getFallback() method
     * are wrapped in {@link FallbackInvocationException}.
     * A caller gets {@link com.netflix.hystrix.exception.HystrixRuntimeException}
     * and should call getCause to get original exception that was thrown in getFallback().
     *
     * @return result of invocation of fallback method or RuntimeException
     */
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
           

看到這裡,大體的一個流程也就知道了,就是通過HystrixCommandAspect,請求成功傳回接口的結果,請求失敗執行fallback的邏輯。

繼續閱讀