天天看点

采用事件溯源重构支付功能

事件溯源是什么

        事件溯源是以时间顺序记录所发生的领域事件,之后可通过所记录的领域事件得到相应对象当前状态的一种设计理念。这种设计理念从根本上颠覆了以更新方式维护对象状态的传统方式,其优点是保存了聚合(如果还没听说过这个概念,可以先了解下DDD)的操作历史,从而为审计和数据分析提供支持;对于事件只进行插入,也就无需锁和事务,因此性能更好。缺点是数据量会更大;需要重放所有相关事件来得到当前状态;有违大多数开发人员的习惯思维方式。

为什么用事件溯源

        在为新类型单据开发支付功能的过程中发现了原支付功能中诸多不合理设计和缺陷,比如表结构(不参与查询的属性定义为表中的列,也就是可以保存为json的数据以结构化方式保存),代码不能应对并发。

        原支付功能由payment表负责保存单据的支付状态,每次支付操作的请求数据和回调数据都保存在与payment表结构完全一样的payment_history表中,两个表的唯一的区别是payment_history表的单据编号字段保存的是单据编号 + 支付次数,像这样:0001-1,0001-2。当执行任何与支付有关的操作(支付,拒绝,退款)或接收到支付接口的回调后须同时更新payment和paymeny_history两个表。

        支付功能对于任何系统的重要性都不言而喻,因此当初的设计者才会将有关支付的所有操作都记录在paymeny_history这个相当于支付日志的支付历史表中。事件溯源就是为类似这种需要完整保存历史轨迹的需求而量身打造的,但其实这并不是事件溯源的目的,而只是该设计理念所附带的好处。这就是为什么选择以事件溯源来重构支付功能的唯一原因。

如何以事件溯源进行重构

        事件溯源的核心是事件,没有事件何以溯源。所以,首先要做的就是从支付流程中识别并提取出所有领域事件。根据本系统支付业务的特点,提取出如下事件:支付数据验证通过,支付数据验证未通过,支付成功,支付失败,退款,拒绝支付。事件对象中属性的定义取决于系统的特点,并没有通用的标准,不过至少要包含事件名和时间两个核心属性。以下是本系统支付领域事件的定义。

public static class StartPay extends Event {
        public StartPay(Long billId) {
            super(billId, StartPay.class, "开始支付");
        }
    }

    public static class Verified extends Event {
        public Verified(Long billId, String data) {
            super(billId, Verified.class, data);
        }
    }

    public static class VerificationFailed extends Event {
        public VerificationFailed(Long billId, String data) {
            super(billId, VerificationFailed.class, data);
        }
    }

    public static class Payed extends Event {
        public Payed(Long billId, String data) {
            super(billId, Payed.class, data);
        }
    }

    public static class PayFailed extends Event {
        public PayFailed(Long billId, String data) {
            super(billId, PayFailed.class, data);
        }
    }

    public static class Refused extends Event {
        public Refused(Long billId, String data) {
            super(billId, Refused.class, data);
        }
    }

    public static class Refunded extends Event {
        public Refunded(Long billId, String data) {
            super(billId, Refunded.class, data);
        }
    }

    public static class Rejected extends Event {
        public Rejected(Long billId, String data) {
            super(billId, Rejected.class, data);
        }
    }
           
@Setter
@ToString
public class Event {
    private Long ID;
    private Long BILL_ID;
    private String BILL_NO;
    private String EVENT_NAME;
    private Date CREATE_TIME = new Date();
    private String DATA;

    public Event(Long billId, Class<? extends Event> cla, String data) {
        this.BILL_ID = billId;
        this.DATA = data;
        this.EVENT_NAME = cla.getSimpleName();
    }
}
           

        接下来便可按照支付业务规则将上面的领域事件持久化,也就是实现新的支付方法。

public String pay(Long billId) {
        return PayContext.builder()
                .billAggregationForPay(getBillAggregationForPay(billId))
                .billAggregationRepository(billAggregationRepository)
                .eventRepository(eventRepository)
                .supplierRepository(supplierRepository)
                .paymentInterfaceInvoker(paymentInterfaceInvoker)
                .payeeRepository(payeeRepository)
                .build()
                .pay();
    }
           
@Builder
    static class PayContext {
        private String callbackUrl;
        private BillAggregationForPay billAggregationForPay;
        private BillAggregationRepository billAggregationRepository;
        private PaymentEventRepository eventRepository;
        private PayeeRepository payeeRepository;
        private IRepository<Supplier> supplierRepository;
        private PaymentInterfaceInvoker paymentInterfaceInvoker;

        public String pay() {
            try {
                PayMessage payMessage = billAggregationForPay.lock();
                if (PayMessage.CAN_PAY != payMessage) {
                    log.info("{}, billNO -> {}", payMessage.message, billAggregationForPay.getBillNo());
                    return payMessage.message;
                }
                PaymentInterfaceInvoker.PaymentRequest request = buildPaymentRequest();
                PaymentInterfaceInvoker.PaymentResponse paymentResponse = paymentInterfaceInvoker.invoke(request);
                eventRepository.save(paymentResponse.buildEvent(billAggregationForPay.getBillId(), request));

                return paymentResponse.toString();
            } finally {
                billAggregationForPay.unlock();
            }
        }
           

        以上重构后的支付代码摒弃了原有的事务脚本形式的代码风格,通过PaymentService将支付逻辑所需的repository等对象汇聚起来后一并交付给PayContext,PayContext以充血模型方式在其内部完成支付操作。PaymentService在这个过程中只扮演协调者,不涉及任何业务规则,全部业务规则由PayContext承担。

        而相较于原支付回调处理方法,重构后的支付回调处理方法在代码的简洁性和职责分配方面都做了非常大的优化。下面是原支付回调处理方法。

private void handlePaymentCallback(CallbackCtx ctx) {
            //1、更新支付信息
            this.updatePaymentInfo(ctx);

            //2、增加根据支付返回结果预算、押金、数据操作 begin
            this.dealBudget(ctx);

            //3、增加根据支付返回结果生成借款单操作
            this.createLoan(ctx);

            //4、向SRM推送数据
            this.callSRM(ctx);

            //5、创建会计凭证
            this.callAd(ctx);

            //6、付款成功,关联方交易,通知税务系统
            this.createPaymentRequest(ctx.getBillPO(), ctx.getPaymentPO());

            //7、发送邮件
            this.sendMail(ctx);
    }
           

        先不说上面这个支付回调处理方法中如果有一个抛异常是否会导致其后的方法不能执行的问题,就仅从单一职责这个基本原则出发,该回调方法唯一的职责只应有支付状态更新,也就是除了第一个方法——updatePaymentInfo(),其余都不应是此回调方法的职责。显然,这个回调方法承担了太多它不应承担的东西。另外,别看以上封装出了7个方法,但在每个方法背后都是少则几十,多则数百行的代码,绝对的牵一发动全身。

        那支付回调结果的后续处理应由谁来负责呢?以下是重构后的支付回调方法。

public void handleCallback(CallbackDTO callbackDTO) {
        log.info("支付回调数据 -> {}", callbackDTO);
        Event event = callbackDTO.getEvent(billCrudService.get(callbackDTO.getOrderNo()).getBill().getID());
        eventRepository.save(event);
        publishEvent(event);
    }

    @Autowired
    private EventPublisher eventPublisher;

    @Async
    private void publishEvent(Event event) {
        eventPublisher.publish(event);
    }
           

        重构后的支付回调处理方法只负责保存根据回调结果生成的领域事件,然后将该领域事件以异步方式发布出去。发布出去的事件则由相应的EventHandler进行处理。当然,事件的发布也可以使用AOP拦截handleCallback方法实现。

并发控制

        并发控制对于事件溯源极其重要,处理稍有不当就会导致聚合状态混乱,最终影响系统的行为。对于并发控制可选的技术方案有:关系数据库锁(乐观锁或行锁),分布锁或流量限制(RateLimter或Nginx)。如果在限流的粒度上没有严格要求可以选择流量限制方案。如果想实现细粒度的并发控制则需要采用分布锁或数据库锁方案,比如控制数据库中一条记录或一个过程的并发操作。

        支付功能的业务特点决定了必须采用细粒度的并发控制方案,也就是要控制每一个单据的并发操作。当然也可以采用限流方案,但在分布式环境下,限流想要达到与数据库锁和分布锁同样的效果就必须对流量进行严格的限制,如此则势必会对性能造成严重影响。出于对性能和复杂度的考虑,新支付功能的并发控制采用的是基于关系数据库的乐观锁方案。

        其实除上述几个方案外,还有一个最为简单的方案,就是通过关系数据库的唯一索引来控制并发,即为事件表中的事件名和事件所属的事件流标识这两个字段建立联合唯一索引,如此便可阻止对同一个事件流插入相同事件,在代码层面则需要 try catch 住事件插入的相关代码,在 catch 中专门处理duplicate key异常。该方案虽然是最简单的,但个人并不建议也不太喜欢这种以捕获异常的方式来控制并发,所以在本小节开始并未将其作为候选方案之一。

重构效果

        从以上介绍可以看出,基于事件溯源对支付功能的重构大幅降低了代码间的耦合,从而为以后的维护奠定了良好的基础,另一方面,兼具性能和并发控制的乐观锁方案解决了原支付功能的并发缺陷问题,有效增强了新支付功能的健壮性。

        如果你对于事件溯源有所了解,那么可以看到对于支付功能的重构没有使用事件快照,这是因为每条单据的支付事件是十分有限的,所以也就不需要用快照方式来提升性能。

        需要注意的是,并不是任何业务都适合采用事件溯源,且事件溯源的代码设计思维与传统的设计思维大不相同,甚至是颠覆式的,所以需要开发人员在清晰准确认识业务领域的基础上抛弃掉固有的代码设计习惯,然而,只要涉及到习惯的事情其难度必定不会小。

继续阅读