天天看點

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

友情提示:本文略長略複雜,但是有配套的視訊教程。

在前面的文章中,松哥和大家聊了分布式事務架構 seata 的 at 模式,然後有小夥伴評論說 seata 的 tcc 模式不支援 Spring Boot:

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

這。。。必然是支援的呀!

我一直覺得網上講分布式事務的理論很多,案例代碼很少,是以咱們今天就整一個例子,一起來把這個捋一捋。

1. 什麼是 TCC 模式

相比于​​上篇文章​​所聊的 AT 模式,TCC(Try-Confirm-Cancel) 模式就帶一點手動的感覺了,它也是兩階段送出的演化,但是和 AT 又不太一樣,我們來看下流程。

官網上有一張 TCC 的流程圖,我們來看下:

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

可以看到,TCC 也是分為兩階段:

  • 第一階段是 prepare,在這個階段主要是做資源的檢測和預留工作,例如銀行轉賬,這個階段就先去檢查下使用者的錢夠不夠,不夠就直接抛異常,夠就先給當機上。
  • 第二階段是 commit 或 rollback,這個主要是等各個分支事務的一階段都執行完畢,都執行完畢後各自将自己的情況報告給 TC,TC 一統計,發現各個分支事務都沒有異常,那麼就通知大家一起送出;如果 TC 發現有分支事務發生異常了,那麼就通知大家復原。

那麼小夥伴可能也發現了,上面這個流程中,一共涉及到了三個方法,prepare、commit 以及 rollback,這三個方法都完全是使用者自定義的方法,都是需要我們自己來實作的,是以我一開始就說 TCC 是一種手動的模式。

和 AT 相比,大家發現 TCC 這種模式其實是不依賴于底層資料庫的事務支援的,也就是說,哪怕你底層資料庫不支援事務也沒關系,反正 prepare、commit 以及 rollback 三個方法都是開發者自己寫的,我們自己将這三個方法對應的流程捋順就行了。

在​​上篇文章​​的中,我們講 AT 模式,每個資料庫都需要有一個 undo log 表,這個表用來記錄一條資料更改之前和更改之後的狀态(前鏡像和後鏡像),如果所有分支事務最終都送出成功,那麼記錄在 undo log 表中的資料就會自動删除;如果有一個分支事務執行失敗,導緻所有事務都需要復原,那麼就會以 undo log 表中的資料會依據,生成反向補償語句,利用反向補償語句将資料複原,執行完成後也會删除 undo log 表中的記錄。

在這個流程中,大家看到,undo log 表扮演了非常重要的角色。TCC 和 AT 最大的差別在于,TCC 中的送出和復原邏輯都是開發者自己寫的,而 AT 都是架構自動完成的。

為了友善大家了解,本文我就不重新搞案例了,咱們還用​​上篇文章​​那個下訂單的案例來示範。

2. 案例回顧

這是一個商品下單的案例,一共有五個服務,我來和大家稍微解釋下:

  • eureka:這是服務注冊中心。
  • account:這是賬戶服務,可以查詢/修改使用者的賬戶資訊(主要是賬戶餘額)。
  • order:這是訂單服務,可以下訂單。
  • storage:這是一個倉儲服務,可以查詢/修改商品的庫存數量。
  • bussiness:這是業務,使用者下單操作将在這裡完成。

這個案例講了一個什麼事呢?

當使用者想要下單的時候,調用了 bussiness 中的接口,bussiness 中的接口又調用了它自己的 service,在 service 中,首先開啟了全局分布式事務,然後通過 feign 調用 storage 中的接口去扣庫存,然後再通過 feign 調用 order 中的接口去建立訂單(order 在建立訂單的時候,不僅會建立訂單,還會扣除使用者賬戶的餘額),在這個過程中,如果有任何一個環節出錯了(餘額不足、庫存不足等導緻的問題),就會觸發整體的事務復原。

本案例具體架構如下圖:

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

這個案例就是一個典型的分布式事務問題,storage、order 以及 account 中的事務分屬于不同的微服務,但是我們希望他們同時成功或者同時失敗。

這個案例的基本架構我這裡就不重複搭建了,小夥伴們可以參考​​上篇文章​​,這裡我們主要來看 TCC 事務如何添加進來。

3. 重新設計資料庫

首先我們将​​上篇文章​​中的資料庫來重新設計一下,友善我們本文的使用。

賬戶表增加一個當機金額的字段,如下:

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

訂單表和前文保持一緻,不變。

庫存表也增加一個當機庫存數量的字段,如下:

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

另外,由于我們這裡不再使用 AT 模式,是以可以删除之前的 undo_log 表了(可能有小夥伴删除 undo_log 表之後,會報錯,那是因為你 TCC 模式使用不對,注意看松哥後面的講解哦)。

相關的資料庫腳本小夥伴們可以在文末下載下傳,這裡我就不列出來了。

4. 重新設計 Feign 接口

在 TCC 模式中,我們的 Feign 換一種方式來配置。

小夥伴們都知道,在​​上篇文章​​的案例中,我們有一個 common 子產品,用來存放一些公共内容(實際上我們隻是存儲了 RespBean),現在我們把這裡涉及到的 OpenFeign 接口也存儲進來,一共是三個 OpenFeign 接口,因為還要用到 seata 中的注解,是以我們在 common 中引入 OpenFeign 和 seata 的依賴,如下:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>      

然後在這裡定義 OpenFeign 的三個接口,如下:

@LocalTCC
public interface AccountServiceApi {
    @PostMapping("/account/deduct/prepare")
    @TwoPhaseBusinessAction(name = "accountServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("money") @BusinessActionContextParameter(paramName = "money") Double money);

    @RequestMapping("/account/deduct/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/account/deduct/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface OrderServiceApi {
    @PostMapping("/order/create/prepare")
    @TwoPhaseBusinessAction(name = "orderServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("productId") @BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);

    @RequestMapping("/order/create/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/order/create/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface StorageServiceApi {
    @PostMapping("/storage/deduct/prepare")
    @TwoPhaseBusinessAction(name = "storageServiceApi",commitMethod = "commit",rollbackMethod = "rollback")
    boolean deduct(@RequestBody BusinessActionContext actionContext, @RequestParam("productId")@BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);

    @RequestMapping("/storage/deduct/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/storage/deduct/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}      

這裡一共有三個接口,但是隻要大家搞懂其中一個,另外兩個都很好懂了。我這裡就以 AccountServiceApi 為例來和大家講解吧。

  • 首先接口的定義上,需要加一個注解 @LocalTCC,這個表示開啟 seata 中的 TCC 模式。
  • 然後就是 @TwoPhaseBusinessAction 注解,兩階段送出的注解,這個注解有三個屬性,第一個 name 就是處理兩階段送出的 bean 的名字,其實就是目前 bean 的名字,目前類名首字母小寫。兩階段第一階段就是 prepare 階段,也就是執行 @TwoPhaseBusinessAction 注解所在的方法,第二階段則分為兩種情況,送出或者復原,分别對應了兩個不同的方法,commitMethod 和 rollbackMethod 就指明了相應的方法。
  • 一階段的 prepare 需要開發者手動調用,二階段的 commit 或者 rollback 則是系統自動調用。prepare 中的方法是由開發者來傳遞的,而在二階段的方法中,相關的參數我們需要從 BusinessActionContext 中擷取,@BusinessActionContextParameter 注解就是将對應的參數放入到 BusinessActionContext 中(注意需要給每一個參數取一個名字),将來可以從 BusinessActionContext 中取出對應的參數。
  • 另外需要注意,接口的傳回值設計成 boolean,用以表示相應的操作執行成功還是失敗,傳回 false 表示執行失敗,預設會有重試機制進行重試。

這是 AccountServiceApi,另外兩個接口的設計也是大同小異,這裡我就不再贅述。

接下來看接口的實作。

5. Account

首先我們來看看 Account 服務。AccountController 實作 AccountServiceApi。

我們來看下 AccountController 的定義:

@RestController
public class AccountController implements AccountServiceApi {
    @Autowired
    AccountService accountService;

    @Override
    public boolean prepare(BusinessActionContext actionContext, String userId, Double money) {
        return accountService.prepareDeduct(userId, money);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        return accountService.commitDeduct(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        return accountService.rollbackDeduct(actionContext);
    }
}      

因為接口的路徑都定義在 AccountServiceApi 中了,是以這裡隻需要簡單實作即可,核心的處理邏輯在 AccountService 中,我們來看下 AccountService:

@Service
public class AccountService {

    private static final Logger logger = LoggerFactory.getLogger(AccountService.class);

    @Autowired
    AccountMapper accountMapper;

    /**
     * 預扣款階段
     * 檢查賬戶餘額
     *
     * @param userId
     * @param money
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepareDeduct(String userId, Double money) {
        Account account = accountMapper.getAccountByUserId(userId);
        if (account == null) {
            throw new RuntimeException("賬戶不存在");
        }
        if (account.getMoney() < money) {
            throw new RuntimeException("餘額不足,預扣款失敗");
        }
        account.setFreezeMoney(account.getFreezeMoney() + money);
        account.setMoney(account.getMoney() - money);
        Integer i = accountMapper.updateAccount(account);
        logger.info("{} 賬戶預扣款 {} 元", userId, money);
        return i == 1;
    }

    /**
     * 實際扣款階段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeduct(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
        Account account = accountMapper.getAccountByUserId(userId);
        if (account.getFreezeMoney() < money) {
            throw new RuntimeException("餘額不足,扣款失敗");
        }
        account.setFreezeMoney(account.getFreezeMoney() - money);
        Integer i = accountMapper.updateAccount(account);
        logger.info("{} 賬戶扣款 {} 元", userId, money);
        return i == 1;
    }

    /**
     * 賬戶復原階段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackDeduct(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
        Account account = accountMapper.getAccountByUserId(userId);
        if (account.getFreezeMoney() >= money) {
            account.setMoney(account.getMoney() + money);
            account.setFreezeMoney(account.getFreezeMoney() - money);
            Integer i = accountMapper.updateAccount(account);
            logger.info("{} 賬戶釋放當機金額 {} 元", userId, money);
            return i == 1;
        }
        logger.info("{} 賬戶資金已釋放",userId);
        //說明prepare中抛出異常,未當機資金
        return true;
    }
}      
  • AccountService 裡一共有三個方法,在整個兩階段送出中,一階段執行 prepareDeduct 方法,二階段執行 commitDeduct 或者 rollbackDeduct 方法。
  • 在 prepareDeduct 中,我們主要檢查一下賬戶是否存在,賬戶餘額是否充足,餘額充足就将本次消費的金額當機起來,當機的邏輯就是給 freezeMoney 字段增加本次消費金額,從 money 字段減少本次消費金額。
  • 等到其他幾個服務的一階段方法都執行完成後,都沒有抛出異常,此時就執行二階段的送出方法,對應這裡就是 commitDeduct 方法;如果其他服務的一階段執行過程中,抛出了異常,那麼就執行二階段的復原方法,對應這裡的 rollbackDeduct。
  • 在 commitDeduct 方法中,首先從 BusinessActionContext 中提取出來我們需要的參數(因為這個方法是系統自動調用的,不是我們手動調用,是以沒法自己傳參數進來,隻能通過 BusinessActionContext 來擷取),然後再檢查一下餘額是否充足,沒問題就把當機的資金劃掉,就算扣款完成了。
  • 在 rollbackDeduct 方法中,也是先從 BusinessActionContext 中擷取相應的參數,檢查一下當機的金額,沒問題就把當機的金額恢複到 money 字段上(如果沒進入 if 分支,則說明 prepare 中抛出異常,未當機資金)。

好了,這就是從賬戶扣錢的兩階段操作,資料庫操作比較簡單,我這裡就不列出來了,文末可以下載下傳源碼。

6. Order

再來看訂單服務。

由于我們是在 order 中調用 account 完成賬戶扣款的,是以需要先在 order 中加入 account 的 OpenFeign 調用,如下:

@FeignClient("account")
public interface AccountServiceApiImpl extends AccountServiceApi {
}      

這應該沒啥好解釋的。

接下來我們來看 OrderController:

@RestController
public class OrderController implements OrderServiceApi {

    @Autowired
    OrderService orderService;

    @Override
    public boolean prepare(BusinessActionContext actionContext, String userId, String productId, Integer count) {
        return orderService.prepareCreateOrder(actionContext,userId, productId, count);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        return orderService.commitOrder(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        return orderService.rollbackOrder(actionContext);
    }
}      

這個跟 AccountService 也基本一緻,實作了 OrderServiceApi 接口,接口位址啥的都定義在 OrderServiceApi 中,這個類重點還是在 OrderService 中,如下:

@Service
public class OrderService {

    private static final Logger logger = LoggerFactory.getLogger(OrderService.class);

    @Autowired
    AccountServiceApi accountServiceApi;

    @Autowired
    OrderMapper orderMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean prepareCreateOrder(BusinessActionContext actionContext, String userId, String productId, Integer count) {
        //先去扣款,假設每個産品100塊錢
        boolean resp = accountServiceApi.prepare(actionContext, userId, count * 100.0);
        logger.info("{} 使用者購買的 {} 商品共計 {} 件,預下單成功", userId, productId, count);
        return resp;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean commitOrder(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        int i = orderMapper.addOrder(userId, productId, count, count * 100.0);
        logger.info("{} 使用者購買的 {} 商品共計 {} 件,下單成功", userId, productId, count);
        return i==1;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackOrder(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        logger.info("{} 使用者購買的 {} 商品共計 {} 件,訂單復原成功", userId, productId, count);
        return true;
    }
}      

跟之前的 AccountService 一樣,這裡也是三個核心方法:

  • prepareCreateOrder:這裡主要是調用了一下賬戶的方法,去檢查下看下錢夠不。一階段就做個這事。
  • commitOrder:二階段如果是送出的話,就向資料庫中添加一條訂單記錄。
  • rollbackOrder:二階段如果是復原的話,就什麼事情都不做,打個日志就行了。

好了,這就是下單的操作。

7. Storage

最後我們再來看看扣庫存的操作,這個跟扣款比較像,一起來看下:

@RestController
public class StorageController implements StorageServiceApi {

    @Autowired
    StorageService storageService;

    @Override
    public boolean deduct(BusinessActionContext actionContext, String productId, Integer count) {
        return storageService.prepareDeduct(productId, count);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        return storageService.commitDeduct(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        return storageService.rollbackDeduct(actionContext);
    }
}      

核心邏輯在 StorageService 中,如下:

@Service
public class StorageService {

    private static final Logger logger = LoggerFactory.getLogger(StorageService.class);

    @Autowired
    StorageMapper storageMapper;

    /**
     * 預扣庫存
     *
     * @param productId
     * @param count
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepareDeduct(String productId, Integer count) {
        Storage storage = storageMapper.getStorageByProductId(productId);
        if (storage == null) {
            throw new RuntimeException("商品不存在");
        }
        if (storage.getCount() < count) {
            throw new RuntimeException("庫存不足,預扣庫存失敗");
        }
        storage.setFreezeCount(storage.getFreezeCount() + count);
        storage.setCount(storage.getCount() - count);
        int i = storageMapper.updateStorage(storage);
        logger.info("{} 商品庫存當機 {} 個", productId, count);
        return i == 1;
    }

    /**
     * 扣庫存
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commitDeduct(BusinessActionContext actionContext) {
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageByProductId(productId);
        if (storage.getFreezeCount() < count) {
            throw new RuntimeException("庫存不足,扣庫存失敗");
        }
        storage.setFreezeCount(storage.getFreezeCount() - count);
        int i = storageMapper.updateStorage(storage);
        logger.info("{} 商品庫存扣除 {} 個", productId, count);
        return i == 1;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean rollbackDeduct(BusinessActionContext actionContext) {
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageByProductId(productId);
        if (storage.getFreezeCount() >= count) {
            storage.setFreezeCount(storage.getFreezeCount() - count);
            storage.setCount(storage.getCount() + count);
            int i = storageMapper.updateStorage(storage);
            logger.info("{} 商品釋放庫存 {} 個", productId, count);
            return i == 1;
        }
        //說明 prepare 階段就沒有當機
        return true;
    }
}      

這個跟 AccountService 的邏輯基本上是一樣的,我就不多做解釋了。

8. Business

最後再來看看調用的入口 Business。Business 中要調用 storage 和 order,是以先把這兩個的 OpenFeign 整進來:

@FeignClient("order")
public interface OrderServiceApiImpl extends OrderServiceApi {
}
@FeignClient("storage")
public interface StorageServiceApiImpl extends StorageServiceApi {
}      

然後看下接口調用:

@RestController
public class BusinessController {
    @Autowired
    BusinessService businessService;

    @PostMapping("/order")
    public RespBean order(String account, String productId, Integer count) {
        try {
            businessService.purchase(account, productId, count);
            return RespBean.ok("下單成功");
        } catch (Exception e) {
            e.printStackTrace();
            return RespBean.error("下單失敗", e.getMessage());
        }
    }
}
@Service
public class BusinessService {
    @Autowired
    StorageServiceApi storageServiceApi;
    @Autowired
    OrderServiceApi orderServiceApi;

    @GlobalTransactional
    public void purchase(String account, String productId, Integer count) {
        String xid = RootContext.getXID();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        storageServiceApi.deduct(actionContext, productId, count);
        orderServiceApi.prepare(actionContext, account, productId, count);
    }
}      

BusinessService 中通過 RootContext 擷取全局事務 ID,然後構造一個 BusinessActionContext 對象,開始整個流程的調用。

好啦,大功告成。

9. 測試

最後再來個簡單測試,成功的測試:

聽說 TCC 不支援 OpenFeign?這個坑松哥必須給大家填了

調用失敗的測試: