天天看點

微服務整合Seata分布式事務處理

前言
官方文檔: http://seata.io/zh-cn/docs/user/microservice.html

這幾天開始做新項目,是基于RPC+Restful的微服務項目。前端用的Vue,後端用的比較冷門的小型輕量級架構zbus。前端和後端互動是通過restful,後端服務之間通過RPC調用,這就涉及到了分布式事務。

官方文檔很簡潔,網上其他資源都是講的seata與spring cloud、dubbo等架構的整合,但是看到官方這一句:

跨服務調用場景下的事務傳播,本質上就是要把 XID 通過服務調用傳遞到服務提供方,并綁定到 RootContext 中去。隻要能做到這點,理論上 Seata 可以支援任意的微服務架構。

so,當然也可以支援zbus,隻要能擷取到全局事務的XID。

一、從 https://github.com/seata/seata/releases,下載下傳伺服器軟體包,将其解壓縮并啟動。

Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
  Options:
    --host, -h
      The host to bind.
      Default: 0.0.0.0
    --port, -p
      The port to listen.
      Default: 8091
    --storeMode, -m
      log store mode : file、db
      Default: file
    --help

e.g.

sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
           

PS:自定義指令:

cd /usr/local/seata-server-1.4.0-SNAPSHOT/seata/bin

nohup ./seata-server.sh -h 127.0.0.1 -p 8091 >seata.log 2>&1 & 
           

預設是file類型啟動,可以在conf下的file.conf和registry.conf修改。我這裡用的是redis。

二、在各個服務裡引入seata依賴:

<!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.3.0</version>
</dependency>

           

目前(2020-07)最新版本是1.3.0

三、在yml配置檔案裡配置seata

# seata配置
seata:
  enabled: false
  application-id: cargo
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
    grouplist:
      default: 172.31.1.191:8091

           

注意:用戶端和服務端都要在同一個tx-service-group事務組裡!application-id是區分每個服務的辨別,如果為空,自動取服務名name。

四、在調用方的業務方法上加上全局事務注解@GlobalTransactional

/**
     * 儲存訂單,同時儲存訂單商品資訊。
     *
     * @param order
     * @return com.yorma.entity.YmMsg<com.yorma.cargo.entity.Order>
     * @author zhangchao
     * @date 2020/7/16 15:16
     **/
@GlobalTransactional
@Override
public YmMsg<Order> saveOrder(Order order) {
    PubOperateHistory operateHistory = new PubOperateHistory();
    operateHistory.setLogTime(new Date());
    operateHistory.setUser("admin"); // TODO zhangchao 2020/7/8 先寫死,以後根據token從redis裡擷取!
    //        operateHistory.setType(this.getClass().getName());
    operateHistory.setType("Order");
    operateHistory.setNote(Thread.currentThread().getStackTrace()[1].getMethodName());
    // 新增
    if (null == order.getId()) {
        if (isNotEmpty(order.getOrderDetails())) {
            orderDetailsService.saveBatch(order.getOrderDetails());
        }
        baseMapper.insert(order);
        operateHistory.setContent("新增訂單,id:" + order.getId());
        operateHistory.setSourceId(order.getId().toString());
        // 編輯
    } else {
        List<OrderDetail> orderDetails = orderDetailsService.list(new QueryWrapper<OrderDetail>().lambda()
                                                                  .eq(OrderDetail::getOrderId, order.getId()));
        // 先清一遍子表
        if (isNotEmpty(orderDetails)) {
            orderDetails.forEach(orderDetail -> orderDetailsService.removeById(orderDetail.getId()));
        }
        if (isNotEmpty(order.getOrderDetails())) {
            orderDetailsService.saveBatch(order.getOrderDetails());
        }
        baseMapper.updateById(order);
        operateHistory.setContent("編輯訂單,id:" + order.getId());
        operateHistory.setSourceId(order.getId().toString());
    }
    CommonApi commonApi = rpc.createProxy("/basic/common", CommonApi.class);
    YmMsg ymMsg = commonApi.saveHistory(operateHistory, RootContext.getXID());
    if (ymMsg.isSuccess()) {
        return YmMsg.ok("儲存成功!", Order.class);
    } else {
        throw new RuntimeException("操作曆史儲存失敗!");
    }
    //        int i = 10 / 0;
    //        return YmMsg.ok("儲存成功!", Order.class);
}
           

注意:被調用方不需要加@GlobalTransactional!

被調用方不需要加@GlobalTransactional!

被調用方不需要加@GlobalTransactional!

坑點:

  • 如果本地事務包着seata全局事務,會導緻seata擷取不到全局鎖而出現二階段送出失敗異常!!

本人做的事務測試:

微服務整合Seata分布式事務處理

繼續閱讀