一.为什么要引入重试机制
在分布式系统中,日常开发维护中,我们经常涉及到调用外部接口或者通过RPC去访问其他的业务系统。在这个过程中经常会碰到这样的问题:被调用的第三方或者外部接口的稳定性存在问题,经常会出现请求不通或者超时等现象,造成这种现象的原因可能是由于网络波动或者是系统升级维护导致短暂的不可用。这些可能会导致我们自身的系统处于一种不稳定的状态,不仅会在测试跟产品那经常被抱怨系统地实现问题,还会影响我们自身业务的正常进行。
诸如此类的场景:发送消息失败、调用远程服务失败、争抢锁失败等;
重试机制可以保护系统减少因网络波动、依赖服务短暂性不可用所带来的影响,让系统能更稳定运行的一种保护机制。所以在有些必须的业务中重试机制实用又有效。
二.重试机制需要具备的特点
- 无侵入:尽量不改动当前的业务逻辑,对代码最好做到无侵入,在需要进行重试的地方,可以简单地实现
- 可配置:重试次数、重试间隔时间、是否为异步重试等,可以根据业务的不同进行配置
- 通用性:可以无改动的或者很少改动的支持绝大部分的场景,拿来即用
三.重试机制需要考虑的问题
当然,在引入重试机制的时候我们需要进行如下几个问题的考虑:
- 重试几次比较合适?
- 每次重试的间隔设置多少合适?
- 如果所有的重试机会用完之后还是不成功应该怎么办?
1.重试几次比较合适
通常来说我们单次重试所面临的的结果是有很大的不确定性的,那么到底需要进行多少次重试才是最合理的呢?这个就需要根据具体的业务具体进行分析了,看业务的重要性以及异常报警的等级划分处理及时度。但是一般来说3次重试就基本可以满足大多数的业务需求了,当然这个也需要结合重试间隔进行一起考量。为什么说3次就基本可以说足够了呢,因为如果系统处在长时间不可用的状态下,我们重试多少次都是没有实际意义的,反而徒增系统的压力。
2.每次重试的间隔设置多少合适
如果重试间隔设置得太小,可能会造成这样的情况:被调用的系统还没来得及恢复我们就已经发起了调用,那么得到的结果肯定还是失败,这样相当于快速调用并失败了N次,没有实际意义;
如果重试间隔设置得太大,可能会造成这样的情况:牺牲掉了不少数据的时效性;
所以,重试间隔的设置要根据被调用系统的平均恢复时间来去正确的估量,通常来说这个平均恢复时间如果没有完备的大数据分析系统是很难统计到,所以一般这个值就需要根据经验(一般的经验值3-5min)来进行设置并根据实际情况去不断地修正。
3.重试机会用完之后还是失败应该怎么办
当设置的重试次数全部用完之后系统仍然返回失败,此时此处业务相当于中断,这时候就需要采用一定的补偿措施,以保证系统流程的正常进行,保证数据的准确性时效性。
- 增加错误报警机制:将重试之后依然无法成功地接口详细信息记录到报警日志并进行邮件、电话、飞书等方式的提醒,这样我们可以及时的感知到应用发生了不可自恢复的调用异常,采取人工干预的方式进行修复;
- 增加手动重试的入口:增加手动重试的开关按钮,这样在异常发生时可以方便进行手动重试;在手动进行重试时,需要考虑接口的幂等性,否则容易造成数据的混乱。
四.重试机制的解决思路
1.Spring-Retry
spring-retry是spring自身提供的一种重试机制,可以帮助我们以标准的方式处理特定操作的重试。在spring-retry中,所有配置都是基于简单注释的。
- spring-retry怎么使用具体过程不做过多赘述
/**
* value:抛出指定异常才会重试
* include:和value一样,默认为空,当exclude也为空时,默认所有异常
* exclude:指定不处理得异常
* maxAttempts:最大重试次数,默认3次
* backoff:重试等待策略,
* 默认使用@Backoff,@Backoff的value默认为1000L,我们设置为2000; 以毫秒为单位的延迟(默认 1000)
* multiplier(指定延迟倍数)默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。
* Spring-Retry还提供了@Recover注解,用于@Retryable重试失败后处理方法。
* 如果不需要回调方法,可以直接不写回调方法,那么实现的效果是,重试次数完了后,如果还是没成功没符合业务判断,就抛出异常。
* 可以看到传参里面写的是 Exception e,这个是作为回调的接头暗号(重试次数用完了,还是失败,我们抛出这个Exception e通知触发这个回调方法)。
* 注意事项:
* 方法的返回值必须与@Retryable方法一致
* 方法的第一个参数,必须是Throwable类型的,建议是与@Retryable配置的异常一致,其他的参数,需要哪个参数,写进去就可以了(@Recover方法中有的)
* 该回调方法与重试方法写在同一个实现类里面
*
* 由于是基于AOP实现,所以不支持类里自调用方法
* 如果重试失败需要给@Recover注解的方法做后续处理,那这个重试的方法不能有返回值,只能是void
* 方法内不能使用try catch,只能往外抛异常
* @Recover注解来开启重试失败后调用的方法(注意,需跟重处理方法在同一个类中),此注解注释的方法参数一定要是@Retryable抛出的异常,否则无法识别,可以在该方法中进行日志处理。
*/
- spring-retry的缺陷
spring-retry工具虽然是基于注解的并且很优雅实现重试,但是存在几个不友好的设计:
- spring-retry重试的实体限制为Throwable子类,说明重试针对的是可捕捉的功能异常为设计前提的,但是我们希望依赖某个数据对象实体作为重试实体,但spring-retry框架必须轻质转换为Throwable子类。
- spring-retry重试的断言对象使用的是doWithRetry的Exception异常实例,不符合正常内部断言的返回设计
- spring-retry提倡以注解的方式对方法进行重试,重试的逻辑是同步的,当抛出相关的异常之后进行重试逻辑。如果你要以返回值的某个状态来判定是否需要进行重试,可能只能通过自己判断返回值然后显示抛出异常来实现了。
- spring-retry重试机制中:只读操作可以进行重试,幂等写操作可以进行重试,但是非幂等写操作不能进行重试,这种情况下的重试可能会导致写入脏数据,或者产生重复数据
- spring-retry中@Recover注解在使用时无法指定方法,如果说一个类中有多个重试方法,处理起来就会很麻烦,可能需要特定的标识符来进行区分
- spring-retry注意事项
- 使用了@Retryable的方法不能在本类被调用,不然重试机制不会生效。也就是说要标记为@Service,然后在其他类中使用@Autowired注入或者@Bean去实例才能生效
- 要触发@Recover方法,那么在@Retryable方法上不能有返回值,只能是void才能生效
- 使用了@Retryable的方法里面不能使用try...catch包裹,要在方法上抛出异常,不然不会触发
- 在重试期间这个方法是同步的,如果使用类似Spring Cloud这种框架的熔断机制时,可以结合重试机制来重试后返回结果
- spring-retry不只能注入方式去实现,还可以通过API的方式实现,类似熔断处理的机制就基于API方式实现会比较宽松
2.Guava-Retrying
- 关于guava-retrying
- guava-retrying是Google Guava库的一个扩展包,包含了多种重试策略,而且扩展起来非常的容易。
- 使用guava-retrying你可以自定义来执行重试,同时也可以监控每次重试的结果和行为。
- guava-retry工具与spring retry类似,都是通过定义重试者角色来包装正常逻辑重试,但是Guava retry有更优的策略定义,在支持重试次数和重试频度控制基础上,能够兼容支持多个异常或者自定义实体对象的重试源定义,让重试功能有更多的灵活性。相比于spring-retry:
--可以设置任何任务单次执行的时间限制,如果超时则抛出异常;
--可以设置重试监听器,用来执行额外的处理工作
--可以设置任务阻塞策略,即可设置当前重试完成,下次重试开始前的这段时间做什么事情
--可以通过停止重试策略和等待策略结合使用来设置更加灵活的策略,比如指数等待时长并最多10次调用,随机等待时长并且永不停止等待
- Guava Retryer也是线程安全的,入口调用逻辑采用的是 java.util.concurrent.Callable 的 call() 方法
- 关键点介绍
- RetryerBuilder是一个factory创建者,可以定制设置重试源且可以支持多个重试源,可以配置重试次数或重试超时时间,以及可以配置等待时间间隔,创建重试者Retryer实例。
- RetryerBuilder的重试源支持Exception异常对象 和自定义断言对象,通过retryIfException 和retryIfResult设置,同时支持多个且能兼容。
- retryIfException,抛出runtime异常、checked异常时都会重试,但是抛出error不会重试。
- retryIfRuntimeException只会在抛runtime异常的时候才重试,checked异常和error都不重试。
- retryIfExceptionOfType允许我们只在发生特定异常的时候才重试,比如NullPointerException和IllegalStateException`都属于runtime异常,也包括自定义的error
3.Spring-Aop
自己造轮子:使用AOP来为目标设置切面,即可在目标调用的前后添加一些额外的逻辑。
- 首先创建一个注解,注解里一般有2个参数:retryCount:最大重试次数;retryInterval:重试间隔;当然也可以设置更多的参数,比如指定重试的异常、重试延迟倍数等。
- 其实spring-retry、guava-retrying也是基于切面的方式来实现的
- 在需要进行重试的方法上加上注解
- 编写AOP切面
4.利用MQ进行消息重试
以kafka为例:
- kafka消费者采用手动异步ack确认机制,如果消费失败,消息会重新进行消费,可以设置重试的次数,这样就可以对要消费的消息进行简单的重试。
- 流行的解决方案:设置重试主题(retry-topics)
基本的过程如下:
- 消费者尝试消费业务主题中(business-topic)的一条消息
- 如果未能正确的消费该消息,则消费者将消息发布到第一个重试主题中(retry-topic-1),然后提交消息的偏移量,以便继续处理下一条消息。
- 订阅重试主题(retry-topic-1)的消费者,他的处理逻辑与业务主题的消费者逻辑相同。该消费者在进行消费尝试的时候引入了短暂的延迟。如果这个消费者也无法消费该消息,则会将消息发布到另一个重试主题中(retry-topic-2),并提交该消息的偏移量。
- 这一过程不断的进行,增加了一些重试主题(retry-topic-n)和重试消费者,同时每个重试的延迟会越来越长(用作退避策略)。最终在消费者无法处理某条消息后,会将该消息发布到一个死信队列(dead letter queue,DLQ)中,运维团队将在该队列中消息进行手动分类处理。
重试主题(retry-topics)带来的问题以及思考
- 重试主题看起来似乎是非常的合理,但他存在一些弊端,在实际场景中有些场景不大适用,需要进行进一步的改造。
- 首先他忽略了不同类型的错误,我们暂且可分为可自动恢复的错误、不可自动恢复的错误两种类型。可恢复性的错误也就是说不管重试了多少次,最终这些错误是可以得到解决的。举个例子来说:一个入库的操作,如果说消息A是因为数据库重启或者宕机而造成消费失败,那么随之而来的消息B、消息C...都将失败。那么这种可恢复性的错误直接交给消费者自身来进行重试,直至问题解决,不停的加入到重试主题中,反而会增加不必要的开销。
- 不可恢复的错误指的是无论我们重试了多少次都将失败的错误。这时候加入到重试主题中可以帮助受到困扰的消息消费者继续前进。而错误的消息会在重试主题中进行重试直至进入死信队列,或者此处已经确定了是不可恢复的错误,直接将该消息发送到死信队列。、
- 另外使用这种方案的时候还需要考虑有些消息需要顺序消费的问题如何解决,或者是否能接受一些数据不一致性。
5.其他方案
- 手动重试:在catch代码块中进行重试逻辑代码。需要对代码进行大量的侵入式修改,不优雅,可维护性差,且大部分时候还是失败,意义不大,不建议。
- 代理模式:不修改业务逻辑代码,直接在业务代码外边再包一层重试逻辑。这样重试逻辑全部由代理类来完成,以后想修改重试逻辑可以直接修改代理类,分工明确。但代理模式显然过于麻烦,而且重试的逻辑一般都是大同小异,无非就是重试的次数和间隔时间不一样而已,如果每个需要进行重试的业务类都包装一层代理类,显然代码冗余量太大,不够优雅。
- JDK动态代理:上面说代理类的方式代码冗余量太大,不够优雅,那么自然而然就想到了使用JDK动态代理来实现。动态代理将重试的逻辑放在一块,显然比代理类的方式更加方便优雅。
但是:如果被代理的类没有其他的依赖类,直接创建不成问题;如果被代理的类依赖了其他被spring容器 管理的类,则这种方式就会抛出异常,因为没有把被代理的实例注入到创建的代理实例中。这种情况 下,就比较复杂了,需要从Spring容器中获取已经装配好的,需要被代理的实例,然后为其创建代 理类实例,并交给Spring容器来管理,这样就不用每次都重新创建新的代理类实例了。
同时还要考虑容器中的bean类型是Singleton还是Prototype,如果是Singleton则像上面这样进行 操作,如果是Prototype则每次都新建代理类对象。
另外,这里使用的是JDK动态代理,因此就存在一个天然的缺陷,如果想要被代理的类,没有实现 任何接口,那么就无法为其创建代理对象,这种方式就行不通了。
- CGLib动态代理:解决了JDK动态代理带来的缺陷,但是仍然存在问题:对原来的逻辑进行了侵入式的修改,在每个·被代理实例被调用的地方都需要进行调整。
- 总的来说,这几种方式都不是比较完善的方式,不建议使用。已经有了比较好的轮子,也不建议自己再造轮子,自己造的轮子也不见得完善。
6.一个消息重试的解决方案
- 案例背景
服务端Svr、数据采集中间件Svr、车端Svr进行指令的交互
- 服务端svr发送指令消息至kafka(business-topic)
- 数据采集中间件svr消费kafka消息(business-topic),然后发送给车端硬件svr
- 车端svr消费后会将结果是成功失败返回给数据采集中间件服务svr
- 数据采集中间件再将结果发送至kafka(result-business-topic)
- 服务端svr消费kafka消息(result-business-topic),得到本次指令发送的最终结果
2.重试需求
- 对于kafka消费者端的消息重试采用kafka自身的重试机制,或者是采用上文介绍的重试主题的方式
- 本需求需要讨论解决的是:在向kafka发送消息以及消费消息都成功的情况下,但是由车端返回的结果为失败时,这种情况下同样也需要进行重试,即根据返回的状态值进行业务端的重试。
3.解决方案
采用redis进行指令、指令索引、远程启动指令、指令重试次数的记录;采用naocs配置指令重试的最大次数; 采用xxl-job进行设置指令重试的间隔时间;
- 服务端下发消息时,进行缓存记录,采用redis的String数据结构:
指令缓存(String类型):key:dispatch:vehicle:retry:instruction:instructionId(指令id) value:json字符串(此次下发的指令json)
指令的重试次数(String类型):key:dispatch:vehicle:retry:count:instructionId value:0(默认为0次)
【远程启动】指令缓存(Set类型):key:dispatch:vehicle:retry:remotestart value:instructionId
注:此两处缓存在消息返回成功后或者是重试N次之后进行删除;缓存的默认次数采用nacos进行配置,重试的执行间隔时间采用xxl-job进行设置,可以是每分钟一次或者是间隔的倍数等形式。
- 消息执行返回结果为失败时,记录缓存的指令索引,采用redis的Set数据结构:
消息指令索引集合(Set类型):key:dispatch:vehicle:retry:failmsgindex value:instructionId
- 使用xxl-job进行定时任务的调用。
获取缓存指令索引的集合,判断该set集合是否为空,如果为空,则【没有需要进行重试的指令】;如果set集合不为空,则遍历集合中的每一个值。
由于是分布式系统,此处需要采用分布式锁,我们在此采用redission架构来进行加锁。至此还需对具体的业务进行判断:有一个特殊的指令为【远程启动】指令,由于此指令在下发还未创建任务,所以无法与任务进行关联,所以如果是这个指令就直接进行重试,如果不是则需要对任务的状态进行判断,我们只对正在执行的任务进行重试,已经完成、暂停的、废弃的任务不进行指令的重试
- 根据需要进行重试的指令获取已经进行重试的次数,第一次肯定是0次
- 判断重试的次数是否大于nacos中设置的重试次数N(比如n=3)
- 如果已经大于等于3(实际最终只可能等于设置的重试次数),则不再进行重试,同时删除在redis中做的那几个相关的缓存
- 如果小于3:首先根据指令索引获取到需要进行重试的消息,将该消息发送至kafka,同时重试次数缓存+1,并将此处记录写入到操作日志中【正在进行第N次重试,状态为执行中】。
// 核心代码:
@XxlJob("doRetryHandle")
public void doRetryHandle() {
log.info("...[消息重试]正在进行消息重试...");
// 获取执行结果为失败的消息索引
Set<String> msgSet = commonHandle.getRedisFailMsgIndex();
log.info("...[消息重试]需要进行消息重试的条数为:{},消息索引为:{}", msgSet.size(), msgSet);
if (EmptyUtil.isEmpty(msgSet)) {
log.info("[没有需要进行重试的指令]:doRetryHandle:[{}]", JSON.toJSONString(msgSet));
} else {
msgSet.forEach(instructionId -> {
RLock lock = redissonClient.getLock(DispatchRedisConstants.DISPATCH_MSG_RETRY_LOCK + instructionId);
log.info("...[消息重试]获取到锁{}", DispatchRedisConstants.DISPATCH_MSG_RETRY_LOCK + instructionId);
try {
lock.lock();
//ADD:前置条件,只对执行中的任务进行重试;若下发指令时,验证任务为【作废/完成/暂停=非执行中】状态,则删除该指令重试的缓存,同时更新该指令状态为失败
// todo:问题:1.远程启动命令时,尚未进行创建任务关联 2.需要做指令跟任务关联的缓存(同时注意删除时机)
// 判断指令是否为远程启动的指令,如果是直接进行重试,如果不是则进行判断任务的状态
if (commonHandle.getRedisRetryRemoteStart(instructionId)) {
doRetryExcute(instructionId);
} else {
String dispatchId = commonHandle.getRedisInstrctionIdDispatchId(instructionId);
log.info("...[消息重试]根据指令id:{}获取到该指令所属的任务id:{}", instructionId, dispatchId);
if (EmptyUtil.isNotEmpty(dispatchId)) {
// 获取任务的当前状态
DispatchTask dispatchTask = dispatchTaskRepository.getById(dispatchId);
log.info("...[消息重试]消息为:{},任务状态为{}", JSONObject.toJSONString(dispatchTask), dispatchTask.getTaskState());
if (EmptyUtil.isNotEmpty(dispatchTask)) {
//只针对执行状态的任务进行重试
if (dispatchTask.getTaskState().equals(EnumTaskState.EXECUTING.getCode())) {
log.info("...[消息重试]正在进行消息重试,任务状态为:{}", EnumTaskState.EXECUTING.getDesc());
//执行重试
doRetryExcute(instructionId);
} else {
// 非执行状态:则删除该指令重试的缓存,同时更新该指令状态为失败
log.info("...[消息重试]任务状态为:{},不是执行中的任务不进行重试", dispatchTask.getTaskState());
commonHandle.doRetryFailExcute(instructionId, OperateExecuteResultEnum.FAIL.getCode());
}
} else {
log.info("...[消息重试]查询不到任务,不进行重试", dispatchTask.getTaskState());
// 查询不到任务
commonHandle.doRetryFailExcute(instructionId, OperateExecuteResultEnum.FAIL.getCode());
}
}
}
} catch (Exception e) {
log.error("doRetryHandle消息重试异常:{}", e.getMessage());
e.printStackTrace();
} finally {
lock.unlock();
}
});
}
}
/**
* 执行重试逻辑
*/
private void doRetryExcute(String instructionId) {
// 根据指令索引获取指令已经进行的重试次数
String stringCount = commonHandle.getRedisRetryCount(instructionId);
if (EmptyUtil.isNotEmpty(stringCount)) {
Integer retryCount = Integer.valueOf(stringCount);
if (retryCount >= RETRY_COUNT) {
log.info("...[消息重试]doRetryExcute:重试次数为{},不再进行重试", retryCount);
// 删除缓存:指令索引缓存、指令缓存、指令重试次数缓存
commonHandle.deleteRetryCaches(instructionId);
return;
} else {
// 根据指令索引获取需要进行重试的消息
String messageJson = commonHandle.getRedisInstrction(instructionId);
if (EmptyUtil.isNotEmpty(messageJson)) {
log.info("...[消息重试]doRetryExcute:需要进行重试的消息为{}", messageJson);
// 发送重试消息
kafkaSendMessage.retrySendMessage(messageJson);
// 重试次数+1
commonHandle.addRedisRetryCount(instructionId, String.valueOf(retryCount + 1));
// 写日志文件(暂时:正在进行第N次重试,**秒后进行第N+1次重试)
IntructionLogUpdateResultRequestDTO intructionLogUpdateResultRequestDTO = new IntructionLogUpdateResultRequestDTO()
.setOperateId(Long.parseLong(instructionId))
.setExecuteResult(OperateExecuteResultEnum.RERTY.getCode())
.setLogContext("进行第" + (retryCount + 1) + "次重试");
dispatchTaskOperateLogRepository.ModifyResultByInstructionId(intructionLogUpdateResultRequestDTO);
}
}
}
}
总结:没有更好的方案,只有最合适的方案,一定要根据自己的业务进行选择并不断的进行优化。
欢迎关注公众号: