天天看点

spring Retryable注解实现原理RetryConfiguration切面切入点切面方法拦截器重试策略回退策略总结

上次聊了retryable的使用方法后没有继续研究其原理,所以使用起来还是有些心虚的,比如:重试如何实现的?有没有使用线程池?带着问题学习源码吧

启用重试功能的EnableRetry注解导入了重试所用的配置:RetryConfiguration,并且启用了AspectJ AOP功能EnableAspectJAutoProxy,既然启用了AOP那势必使用咯,继续看配置源码

RetryConfiguration

类树结构

继承:AbstractPointcutAdvisor,即当前配置是一个切点通知bean,提供Pointcut切入点,以及Advice通知

实现

  1. IntroductionAdvisor,实际类过滤器提供者使用的Pointcut切点的类过滤器
  2. BeanFactoryAware,获取bean工厂

初始化方法:init

@PostConstruct
public void init() {
	Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
	retryableAnnotationTypes.add(Retryable.class);
	this.pointcut = buildPointcut(retryableAnnotationTypes);
	this.advice = buildAdvice();
	if (this.advice instanceof BeanFactoryAware) {
		((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
	}
}
           
  1. buildPointcut 构建切点:切点类型为使用Retryable注解的类或方法(org.springframework.retry.annotation.RetryConfiguration.AnnotationClassOrMethodPointcut)
  2. buildAdvice 构建通知:创建通知org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor

配置类初始化时间点:ConfigurationClassPostProcessor该后置处理器处理Configuration注解配置类,该后置处理器实现了BeanDefinitionRegistryPostProcessor接口,即在注册中心构建后便开始加载配置

AOP配置时间点:aop动态代理构建者-AnnotationAwareAspectJAutoProxyCreator实现了SmartInstantiationAwareBeanPostProcessor后置处理器,在bean实例化前postProcessBeforeInstantiation查找bean是否被代理,如果是则返回代理对应的通知者,根据通知代理目标bean,动态增加切面动作。

切面切入点

类过滤器

AnnotationClassOrMethodFilter:使用注解方法解析器AnnotationMethodsResolver判断目标类是否存在对应的注解hasAnnotatedMethods,是则通过,进行切点切入

方法匹配器

满足下面条件之一即可

  1. 类过滤器匹配通过:org.springframework.aop.ClassFilter#matches
  2. 方法解析器匹配通过:org.springframework.aop.MethodMatcher#matches(java.lang.reflect.Method, java.lang.Class<?>)

切面方法拦截器

重试拦截器:AnnotationAwareRetryOperationsInterceptor

invoke拦截目标方法

  1. 如果存在方法拦截器委派者,调用委派者invoke方法
  2. 否则直接调用目标方法,即:不具备可重试功能
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
	MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
	if (delegate != null) {
		return delegate.invoke(invocation);
	}
	else {
		return invocation.proceed();
	}
}
           

获取拦截器委派者

  1. 委派者缓存不存在当前方法则创建并缓存
  2. 否则直接返回方法对应的委派者缓存对象
private MethodInterceptor getDelegate(Object target, Method method) {
    // double check 单例模式
	if (!this.delegates.containsKey(method)) {
		synchronized (this.delegates) {
			if (!this.delegates.containsKey(method)) {
                // 1. 获取目标方法上的Retryable注解
				Retryable retryable = AnnotationUtils.findAnnotation(method, Retryable.class);
				if (retryable == null) {
                    // 2. 目标方法不存在Retryable注解,降级查找目标方法声明类上的Retryable注解
					retryable = AnnotationUtils.findAnnotation(method.getDeclaringClass(), Retryable.class);
				}
				if (retryable == null) {
                    // 3. 方法、声明类均没有Retryable注解,降级从target目标实例中获取同方法名、
                    // 同方法入参类型的方法上的Retryable注解
					retryable = findAnnotationOnTarget(target, method);
				}
                // 4.依然不存在则为方法缓存委派者为null,即降级直接调用目标方法,不具备可重试功能。
				if (retryable == null) {
					return this.delegates.put(method, null);
				}
				MethodInterceptor delegate;
				if (StringUtils.hasText(retryable.interceptor())) {
                    // 5. 如果Retryable注解指定了方法拦截器,使用自定义方法拦截器拦截目标方法
					delegate = this.beanFactory.getBean(retryable.interceptor(), MethodInterceptor.class);
				}
                // 6. 如果可重试是有状态的,获取有状态的拦截器委派者
				else if (retryable.stateful()) {
					delegate = getStatefulInterceptor(target, method, retryable);
				}
				else {
					delegate = getStatelessInterceptor(target, method, retryable);
				}
				this.delegates.put(method, delegate);
			}
		}
	}
	return this.delegates.get(method);
}
           

获取有状态的拦截器委派者

org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor#getStatefulInterceptor

private MethodInterceptor getStatefulInterceptor(Object target, Method method, Retryable retryable) {
    // 1. 创建重试模板类
	RetryTemplate template = createTemplate();
	template.setRetryContextCache(this.retryContextCache);
	// 2. 获取方法的环路打断器注解
	CircuitBreaker circuit = AnnotationUtils.findAnnotation(method, CircuitBreaker.class);
	if (circuit!=null) {
        // 3. 根据环路打断器注解获取重试策略:a. ExpressionRetryPolicy。b. SimpleRetryPolicy
		RetryPolicy policy = getRetryPolicy(circuit);
        // 4. 环路打断器重试策略包装原注解配置的重试策略
		CircuitBreakerRetryPolicy breaker = new CircuitBreakerRetryPolicy(policy);
		breaker.setOpenTimeout(circuit.openTimeout());
		breaker.setResetTimeout(circuit.resetTimeout());
		template.setRetryPolicy(breaker);
        // 无backoff策略,即立马重试
		template.setBackOffPolicy(new NoBackOffPolicy());
		String label = circuit.label();
		if (!StringUtils.hasText(label))  {
			label = method.toGenericString();
		}
        // 5. 根据配置构建并返回重试拦截器,底层实际使用的是StatefulRetryOperationsInterceptor
        // 拦截器,区别主要是重试策略在原基础上增加了CircuitBreakerRetryPolicy包装,及无backoff策略
		return RetryInterceptorBuilder.circuitBreaker()
				.keyGenerator(new FixedKeyGenerator("circuit"))
				.retryOperations(template)
				.recoverer(getRecoverer(target, method))
				.label(label)
				.build();
	}
    // 6. 如果不存在环路打断器则走原注解配置的重试策略:表达式重试策略,简单重试策略
	RetryPolicy policy = getRetryPolicy(retryable);
	template.setRetryPolicy(policy);
    // 7. 根据注解配置设置backoff策略
	template.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));
	String label = retryable.label();
	return RetryInterceptorBuilder.stateful()
			.keyGenerator(this.methodArgumentsKeyGenerator)
			.newMethodArgumentsIdentifier(this.newMethodArgumentsIdentifier)
			.retryOperations(template)
			.label(label)
			.recoverer(getRecoverer(target, method))
			.build();
}
           

StatefulRetryOperationsInterceptor拦截器

public Object invoke(final MethodInvocation invocation) throws Throwable {
	...
	Object key = createKey(invocation, defaultKey);
	RetryState retryState = new DefaultRetryState(key,
			this.newMethodArgumentsIdentifier != null
					&& this.newMethodArgumentsIdentifier.isNew(args),
			this.rollbackClassifier);
    // 执行重试的操作:RetryTemplate模板
	Object result = this.retryOperations
			.execute(new MethodInvocationRetryCallback(invocation, label),
					this.recoverer != null
							? new ItemRecovererCallback(args, this.recoverer) : null,
					retryState);
	...
	return result;
}
           

重试模板类

RetryTemplate重试模板执行。类如其名,模板方法类

  1. open打开重试策略,状态,拦截器,start启动backoff回退策略
  2. 是否可以重试,是则重试
  3. 根据回退策略执行回退backoff
  4. 是否需要重新抛出异常,是则重新抛出
  5. 如果state状态不为空则中断重试并执行:handleRetryExhausted
  6. 关闭重试策略,状态,拦截器
public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback)
		throws E {
    // 执行:执行动作
	return doExecute(retryCallback, null, null);
}
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
		RecoveryCallback<T> recoveryCallback, RetryState state)
		throws E, ExhaustedRetryException {
...
		while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
			try {
				if (this.logger.isDebugEnabled()) {
					this.logger.debug("Retry: count=" + context.getRetryCount());
				}
				// Reset the last exception, so if we are successful
				// the close interceptors will not think we failed...
				lastException = null;
				return retryCallback.doWithRetry(context);
			}
			catch (Throwable e) {
				lastException = e;
				try {
					registerThrowable(retryPolicy, state, context, e);
				}
				catch (Exception ex) {
					throw new TerminatedRetryException("Could not register throwable",
							ex);
				}
				finally {
					doOnErrorInterceptors(retryCallback, context, e);
				}
				if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
					try {
						backOffPolicy.backOff(backOffContext);
					}
					catch (BackOffInterruptedException ex) {
						lastException = e;
						// back off was prevented by another thread - fail the retry
						if (this.logger.isDebugEnabled()) {
							this.logger
									.debug("Abort retry because interrupted: count="
											+ context.getRetryCount());
						}
						throw ex;
					}
				}
...
				if (shouldRethrow(retryPolicy, context, state)) {
					if (this.logger.isDebugEnabled()) {
						this.logger.debug("Rethrow in retry for policy: count="
								+ context.getRetryCount());
					}
					throw RetryTemplate.<E>wrapIfNecessary(e);
				}

			}
			/*
			 * A stateful attempt that can retry may rethrow the exception before now,
			 * but if we get this far in a stateful retry there's a reason for it,
			 * like a circuit breaker or a rollback classifier.
			 */
			if (state != null && context.hasAttribute(GLOBAL_STATE)) {
				break;
			}
		}
...
		exhausted = true;
		return handleRetryExhausted(recoveryCallback, context, state);
	}
	catch (Throwable e) {
		throw RetryTemplate.<E>wrapIfNecessary(e);
	}
	finally {
		close(retryPolicy, context, state, lastException == null || exhausted);
		doCloseInterceptors(retryCallback, context, lastException);
		RetrySynchronizationManager.clear();
	}
}
           

获取无状态的拦截器委派者

getStatelessInterceptor,与“获取有状态的拦截器委派者”雷同

重试策略

  1. 表达式重试策略
  2. 简单重试策略
  3. 环路打断器重试策略

表达式重试策略

ExpressionRetryPolicy继承:简单重试策略,可重试判断同简单重试策略相同的同时增加表达式判断

public boolean canRetry(RetryContext context) {
	Throwable lastThrowable = context.getLastThrowable();
	if (lastThrowable == null) {
		return super.canRetry(context);
	}
	else {
		return super.canRetry(context)
				&& this.expression.getValue(this.evaluationContext, lastThrowable, Boolean.class);
	}
}
           

简单重试策略

可重试判断条件满足下面所有条件

  1. 判断上次抛出的异常是否注解定义的可重试异常
  2. 并且当前重试次数小于配置的最大尝试值org.springframework.retry.policy.SimpleRetryPolicy#maxAttempts
public boolean canRetry(RetryContext context) {
	Throwable t = context.getLastThrowable();
	return (t == null || retryForException(t)) && context.getRetryCount() < maxAttempts;
}
           

环路打断器重试策略

回退策略

  1. 指数型回退策略
  2. 指数型随机回退策略
  3. 均匀随机回退策略
  4. 固定性回退策略
  5. 无回退策略

指数型回退策略

ExponentialBackOffPolicy,按照multiplier值指数型增长两次重试之间的休眠时间,默认的休眠器:ThreadWaitSleeper,直接调用的Thread.sleep。算法如下:

public synchronized long getSleepAndIncrement() {
	long sleep = this.interval;
	if (sleep > maxInterval) {
		sleep = maxInterval;
	}
	else {
		this.interval = getNextInterval();
	}
	return sleep;
}
protected long getNextInterval() {
	return (long) (this.interval * this.multiplier);
}
           

指数型随机回退策略

ExponentialRandomBackOffPolicy,继承ExponentialBackOffPolicy,同ExponentialBackOffPolicy雷同,区别在于休眠时间算法更加随机,算法如下:

public synchronized long getSleepAndIncrement() {
	long next = super.getSleepAndIncrement();
	next = (long) (next * (1 + r.nextFloat() * (getMultiplier() - 1)));
	return next;
}
           

均匀随机回退策略

UniformRandomBackOffPolicy,继承StatelessBackOffPolicy,两次重试之间间隔时间为maxDelay-delay之间的随机值,默认休眠器:ThreadWaitSleeper,算法如下:

protected void doBackOff() throws BackOffInterruptedException {
	try {
        // minBackOffPeriod:maxDelay属性
        // minBackOffPeriod:delay属性
		long delta = maxBackOffPeriod==minBackOffPeriod ? 0 : random.nextInt((int) (maxBackOffPeriod - minBackOffPeriod));
		sleeper.sleep(minBackOffPeriod + delta );
	}
	catch (InterruptedException e) {
		throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
	}
}
           

固定性回退策略

FixedBackOffPolicy,继承StatelessBackOffPolicy,两次重试之间间隔时间为delay固定值,默认休眠器:ThreadWaitSleeper,算法如下:

protected void doBackOff() throws BackOffInterruptedException {
	try {
		sleeper.sleep(backOffPeriod);
	}
	catch (InterruptedException e) {
		throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
	}
}
           

总结

重试如何实现的?

答:使用spring AOP实现,底层封装为一个while循环并catch住异常,然后根据配置判断是否需要继续重试

有没有使用线程池?

答:没有使用线程池,同步调用,默认的重试间隔是会阻塞当前线程的,官方也给出了说明,即使阻塞线程,为重试增加间隔也仍然是一个很好的选择