天天看點

基于事件流的輕量級異常容錯設計—支援接口可重入

寫在前面

在我們平時的業務代碼中,最常見的代碼結構就是外部的請求打過來,首先進行必要的參數校驗,接着根據參數對關聯實體的狀态進行校驗,然後再校驗業務邏輯,最後推進關聯實體的狀态。下面以一段代碼簡單示例一下

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(String entityName) {
       Entity1 entity1 = queryEntityByName(entityName);
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       entity1Service.finishEntity1(entity);
       entity2Service.createEntity2(entity.getEntity2());
       entity3Service.rollBackEntity3(entity.getEntity3());
    }
    
}           

在這種代碼當中,如果後面三行狀态推進的服務調用失敗,如果缺乏适當的異常處理機制的話,就會導緻整個服務不可用,在bizHandle(String)方法重試的時候,可能由于entity1Service.finishEntity1(entity);調用成功導緻方法入口處的狀态校驗就失敗了,接着方法直接傳回(或者抛出異常),最終造成關聯實體的狀态不一緻。

當然,對于這種情況,由于方法的入口處的校驗是針對entity1來做的,那麼在後續狀态推進的過程中,調整狀态推進的順序,把entity1的狀态推進放在最後來做,然後基于服務調用的幂等性,其實也可以保證重試時的可重入。但是這種做法過度依賴業務場景,不具備通用的場景處理能力哦。如果此處的校驗并不是針對entity1來做的,而是對entity1,entity2,entity3來做的聯合的狀态校驗,那麼不管如何調整後續狀态推進的順序,都是無法保證重試的可重入的。

是以,淩駕于具體場景之上,我希望做到的是,編寫業務代碼的時候,可以盡量降低開發人員的負擔,使用一種統一的處理機制來保證接口重試時的可重入能力。

服務與關聯事件

通過上面的概述,相信大家已經知道我要做一件什麼樣的事了——異常情況下支援接口重試時的可重入機制。用白話說就是在接口第一次調用失敗的情況下,後續進行重試,為了避免重試時狀态校驗失敗而導緻重試失敗,我需要知道上一次服務調用的時候,方法執行到了哪一步,然後,可以直接從這一步開始繼續往下執行(為了保證業務資料正确,所有的外部服務接口都需要支援幂等)。

為了達到這一目的,自然想到的就是使用一個清單或者隊列把所有的服務調用組織起來,比如上述例子中的:

entity1Service.finishEntity1(entity);
entity2Service.createEntity2(entity.getEntity2());
entity3Service.rollBackEntity3(entity.getEntity3());           

并且,必須保證這些服務調用的先後順序是确定的(任意一次重試的時候,看到的這三個服務調用順序都是一緻的)。首先我把這些服務調用都當做是ReentryServiceImpl.bizHandle(String)方法執行過程中的事件,在做bizHandle的時候,需要依次去執行entity1Service.finishEntity1(entity)、entity2Service.createEntity2(entity.getEntity2())和entity3Service.rollBackEntity3(entity.getEntity3())。是以,首先定義一個通用事件接口:

public interface Event {

    /**
     * 執行本次事件的任務
     *
     * @throws Exception 異常
     */
    void handle(Context context) throws Exception;

    /**
     * 倒序執行本次事件,目前事件的所有後續事件都執行完了,執行該方法
     * @throws Exception 異常
     */
    void postHandle(Context context) throws Exception;
}           

有了這個接口之後,就可以對上述服務中的三個事件進行定義了。依次為:

@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 1)
public class FinishEntity1Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity1Service.finishEntity1(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}           
@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 2)
public class CreateEntity2Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity2Service.createEntity2(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}           
@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 3)
public class RollBackEntity3Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity3Service.rollBackEntity3(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}           

好了,到此三個事件定義好了,估計大家看到在事件上打了個注解@RelatedService,先給出代碼:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Component
public @interface RelatedService {

    /**
     * 事件關聯的服務 {@link Event}
     *
     * @return 一般為服務的bean name
     */
    String serviceName() default "";

    /**
     *
     * @return 調用事件的服務方法名
     */
    String methodName() default "";

    /**
     * 事件在服務中的執行順序, 值越小,執行順序越靠前
     * @return 優先級
     */
    int order() default 0;

}           

在這個注解上總共有三個參數,分别為:

  • order:可以定義事件執行先後順序,值越小,執行順序越靠前
  • serviceName:事件執行時所在服務名,一般為bean的名字,在上面這個例子中就是:ReentryService
  • methodName:執行事件的方法名,使用者自定義的,在上面的例子中可以是:bizHandle

并且這個注解繼承了@Component,是以上面的三個事件都會被注入到Spring IoC容器中進行管理。

好了,有了上面這一系列鋪墊,通過spring IoC容器我們就可以非常友善的指導目前正在調用的ReentryService服務的bizHandle方法中需要執行的事件是什麼,并且這些事件之間執行的先後順序。

事件容器和事件隊列

有了上面的内容,我就可以非常友善的在服務執行的過程中知道我目前正在執行的服務将要執行哪些事件,這些事件執行的先後順序是怎麼樣的。那麼,接下來我需要做的就是把這些事件編排起來,使用一個統一的容器來管理這些事件。先給出事件容器的代碼(一般是一個服務的一個方法對應一個事件容器)

@Slf4j(topic = "APPLICATION")
public class EventContainer implements Event {

    private String serviceName;

    private String methodName;

    @Getter
    private final Queue<EventNode> eventNodes = new PriorityQueue<>();

    public EventContainer(String serviceName, String methodName) {
        this.serviceName = serviceName;
        this.methodName = methodName;
    }

    public void init() {

        Map<String, Object> eventBeans = Env.getBeansWithAnnotation(RelatedService.class);

        log.info("EventPipeline.init success, eventBeans={}",eventBeans);

        for (Map.Entry<String, Object> beanEntry : eventBeans.entrySet()) {
            String eventName = beanEntry.getKey();
            Object beanObj = beanEntry.getValue();
            if (!(beanObj instanceof Event)) {
                continue;
            }
            RelatedService relatedService = beanObj.getClass().getAnnotation(RelatedService.class);
            String serviceName = relatedService.serviceName();
            String methodName = relatedService.methodName();
            int order = relatedService.order();
            if (!serviceName.equals(this.serviceName)) {
                continue;
            }
            if (!methodName.equals(this.methodName)) {
                continue;
            }
            EventNode curEventNode = new EventNode(order,  (Event)beanObj);
            eventNodes.offer(curEventNode);
        }

        log.info("EventPipeline.init success, {}",eventNodes);
    }

    @Override
    public void handle(Context context) throws Exception {
        init();
        while (eventNodes.peek() != null) {
            EventNode curNode = eventNodes.poll();
            curNode.handle(context);
            log.info("EventContainer.handle, {},{}",curNode.getOrder(), curNode.getCurEvent());
        }
    }

    @Override
    public void postHandle(StockDiffContext context) throws Exception {

    }
}           

EventContainer也是Event的實作類,其handle(Context)方法是整個事件流執行的入口。

在EventContainer中有三個非常重要的内容:

  • serviceName,事件容器對應的服務名
  • methodName,事件容器對應的方法名
  • Queue eventNodes,事件節點的優先級隊列(事件隊列),事件會在這個隊列中被依次執行

而事件隊列的初始化就是通過前面的@RelatedService注解實作的,具體見代碼。

EventNode的定義代碼:

public class EventNode implements Comparable<EventNode>, Event {

    @Getter
    private String nodeName;

    @Getter
    private int order;

    @Getter
    private Event curEvent;

    EventNode(){}

    public EventNode(String name, int order) {
        this.nodeName = name;
        this.order = order;
    }

    public EventNode(String name, int order, Event curEvent) {
        this(name,order);
        this.curEvent = curEvent;
    }

    public EventNode(int order, Event curEvent) {
        this.curEvent = curEvent;
        this.order = order;
    }


    @Override
    public int compareTo(EventNode o) {
        return this.order - o.order;
    }

    @Override
    public void handle(Context context) throws Exception {
        this.curEvent.handle(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        this.curEvent.postHandle(context);
    }
}           

有了上面的内容,我們就建構了一整套基于事件流的服務調用機制了。總結一下:

  • Event,事件的接口,規定了事件執行的動作
  • EventNode,事件節點,友善對事件進行編排
  • EventContainer,事件容器,管理服務所對應的事件,并負責執行事件
  • @RelatedService,事件關聯服務的注解

基于事件流的機制,本文開始的例子,代碼可以修改為:

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(Context context) {
       Entity1 entity1 = queryEntityByName(context.getEntityName());
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
       eventContainer.handle(context);
    }
    
}           

支援可重入

上面所述的部分僅僅是實作了基本的事件流的處理機制,但是還不具備接口的可重入能力,要想實作可重入,就必須在服務調用的時候,對調用的進度進行持久化,這樣友善下次調用的時候,對服務進行恢複。

想實作這一點,有很多方法,最簡單的就是使用基于幂等+标志位的方式,持久化在db中,謝謝我的師兄提供了這個非常簡單實用的方案。

代碼如下:

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(Context context) {
       // 計算幂等鍵,保證唯一
       String idempotentKey = buildIdempotentKey(serviceName,methodName,context.getEntityName());
       IdempotentDO idem = queryIdempotent(idempotentKey);
       if (idem != null) {
            context.setExecuteProcess(idem.getExecuteProcess());
            EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
            eventContainer.handle(context);
            return;
       }
       Entity1 entity1 = queryEntityByName(entityName);
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
       eventContainer.handle(context);
    }
    
}           

相應的,每個事件節點在執行的時候,都要重寫postHandle(Context)方法,如下:

public class EventNode implements Comparable<EventNode>, Event {

    @Getter
    private String nodeName;

    @Getter
    private int order;

    @Getter
    private Event curEvent;
    
    //省略其他

    @Override
    public void handle(Context context) throws Exception {
        this.curEvent.handle(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        this.curEvent.postHandle(context);
        //更新執行進度,如果沒有這條幂等記錄,就插入
        updateExecuteProcess(context.getIdempotentKey(),this.order);
    }
}           

還差最後一步改造就大功告成了,重寫EventContainer的handle(Context)方法

@Override
    public void handle(Context context) throws Exception {
        init();
        int process = context.getExecuteProcess();
        int idx = 0;
        while (eventNodes.peek() != null) {
            EventNode curNode = eventNodes.poll();
            if (++idx <= process){
                continue;
            }
            curNode.handle(context);
            log.info("EventContainer.handle, {},{}",curNode.getOrder(), curNode.getCurEvent());
        }
    }           

寫在最後

這點總結記錄了自己日常開發中針對痛點的一些想法,可能并不是很成熟,要是和大家有共鳴的話,那真是太好了,歡迎批評指教~