背景
基礎技術組的接口重推元件基于smart-retry源碼進行了改造
smart-retry資訊
倉庫位址
https://gitee.com/hack3389/smart-retry/閱讀分支
分支:master
commit
主要功能
Smart Retry主要是用來進行方法重試的。和Guava Retry、Spring Retry相比,Smart Retry最大的特點是異步重試,支援持久化,系統重新開機之後可以繼續重試。
功能特點
- 方法重試持久化,系統重新開機之後可以繼續重試
- 異步重試(不支援同步重試)
- 支援接口實作和聲明式方式
架構圖

如何使用
引入依賴
com.github.hadoop002.smartretry
retry-spring4
使用最新版本
初始化表
create table sys_retry_task (
task_id bigint not null primary key auto_increment,
identity_name varchar(50) not null COMMENT '任務的唯一辨別',
params text COMMENT '參數',
status tinyint not null COMMENT '狀态。1: 進行中,2: 成功,3: 失敗',
retry_count int not null default 0 COMMENT '重試次數',
remark varchar(1000) COMMENT '備注',
create_date datetime not null,
edit_date datetime) ENGINE=InnoDB COMMENT='系統重試表';
create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);
編寫業務邏輯
@RetryFunction(identity = "order.payment")
public void payOrderAndUpdateStatus(Order order) {
boolean success = paymentBusiness.doPayment(order);
if (success) {
orderBusiness.updateOrderPayStatus(order);
} else {
orderBusiness.updateOrderPayFail(order);
}
}
或者
@Slf4j
@Service("orderPaymentBusiness")
public class OrderPaymentBusiness implements RetryHandler {
@Autowired
private PaymentBusiness paymentBusiness;
private OrderBusiness orderBusiness;
@Override
public String identity() {
return "order.payment";
public Void handle(Order order) {
boolean success = paymentBusiness.doPayment(order);
if (success) {
orderBusiness.updateOrderPayStatus(order);
} else {
orderBusiness.updateOrderPayFail(order);
}
return null;
打開開關
在啟動入口上加上@EnableRetrying 注解
源碼閱讀
源碼結構
- retry-cpre:重試子產品的核心,定義了一系列的接口和擴充點
- retry-spring4:基于spring4實作的重試子產品
- retry-serializer-jackson2:使用jackson2來實作參數的序列化和反序列化
- retry-serializer-gson:使用gson來實作參數的序列化和反序列化
- retry-serializer-fastjson:使用fastjson來實作參數的序列化和反序列化
- retry-samples:配套的示例demo,可直接使用
大緻流程
- 系統啟動後,把所有com.github.smartretry.core.RetryHandler和帶有@RetryFunction注解的方法注冊為定時任務。
- 所有com.github.smartretry.core.RetryHandler和帶有@RetryFunction注解的方法都會被Spring進行代理,執行的時候,會先把參數序列化,然後把執行任務插入到資料庫。最後根據任務執行的成功與否,更新任務的相應狀态。
- 定時任務定時從表裡面擷取未成功的任務,進行重試
根據流程走讀代碼
根據整個流程走讀代碼應該會對代碼有更清晰的認識
系統啟動
系統啟動的核心處理邏輯主要是在類RetryAnnotationBeanPostProcessor中,下面通過流程仔細分析該類
- 掃描所有帶有@RetryFunction注解和實作RetryHandler接口的類
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.postedClasseCache.contains(targetClass)) {
Object targetObject = AopProxyUtils.getSingletonTarget(bean);
if (RetryHandler.class.isAssignableFrom(targetClass)) {
RetryHandlerUtils.validateRetryHandler(targetClass);
log.info("發現RetryHandler的執行個體:{},準備注冊", targetClass);
retryHandlers.add((RetryHandler) targetObject);
return bean;
}
ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;
Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);
methods.forEach(method -> processRetryFunction(targetObject, method));
postedClasseCache.add(targetClass);
return bean;
}
改類實作了BeanPostProcessor接口,重寫了postProcessAfterInitialization方法(每個bean初始化之後執行)
主要為兩處判斷/過濾
1.判斷是否實作了RetryHandler
if (RetryHandler.class.isAssignableFrom(targetClass)) {
2.過濾打了@RetryFunction的方法
ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;
Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);
- 把打了@RetryFunction注解的都轉化為RetryHandler,即最終都是走的RetryHandler
protected void processRetryFunction(Object bean, Method method) {
log.info("發現@RetryFunction的執行個體:{},準備注冊", method.toString());
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
RetryHandlerUtils.validateRetryFunction(method);
RetryFunction retryFunction = method.getAnnotation(RetryFunction.class);
Supplier retryListenerSupplier = () -> {
RetryListener retryListener = null;
String retryListenerName = retryFunction.retryListener();
if (StringUtils.isNotBlank(retryListenerName)) {
retryListener = defaultListableBeanFactory.getBean(retryListenerName, RetryListener.class);
return retryListener;
};
retryHandlers.add(new MethodRetryHandler(bean, invocableMethod, retryFunction, retryListenerSupplier));
- 把所有的retryHandlers周遊注冊為定時任務,預設用的quartz
public void afterSingletonsInstantiated() {
postedClasseCache.clear();
this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);
this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);
boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);
this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);
if (this.retrySerializer == null) {
this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);
} else {
this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);
retryHandlers.forEach(this::registerJobBean);
retryHandlers.clear();
protected void registerJobBean(RetryHandler retryHandler) {
if (retryHandler.identity().length() > 50) {
throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");
RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);
RetryHandlerRegistration.registry(retryHandlerProxy);
RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);
retryRegistry.register(retryHandler, retryProcessor);
至此,系統啟動所做的任務就完成了
打有@RetryFunction的注解和實作RetryHandler的接口的方法都會被Spring代理
- @RetryFunction注解的方法如何被代理
public class RetryHandlerMethodPointcut implements Pointcut {
public ClassFilter getClassFilter() {
return ClassFilter.TRUE;
public MethodMatcher getMethodMatcher() {
return new StaticMethodMatcher() {
@Override
public boolean matches(Method method, Class targetClass) {
return RetryHandlerUtils.isRetryFunctionMethod(method);
}
public static boolean isRetryFunctionMethod(Method method) {
if (method.getAnnotation(RetryFunction.class) != null && method.getParameterCount() == 1) {
return !Object.class.equals(method.getParameterTypes()[0]);
return false;
實作Pointcut接口通過isRetryFunctionMethod(Method method)方法判斷是否是需要代理的方法
- 實作RetryHandler接口的方法如何被代理
public class RetryHandlerClassPointcut implements Pointcut {
return RetryHandler.class::isAssignableFrom;
return RetryHandlerUtils.isRetryHandlerMethod(targetClass, method);
public static boolean isRetryHandlerMethod(Class targetClass, Method method) {
if ("handle".equals(method.getName()) && method.getParameterCount() == 1 && method.isBridge() && method.isSynthetic()) {
//RetryHandler接口有泛型,需要特殊處理
return true;
Type interfaceType = getRetryHandlerGenericInterface(targetClass);
if (interfaceType == null) {
return false;
Class argsInputType = Object.class;
if (interfaceType instanceof ParameterizedType) {
argsInputType = (Class) ((ParameterizedType) interfaceType).getActualTypeArguments()[0];
Class parameterType = argsInputType;
return "handle".equals(method.getName()) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(parameterType);
先對類進行過濾,要求實作RetryHandler
public ClassFilter getClassFilter() {
return RetryHandler.class::isAssignableFrom;
}
再對方法進行過濾,詳細請看isRetryHandlerMethod(Class targetClass, Method method)方法
打有@RetryFunction注解的方法被調用時
當執行到帶有@RetryFunction方法時(實作了RetryHandler也差不多的邏輯,就不再贅述了),會被方法攔截器攔截,
public class RetryHandlerMethodInterceptor implements MethodInterceptor {
public Object invoke(MethodInvocation invocation) {
RetryFunction retryFunction = invocation.getMethod().getAnnotation(RetryFunction.class);
Object[] args = invocation.getArguments();
String identity = retryFunction.identity();
if (StringUtils.isBlank(identity)) {
identity = RetryHandlerUtils.getMethodIdentity(invocation.getMethod());
Optional optional = RetryHandlerRegistration.get(identity);
if (optional.isPresent()) {
return optional.get().handle(ArrayUtils.isEmpty(args) ? null : args[0]);
throw new IllegalArgumentException("找不到對應的RetryHandler代理,identity=" + identity);
因為RetryHandlerRegistration中注冊的是ImmediatelyRetryHandler,是以執行的是ImmediatelyRetryHandler的handle方法
doPost方法建立的是ImmediatelyRetryHandler
public RetryHandler doPost(RetryHandler retryHandler) {
if (retryHandler instanceof GenericRetryHandler) {
return new ImmediatelyRetryHandler((GenericRetryHandler) retryHandler, retryTaskFactory, retryTaskMapper, beforeTask);
return new ImmediatelyRetryHandler(new DefaultRetryHandler(retryHandler), retryTaskFactory, retryTaskMapper, beforeTask);
接下來我們看看ImmediatelyRetryHandler.handle方法做了什麼
public Object handle(Object arg) {
RetryContext retryContext = new RetryContext(genericRetryHandler, arg);
Object result;
RetryTask retryTask;
// 是否在執行任務之前插入資料庫 |配置false則表示,隻有任務執行報錯才插入資料庫|
if (beforeTask) {
retryTask = retryTaskFactory.create(genericRetryHandler, arg);
retryTaskMapper.insert(retryTask);
try {
result = genericRetryHandler.handle(arg);
retryContext.setResult(result);
completeTask(retryTask);
onRetry(retryContext);
onComplete(retryContext);
} catch (NoRetryException e) {
retryContext.setException(e);
failureTask(retryTask, retryContext);
onError(retryContext);
throw e;
} catch (RuntimeException e) {
if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {
//隻有最大可重試次數為0,才會執行到這裡
failureTask(retryTask, retryContext);
onRetry(retryContext);
onError(retryContext);
} else {
updateRemark(retryTask, e);
}
return result;
//等待重試
retryTask = retryTaskFactory.create(genericRetryHandler, arg);
retryTask.setRemark(StringUtils.left(e.getMessage(), 1000));
retryTaskMapper.insert(retryTask);
return result;
這個方法有點長,不過代碼還算簡單,簡而言之就是重試的方法發生異常後入庫( retryTaskFactory.create(genericRetryHandler, arg);)的操作,當然裡邊有序列化參數,修改重試表的狀态等操作,就不再詳細講了(比較簡單,相信大家都看得懂(*╹▽╹*)
至此,調用帶有@RetryFunction注解的方法第一被調用,以及如何把重試任務入庫的操作就完成了,下面講解重試的邏輯
定時重試邏輯
上邊有講到把重試任務注冊為定時任務的邏輯,再看一下代碼吧
可以看到,注冊的是一個DefaultRetryProcessor,就是說,每次定時任務調用的是該類的doRetry方法,以quartz為例
public class RetryJob implements Job {
private RetryProcessor retryProcessor;
public RetryJob() {
public RetryJob(RetryProcessor retryProcessor) {
this.retryProcessor = retryProcessor;
public void execute(JobExecutionContext context) {
retryProcessor.doRetry();
下邊我們看看doRetry都做了些什麼
public void doRetry() {
log.info("開始執行Identity={}的重試,maxRetryCount={}, initialDelay={}", genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());
List tasks = retryTaskMapper.queryNeedRetryTaskList(genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());
if (tasks == null) {
return;
log.info("Identity={}目前有{}個任務準備重試", genericRetryHandler.identity(), tasks.size());
if (genericRetryHandler.ignoreException()) {
tasks.forEach(this::doRetryWithIgnoreException);
tasks.forEach(this::doRetry);
相信聰明的你肯定猜到了,沒錯,取出之前入庫的資料開始進行重試
private void doRetry(RetryTask retryTask) {
log.info("開始重試Identity={},Id={}的任務", retryTask.getIdentity(), retryTask.getTaskId());
retryedRetryHandler.setRetryTask(retryTask);
String json = retryTask.getParams();
if (StringUtils.isBlank(json)) {
retryedRetryHandler.handle(null);
retryedRetryHandler.parseArgsAndhandle(json);
重試調用的是retryedRetryHandler.handle()的方法
retryTask.setRetryCount(retryTask.getRetryCount() + 1);
RetryContext retryContext = new RetryContext(genericRetryHandler, arg, retryTask.getRetryCount());
try {
result = genericRetryHandler.handle(arg);
retryContext.setResult(result);
completeTask(retryTask);
onRetry(retryContext);
onComplete(retryContext);
} catch (NoRetryException e) {
retryContext.setException(e);
failureTask(retryTask, retryContext);
onError(retryContext);
throw e;
} catch (RuntimeException e) {
if (retryTask.getRetryCount() == genericRetryHandler.maxRetryCount()) {
} else {
update(retryTask, retryContext);
if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {
//重試次數達到最大,觸發失敗回調
retryedRetryHandler.handle方法主要是調用目标方法後,如果目标方法沒報錯,則把表中的狀态修改成功,發生異常後更新表中的異常資訊,達到最大重試次數後,把表中的狀态改為失敗,當然其中有把資料庫中的參數反序列的操作.
到這裡,smart-retry的大緻流程,源碼解讀就完成了,當然,這并不是全部代碼,隻是主流程的代碼,有興趣的同學可以把代碼拉下來,詳細閱讀以下
總結
smart-retry支援異步重試,支援重試持久化,用着還是相當不錯的,但是還是有缺點的,比如,1. 隻支援有且僅有一個參數2.每一個重試方法都對應一個定時任務,會造成線程的過度使用
是以,我在該源碼的基礎上,對smart-retry進行了改造,改造點如下
- 支援重試的方法有多個參數
- 支援指定抛出哪些異常後重試
- 支援配置在注解上是否在執行方法前入庫
- 隻提供重試的接口給使用者,具體定時任務讓使用者自己去實作,比如改造後例子中的定時任務用的是xxl-job
如果想詳細了解改造後的smart-retry,請參照smart-retry改造更新文檔