天天看點

smart-retry源代碼閱讀背景smart-retry資訊如何使用源碼閱讀總結

背景

基礎技術組的接口重推元件基于smart-retry源碼進行了改造

smart-retry資訊

倉庫位址

https://gitee.com/hack3389/smart-retry/

閱讀分支

分支:master

commit

主要功能

Smart Retry主要是用來進行方法重試的。和Guava Retry、Spring Retry相比,Smart Retry最大的特點是異步重試,支援持久化,系統重新開機之後可以繼續重試。

功能特點

  • 方法重試持久化,系統重新開機之後可以繼續重試
  • 異步重試(不支援同步重試)
  • 支援接口實作和聲明式方式

架構圖

smart-retry源代碼閱讀背景smart-retry資訊如何使用源碼閱讀總結

如何使用

引入依賴

      com.github.hadoop002.smartretry

      retry-spring4

      使用最新版本

初始化表

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任務的唯一辨別',

params text COMMENT '參數',

status tinyint not null COMMENT '狀态。1: 進行中,2: 成功,3: 失敗',

retry_count int not null default 0 COMMENT '重試次數',

remark varchar(1000) COMMENT '備注',

create_date datetime not null,

edit_date datetime) ENGINE=InnoDB COMMENT='系統重試表';

create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

編寫業務邏輯

  @RetryFunction(identity = "order.payment")

  public void payOrderAndUpdateStatus(Order order) {

      boolean success = paymentBusiness.doPayment(order);

      if (success) {

          orderBusiness.updateOrderPayStatus(order);

      } else {

          orderBusiness.updateOrderPayFail(order);

      }

  }

或者

  @Slf4j

  @Service("orderPaymentBusiness")

  public class OrderPaymentBusiness implements RetryHandler {

      @Autowired

      private PaymentBusiness paymentBusiness;

      private OrderBusiness orderBusiness;

      @Override

      public String identity() {

          return "order.payment";

      public Void handle(Order order) {

          boolean success = paymentBusiness.doPayment(order);

          if (success) {

              orderBusiness.updateOrderPayStatus(order);

          } else {

              orderBusiness.updateOrderPayFail(order);

          }

          return null;

打開開關

在啟動入口上加上@EnableRetrying 注解

源碼閱讀

源碼結構

  • retry-cpre:重試子產品的核心,定義了一系列的接口和擴充點
  • retry-spring4:基于spring4實作的重試子產品
  • retry-serializer-jackson2:使用jackson2來實作參數的序列化和反序列化
  • retry-serializer-gson:使用gson來實作參數的序列化和反序列化
  • retry-serializer-fastjson:使用fastjson來實作參數的序列化和反序列化
  • retry-samples:配套的示例demo,可直接使用

大緻流程

  • 系統啟動後,把所有com.github.smartretry.core.RetryHandler和帶有@RetryFunction注解的方法注冊為定時任務。
  • 所有com.github.smartretry.core.RetryHandler和帶有@RetryFunction注解的方法都會被Spring進行代理,執行的時候,會先把參數序列化,然後把執行任務插入到資料庫。最後根據任務執行的成功與否,更新任務的相應狀态。
  • 定時任務定時從表裡面擷取未成功的任務,進行重試

根據流程走讀代碼

根據整個流程走讀代碼應該會對代碼有更清晰的認識

系統啟動

系統啟動的核心處理邏輯主要是在類RetryAnnotationBeanPostProcessor中,下面通過流程仔細分析該類

  • 掃描所有帶有@RetryFunction注解和實作RetryHandler接口的類

    @Override

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if (bean instanceof AopInfrastructureBean) {

            // Ignore AOP infrastructure such as scoped proxies.

            return bean;

        }

        Class targetClass = AopProxyUtils.ultimateTargetClass(bean);

        if (!this.postedClasseCache.contains(targetClass)) {

            Object targetObject = AopProxyUtils.getSingletonTarget(bean);

            if (RetryHandler.class.isAssignableFrom(targetClass)) {

                RetryHandlerUtils.validateRetryHandler(targetClass);

                log.info("發現RetryHandler的執行個體:{},準備注冊", targetClass);

                retryHandlers.add((RetryHandler) targetObject);

                return bean;

            }

            ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

            Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

            methods.forEach(method -> processRetryFunction(targetObject, method));

            postedClasseCache.add(targetClass);

        return bean;

    }

改類實作了BeanPostProcessor接口,重寫了postProcessAfterInitialization方法(每個bean初始化之後執行)

主要為兩處判斷/過濾

1.判斷是否實作了RetryHandler

 if (RetryHandler.class.isAssignableFrom(targetClass)) {

2.過濾打了@RetryFunction的方法

ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

  • 把打了@RetryFunction注解的都轉化為RetryHandler,即最終都是走的RetryHandler

    protected void processRetryFunction(Object bean, Method method) {

        log.info("發現@RetryFunction的執行個體:{},準備注冊", method.toString());

        Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());

        RetryHandlerUtils.validateRetryFunction(method);

        RetryFunction retryFunction = method.getAnnotation(RetryFunction.class);

        Supplier retryListenerSupplier = () -> {

            RetryListener retryListener = null;

            String retryListenerName = retryFunction.retryListener();

            if (StringUtils.isNotBlank(retryListenerName)) {

                retryListener = defaultListableBeanFactory.getBean(retryListenerName, RetryListener.class);

            return retryListener;

        };

        retryHandlers.add(new MethodRetryHandler(bean, invocableMethod, retryFunction, retryListenerSupplier));

  • 把所有的retryHandlers周遊注冊為定時任務,預設用的quartz

    public void afterSingletonsInstantiated() {

        postedClasseCache.clear();

        this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

        this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);

        boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

        this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

        if (this.retrySerializer == null) {

            this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

        } else {

            this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

        retryHandlers.forEach(this::registerJobBean);

        retryHandlers.clear();

    protected void registerJobBean(RetryHandler retryHandler) {

        if (retryHandler.identity().length() > 50) {

            throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

        RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

        RetryHandlerRegistration.registry(retryHandlerProxy);

        RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);

        retryRegistry.register(retryHandler, retryProcessor);

至此,系統啟動所做的任務就完成了

打有@RetryFunction的注解和實作RetryHandler的接口的方法都會被Spring代理

  • @RetryFunction注解的方法如何被代理

public class RetryHandlerMethodPointcut implements Pointcut {

    public ClassFilter getClassFilter() {

        return ClassFilter.TRUE;

    public MethodMatcher getMethodMatcher() {

        return new StaticMethodMatcher() {

            @Override

            public boolean matches(Method method, Class targetClass) {

                return RetryHandlerUtils.isRetryFunctionMethod(method);

}

    public static boolean isRetryFunctionMethod(Method method) {

        if (method.getAnnotation(RetryFunction.class) != null && method.getParameterCount() == 1) {

            return !Object.class.equals(method.getParameterTypes()[0]);

        return false;

實作Pointcut接口通過isRetryFunctionMethod(Method method)方法判斷是否是需要代理的方法

  • 實作RetryHandler接口的方法如何被代理

public class RetryHandlerClassPointcut implements Pointcut {

        return RetryHandler.class::isAssignableFrom;

                return RetryHandlerUtils.isRetryHandlerMethod(targetClass, method);

    public static boolean isRetryHandlerMethod(Class targetClass, Method method) {

        if ("handle".equals(method.getName()) && method.getParameterCount() == 1 && method.isBridge() && method.isSynthetic()) {

            //RetryHandler接口有泛型,需要特殊處理

            return true;

        Type interfaceType = getRetryHandlerGenericInterface(targetClass);

        if (interfaceType == null) {

            return false;

        Class argsInputType = Object.class;

        if (interfaceType instanceof ParameterizedType) {

            argsInputType = (Class) ((ParameterizedType) interfaceType).getActualTypeArguments()[0];

        Class parameterType = argsInputType;

        return "handle".equals(method.getName()) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(parameterType);

先對類進行過濾,要求實作RetryHandler

   public ClassFilter getClassFilter() {

       return RetryHandler.class::isAssignableFrom;

   }

再對方法進行過濾,詳細請看isRetryHandlerMethod(Class targetClass, Method method)方法

打有@RetryFunction注解的方法被調用時

當執行到帶有@RetryFunction方法時(實作了RetryHandler也差不多的邏輯,就不再贅述了),會被方法攔截器攔截,

public class RetryHandlerMethodInterceptor implements MethodInterceptor {

    public Object invoke(MethodInvocation invocation) {

        RetryFunction retryFunction = invocation.getMethod().getAnnotation(RetryFunction.class);

        Object[] args = invocation.getArguments();

        String identity = retryFunction.identity();

        if (StringUtils.isBlank(identity)) {

            identity = RetryHandlerUtils.getMethodIdentity(invocation.getMethod());

        Optional optional = RetryHandlerRegistration.get(identity);

        if (optional.isPresent()) {

            return optional.get().handle(ArrayUtils.isEmpty(args) ? null : args[0]);

        throw new IllegalArgumentException("找不到對應的RetryHandler代理,identity=" + identity);

因為RetryHandlerRegistration中注冊的是ImmediatelyRetryHandler,是以執行的是ImmediatelyRetryHandler的handle方法

doPost方法建立的是ImmediatelyRetryHandler

    public RetryHandler doPost(RetryHandler retryHandler) {

        if (retryHandler instanceof GenericRetryHandler) {

            return new ImmediatelyRetryHandler((GenericRetryHandler) retryHandler, retryTaskFactory, retryTaskMapper, beforeTask);

        return new ImmediatelyRetryHandler(new DefaultRetryHandler(retryHandler), retryTaskFactory, retryTaskMapper, beforeTask);

接下來我們看看ImmediatelyRetryHandler.handle方法做了什麼

    public Object handle(Object arg) {

        RetryContext retryContext = new RetryContext(genericRetryHandler, arg);

        Object result;

        RetryTask retryTask;

        // 是否在執行任務之前插入資料庫 |配置false則表示,隻有任務執行報錯才插入資料庫|

        if (beforeTask) {

            retryTask = retryTaskFactory.create(genericRetryHandler, arg);

            retryTaskMapper.insert(retryTask);

            try {

                result = genericRetryHandler.handle(arg);

                retryContext.setResult(result);

                completeTask(retryTask);

                onRetry(retryContext);

                onComplete(retryContext);

            } catch (NoRetryException e) {

                retryContext.setException(e);

                failureTask(retryTask, retryContext);

                onError(retryContext);

                throw e;

            } catch (RuntimeException e) {

                if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                    //隻有最大可重試次數為0,才會執行到這裡

                    failureTask(retryTask, retryContext);

                    onRetry(retryContext);

                    onError(retryContext);

                } else {

                    updateRemark(retryTask, e);

                }

            return result;

                    //等待重試

                    retryTask = retryTaskFactory.create(genericRetryHandler, arg);

                    retryTask.setRemark(StringUtils.left(e.getMessage(), 1000));

                    retryTaskMapper.insert(retryTask);

        return result;

這個方法有點長,不過代碼還算簡單,簡而言之就是重試的方法發生異常後入庫( retryTaskFactory.create(genericRetryHandler, arg);)的操作,當然裡邊有序列化參數,修改重試表的狀态等操作,就不再詳細講了(比較簡單,相信大家都看得懂(*╹▽╹*)

至此,調用帶有@RetryFunction注解的方法第一被調用,以及如何把重試任務入庫的操作就完成了,下面講解重試的邏輯

定時重試邏輯

上邊有講到把重試任務注冊為定時任務的邏輯,再看一下代碼吧

可以看到,注冊的是一個DefaultRetryProcessor,就是說,每次定時任務調用的是該類的doRetry方法,以quartz為例

public class RetryJob implements Job {

    private RetryProcessor retryProcessor;

    public RetryJob() {

    public RetryJob(RetryProcessor retryProcessor) {

        this.retryProcessor = retryProcessor;

    public void execute(JobExecutionContext context) {

        retryProcessor.doRetry();

下邊我們看看doRetry都做了些什麼

    public void doRetry() {

        log.info("開始執行Identity={}的重試,maxRetryCount={}, initialDelay={}", genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

        List tasks = retryTaskMapper.queryNeedRetryTaskList(genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

        if (tasks == null) {

            return;

        log.info("Identity={}目前有{}個任務準備重試", genericRetryHandler.identity(), tasks.size());

        if (genericRetryHandler.ignoreException()) {

            tasks.forEach(this::doRetryWithIgnoreException);

            tasks.forEach(this::doRetry);

相信聰明的你肯定猜到了,沒錯,取出之前入庫的資料開始進行重試

    private void doRetry(RetryTask retryTask) {

        log.info("開始重試Identity={},Id={}的任務", retryTask.getIdentity(), retryTask.getTaskId());

        retryedRetryHandler.setRetryTask(retryTask);

        String json = retryTask.getParams();

        if (StringUtils.isBlank(json)) {

            retryedRetryHandler.handle(null);

            retryedRetryHandler.parseArgsAndhandle(json);

重試調用的是retryedRetryHandler.handle()的方法

        retryTask.setRetryCount(retryTask.getRetryCount() + 1);

        RetryContext retryContext = new RetryContext(genericRetryHandler, arg, retryTask.getRetryCount());

        try {

            result = genericRetryHandler.handle(arg);

            retryContext.setResult(result);

            completeTask(retryTask);

            onRetry(retryContext);

            onComplete(retryContext);

        } catch (NoRetryException e) {

            retryContext.setException(e);

            failureTask(retryTask, retryContext);

            onError(retryContext);

            throw e;

        } catch (RuntimeException e) {

            if (retryTask.getRetryCount() == genericRetryHandler.maxRetryCount()) {

            } else {

                update(retryTask, retryContext);

            if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                //重試次數達到最大,觸發失敗回調

retryedRetryHandler.handle方法主要是調用目标方法後,如果目标方法沒報錯,則把表中的狀态修改成功,發生異常後更新表中的異常資訊,達到最大重試次數後,把表中的狀态改為失敗,當然其中有把資料庫中的參數反序列的操作.

到這裡,smart-retry的大緻流程,源碼解讀就完成了,當然,這并不是全部代碼,隻是主流程的代碼,有興趣的同學可以把代碼拉下來,詳細閱讀以下

總結

smart-retry支援異步重試,支援重試持久化,用着還是相當不錯的,但是還是有缺點的,比如,1. 隻支援有且僅有一個參數2.每一個重試方法都對應一個定時任務,會造成線程的過度使用

是以,我在該源碼的基礎上,對smart-retry進行了改造,改造點如下

  • 支援重試的方法有多個參數
  • 支援指定抛出哪些異常後重試
  • 支援配置在注解上是否在執行方法前入庫
  • 隻提供重試的接口給使用者,具體定時任務讓使用者自己去實作,比如改造後例子中的定時任務用的是xxl-job

如果想詳細了解改造後的smart-retry,請參照smart-retry改造更新文檔