1 Avalanche Effect
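In a distributed system, microservices call and depend on one another. When one microservice fails, the services calling it cannot get a result and may wait indefinitely. Even with a timeout configured, the calling thread stays blocked for the seconds until the timeout expires. Under high concurrency, more and more threads end up blocked and fewer remain available, until the system crashes. Because microservices depend on each other, the crash of service A brings down service B, which in turn brings down services C and D, producing a cascading failure. This is the avalanche effect.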
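The thread exhaustion described above can be reproduced with nothing but the JDK. The following is a minimal sketch, not taken from the original project: the class name AvalancheDemo and its method are hypothetical, a CountDownLatch stands in for a provider that never answers, and a small fixed-size pool with no queue stands in for the consumer's request threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a hung dependency exhausts the caller's worker threads. */
class AvalancheDemo {

    /** Returns how many of the given requests are rejected while the dependency hangs. */
    static int rejectedWhileDependencyHangs(int poolSize, int requests) {
        // The latch is only released in the finally block: it simulates a hung provider.
        CountDownLatch dependency = new CountDownLatch(1);
        // Fixed number of workers and no queue, so a request needs a free thread right away.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        int rejected = 0;
        try {
            for (int i = 0; i < requests; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            dependency.await(); // the worker blocks on the "remote call"
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // every worker is blocked, so the request cannot be served
                }
            }
        } finally {
            dependency.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedWhileDependencyHangs(2, 5));
    }
}
```

Once every worker is stuck waiting on the dependency, further requests are turned away even though the consumer itself is healthy. A real servlet container would typically queue requests rather than reject them, which delays the symptom but does not remove it.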
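2 What Is Spring Cloud Hystrix?
Hystrix is a latency and fault tolerance library open-sourced by Netflix. It provides a circuit breaker that isolates access to remote systems and prevents cascading failures.
Hystrix can wrap calls to a service. When the error rate of a service exceeds a threshold, Hystrix considers the service faulty and opens the circuit breaker, which stops further calls to that service.
When a request fails, times out, or hits an open circuit breaker, fallback logic runs. The developer supplies the fallback logic, which typically returns a default value.
After the circuit breaker has been open for a while, it enters a half-open state, in which some requests run the normal logic and the rest fail fast. If the requests that run the normal logic reach the microservice and get a result, the breaker closes; otherwise it stays open. This is the circuit breaker's self-healing mechanism.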
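The closed, open, and half-open states can be sketched as a small state machine. This is a simplified illustration and not Hystrix's implementation: the class SimpleCircuitBreaker is hypothetical, it opens after a number of consecutive failures instead of tracking an error rate over a rolling window, and it takes an injectable clock so that the open period can be simulated.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Simplified circuit breaker; a teaching sketch, not Hystrix's implementation. */
class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures that open the breaker (assumption)
    private final long openMillis;      // how long to stay open before allowing a trial call
    private final LongSupplier clock;   // injectable time source, in milliseconds
    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    synchronized State state() {
        return state;
    }

    /** Runs the action through the breaker; returns the fallback value on failure or while open. */
    synchronized <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < openMillis) {
                return fallback.get();   // still open: fail fast without calling the service
            }
            state = State.HALF_OPEN;     // open period elapsed: let this call through as a trial
        }
        try {
            T result = action.get();
            failures = 0;
            state = State.CLOSED;        // a successful call closes the breaker
            return result;
        } catch (RuntimeException e) {
            failures++;
            if (state == State.HALF_OPEN || failures >= failureThreshold) {
                state = State.OPEN;      // trial failed or threshold reached: open (again)
                openedAt = clock.getAsLong();
            }
            return fallback.get();
        }
    }
}
```

Holding the lock for the whole call keeps the sketch short but serializes all requests; a production breaker would not do that.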
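3 Quick Start
3.1 Prerequisites
Prepare the service architecture shown in the following figure:
3.2 Adding Hystrix
3.2.1 Add the dependency to the ribbon-consumer project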
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
3.2.2 Add @EnableHystrix or @EnableCircuitBreaker to the main class
Either annotation enables the circuit breaker:
@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class RibbonConsumerApplication {

    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(RibbonConsumerApplication.class, args);
    }
}
Note: the main class can also be annotated with @SpringCloudApplication, which is defined as follows:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootApplication
@EnableDiscoveryClient
@EnableCircuitBreaker
public @interface SpringCloudApplication {
}
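It bundles the three annotations used above, which indicates that a standard Spring Cloud application includes service discovery and a circuit breaker.
3.2.3 Write HelloController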
@RestController
public class HelloController {

    @Autowired
    private HelloService helloService;

    @GetMapping(value = "/consumerHello")
    public String hello() {
        return helloService.hello();
    }
}
3.2.4 Write HelloService
The @HystrixCommand annotation specifies the fallback method:
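import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    // Calls the provider through the load-balanced RestTemplate; Hystrix wraps this call.
    @HystrixCommand(fallbackMethod = "helloFallback")
    public String hello() {
        return restTemplate.getForEntity("http://eureka-provider/hello", String.class).getBody();
    }

    // Runs when hello() fails, times out, or the circuit breaker is open.
    public String helloFallback() {
        return "error";
    }
}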
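3.2.5 Verification
Start the services and visit http://localhost:1201/consumerHello; the response is hello. Then stop a service provider and send the request again; the response is error.
4 Source Code Analysis
As shown above, using Hystrix takes two steps:
1. Add the @EnableHystrix annotation to the main class.
2. Add the @HystrixCommand annotation to the method and specify the fallback method.
4.1 The @EnableHystrix annotation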
/**
* Convenience annotation for clients to enable Hystrix circuit breakers (specifically).
* Use this (optionally) in case you want discovery and know for sure that it is Hystrix
* you want. All it does is turn on circuit breakers and let the autoconfiguration find
* the Hystrix classes if they are available (i.e. you need Hystrix on the classpath as
* well).
*
* @author Dave Syer
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
This annotation enables Hystrix. It does so by being meta-annotated with @EnableCircuitBreaker.
The same package also contains two configuration classes: HystrixAutoConfiguration and HystrixCircuitBreakerConfiguration.
4.2 HystrixAutoConfiguration
/**
* Auto configuration for Hystrix.
*
* @author Christian Dupuis
* @author Dave Syer
*/
@Configuration
@ConditionalOnClass({ Hystrix.class, HealthIndicator.class,
HealthIndicatorAutoConfiguration.class })
@AutoConfigureAfter({ HealthIndicatorAutoConfiguration.class })
public class HystrixAutoConfiguration {
@Bean
@ConditionalOnEnabledHealthIndicator("hystrix")
public HystrixHealthIndicator hystrixHealthIndicator() {
return new HystrixHealthIndicator();
}
....
}
As the code shows, HystrixAutoConfiguration mainly configures the Hystrix health indicator.
4.3 HystrixCircuitBreakerConfiguration
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures
.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
/**
* {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
* {@link ApplicationContext} shuts down.
*/
private class HystrixShutdownHook implements DisposableBean {
@Override
public void destroy() throws Exception {
// Just call Hystrix to reset thread pool etc.
Hystrix.reset();
}
}
}
This configuration class registers a HystrixCommandAspect bean.
4.4 HystrixCommandAspect
/**
* AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
*/
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
...
}
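The aspect defines two pointcuts, so it uses AOP to extend methods annotated with either @HystrixCommand or @HystrixCollapser.
A method annotated with @HystrixCommand therefore passes through this aspect, whose @Around(...) advice intercepts every call to it.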
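To make the interception idea concrete without AspectJ on the classpath, here is a rough sketch that swaps in a JDK dynamic proxy for the aspect. Everything in it is hypothetical: @WithFallback stands in for @HystrixCommand, and Greeter, RemoteGreeter, and FallbackProxy are illustration-only names. It shows only the "call the method, and on failure call the named fallback" part, with no thread isolation, timeout, or circuit breaker.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical stand-in for @HystrixCommand. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface WithFallback {
    String fallbackMethod();
}

interface Greeter {
    String hello(String name);
}

class RemoteGreeter implements Greeter {
    @Override
    @WithFallback(fallbackMethod = "helloFallback")
    public String hello(String name) {
        if (name.isEmpty()) {
            throw new IllegalStateException("provider unavailable"); // simulated remote failure
        }
        return "hello " + name;
    }

    public String helloFallback(String name) {
        return "error";
    }
}

/** Plays the role of the @Around advice: intercepts calls and applies the fallback. */
class FallbackProxy {
    static <T> T wrap(T target, Class<T> iface) {
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    // The annotation sits on the implementation method, so look it up there.
                    Method impl = target.getClass()
                            .getMethod(method.getName(), method.getParameterTypes());
                    WithFallback annotation = impl.getAnnotation(WithFallback.class);
                    try {
                        return impl.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (annotation == null) {
                            throw e.getCause(); // no fallback declared: rethrow the original error
                        }
                        Method fallback = target.getClass()
                                .getMethod(annotation.fallbackMethod(), method.getParameterTypes());
                        return fallback.invoke(target, args);
                    }
                }));
    }

    public static void main(String[] args) {
        Greeter greeter = wrap(new RemoteGreeter(), Greeter.class);
        System.out.println(greeter.hello("bob"));
        System.out.println(greeter.hello(""));
    }
}
```

The real aspect does considerably more: it builds a command object from the annotated method and lets Hystrix decide how and whether to run it, as the rest of this section walks through.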
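The methodsAnnotatedWithHystrixCommand method first obtains the intercepted Method, then throws an exception if the method carries both @HystrixCommand and @HystrixCollapser.
Creating the MetaHolder calls the create method of MetaHolderFactory. MetaHolderFactory has two subclasses, CollapserMetaHolderFactory and CommandMetaHolderFactory, and it is the subclass's create method that actually runs. The focus below is on create in CommandMetaHolderFactory: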
private static class CollapserMetaHolderFactory extends MetaHolderFactory {
@Override
public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
....
}
}
private static class CommandMetaHolderFactory extends MetaHolderFactory {
@Override
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
}
return builder.defaultCommandKey(method.getName())
.hystrixCommand(hystrixCommand)
.observableExecutionMode(hystrixCommand.observableExecutionMode())
.executionType(executionType)
.observable(ExecutionType.OBSERVABLE == executionType)
.build();
}
}
The metaHolderBuilder method:
MetaHolder.Builder metaHolderBuilder(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
MetaHolder.Builder builder = MetaHolder.builder()
.args(args).method(method).obj(obj).proxyObj(proxy)
.joinPoint(joinPoint);
setFallbackMethod(builder, obj.getClass(), method);
builder = setDefaultProperties(builder, obj.getClass(), joinPoint);
return builder;
}
The setFallbackMethod method:
private static MetaHolder.Builder setFallbackMethod(MetaHolder.Builder builder, Class<?> declaringClass, Method commandMethod) {
FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(declaringClass, commandMethod);
if (fallbackMethod.isPresent()) {
fallbackMethod.validateReturnType(commandMethod);
builder
.fallbackMethod(fallbackMethod.getMethod())
.fallbackExecutionType(ExecutionType.getExecutionType(fallbackMethod.getMethod().getReturnType()));
}
return builder;
}
The fallback method is therefore resolved while the MetaHolder is being built.
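The lookup and return-type check that setFallbackMethod delegates to can be approximated with plain reflection. The sketch below is an assumption-laden simplification rather than the library's MethodProvider: FallbackResolver and SampleService are hypothetical names, and only a fallback with exactly the same parameter types as the command method is considered.

```java
import java.lang.reflect.Method;
import java.util.Optional;

/** Hypothetical helper that mimics resolving and validating a fallback method. */
class FallbackResolver {

    /** Finds a method with the given name and the same parameter types as the command method. */
    static Optional<Method> find(Class<?> type, Method command, String fallbackName) {
        try {
            return Optional.of(type.getDeclaredMethod(fallbackName, command.getParameterTypes()));
        } catch (NoSuchMethodException e) {
            return Optional.empty(); // no fallback declared under that name
        }
    }

    /** Rejects a fallback whose result cannot be returned in place of the command's result. */
    static void validateReturnType(Method command, Method fallback) {
        if (!command.getReturnType().isAssignableFrom(fallback.getReturnType())) {
            throw new IllegalStateException("fallback " + fallback.getName() + " returns "
                    + fallback.getReturnType().getName() + ", which is incompatible with "
                    + command.getReturnType().getName());
        }
    }
}

/** Sample class used only to exercise the resolver. */
class SampleService {
    String hello() { return "hello"; }
    String helloFallback() { return "error"; }
    int badFallback() { return -1; }
}
```

Failing at resolution time means a misdeclared fallback surfaces as soon as the command method is first invoked, not only when the remote call happens to fail.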
Once the MetaHolder is built, a HystrixInvokable is created from it:
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
HystrixCommandFactory:
public class HystrixCommandFactory {
private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();
private HystrixCommandFactory() {
}
public static HystrixCommandFactory getInstance() {
return INSTANCE;
}
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
...
}
This code creates the GenericCommand instance that later actually executes the HystrixCommand. The aspect then calls CommandExecutor.execute:
/**
* Invokes necessary method of {@link HystrixExecutable} or {@link HystrixObservable} for specified execution type:
* <p/>
* {@link ExecutionType#SYNCHRONOUS} -> {@link com.netflix.hystrix.HystrixExecutable#execute()}
* <p/>
* {@link ExecutionType#ASYNCHRONOUS} -> {@link com.netflix.hystrix.HystrixExecutable#queue()}
* <p/>
* {@link ExecutionType#OBSERVABLE} -> depends on specify observable execution mode:
* {@link ObservableExecutionMode#EAGER} - {@link HystrixObservable#observe()},
* {@link ObservableExecutionMode#LAZY} - {@link HystrixObservable#toObservable()}.
*/
public class CommandExecutor {
/**
* Calls a method of {@link HystrixExecutable} in accordance with specified execution type.
*
* @param invokable {@link HystrixInvokable}
* @param metaHolder {@link MetaHolder}
* @return the result of invocation of specific method.
* @throws RuntimeException
*/
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
....
}
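CommandExecutor.execute handles synchronous, asynchronous, and observable execution separately. The synchronous branch calls the command's execute method: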
/**
* Used for synchronous execution of command.
*
* @return R
* Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a failure occurs and a fallback cannot be retrieved
* @throws HystrixBadRequestException
* if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
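The Javadoc of execute() describes the return value: the result of the request or, if the command fails, the result of the fallback obtained through getFallback(). Because the instance is a GenericCommand, look at its getFallback() method: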
/**
* The fallback is performed whenever a command execution fails.
* Also a fallback method will be invoked within separate command in the case if fallback method was annotated with
* HystrixCommand annotation, otherwise current implementation throws RuntimeException and leaves the caller to deal with it
* (see {@link super#getFallback()}).
* The getFallback() is always processed synchronously.
* Since getFallback() can throw only runtime exceptions thus any exceptions are thrown within getFallback() method
* are wrapped in {@link FallbackInvocationException}.
* A caller gets {@link com.netflix.hystrix.exception.HystrixRuntimeException}
* and should call getCause to get original exception that was thrown in getFallback().
*
* @return result of invocation of fallback method or RuntimeException
*/
@Override
protected Object getFallback() {
final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {
return process(new Action() {
@Override
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {
LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {
return super.getFallback();
}
}
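That covers the overall flow: HystrixCommandAspect intercepts the call, returns the result of the remote call when the request succeeds, and runs the fallback logic when it fails.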
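The relationship between queue(), execute(), and the fallback can be imitated in a few lines. The sketch below is not how Hystrix is built internally (Hystrix is based on RxJava observables); it substitutes CompletableFuture, and the class MiniCommand with its constructor parameters is hypothetical. It needs Java 9 or later because of orTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical miniature of a command with run logic, a fallback, and a timeout. */
class MiniCommand<T> {
    // Daemon worker threads, so a hung call cannot keep the JVM alive.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "mini-command");
        t.setDaemon(true);
        return t;
    });

    private final Supplier<T> run;
    private final Supplier<T> fallback;
    private final long timeoutMillis;

    MiniCommand(Supplier<T> run, Supplier<T> fallback, long timeoutMillis) {
        this.run = run;
        this.fallback = fallback;
        this.timeoutMillis = timeoutMillis;
    }

    /** Asynchronous execution: starts the call on another thread and returns immediately. */
    Future<T> queue() {
        return CompletableFuture.supplyAsync(run, POOL)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // requires Java 9+
                .exceptionally(error -> fallback.get());         // failure or timeout: use the fallback
    }

    /** Synchronous execution: the same as queue(), then block for the result. */
    T execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Reached only when the fallback itself failed.
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

As in the listing of execute() above, the synchronous path is just the asynchronous path followed by a blocking get(), so the caller sees either the real result or the fallback value.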