友情提示:本文略長略複雜,但是有配套的視訊教程。
在前面的文章中,松哥和大家聊了分布式事務架構 seata 的 at 模式,然後有小夥伴評論說 seata 的 tcc 模式不支援 Spring Boot:
這。。。必然是支援的呀!
我一直覺得網上講分布式事務的理論很多,案例代碼很少,是以咱們今天就整一個例子,一起來把這個捋一捋。
1. 什麼是 TCC 模式
相比于上篇文章所聊的 AT 模式,TCC(Try-Confirm-Cancel) 模式就帶一點手動的感覺了,它也是兩階段送出的演化,但是和 AT 又不太一樣,我們來看下流程。
官網上有一張 TCC 的流程圖,我們來看下:
可以看到,TCC 也是分為兩階段:
- 第一階段是 prepare,在這個階段主要是做資源的檢測和預留工作,例如銀行轉賬,這個階段就先去檢查下使用者的錢夠不夠,不夠就直接抛異常,夠就先給當機上。
- 第二階段是 commit 或 rollback,這個主要是等各個分支事務的一階段都執行完畢,都執行完畢後各自将自己的情況報告給 TC,TC 一統計,發現各個分支事務都沒有異常,那麼就通知大家一起送出;如果 TC 發現有分支事務發生異常了,那麼就通知大家復原。
那麼小夥伴可能也發現了,上面這個流程中,一共涉及到了三個方法,prepare、commit 以及 rollback,這三個方法都完全是使用者自定義的方法,都是需要我們自己來實作的,是以我一開始就說 TCC 是一種手動的模式。
和 AT 相比,大家發現 TCC 這種模式其實是不依賴于底層資料庫的事務支援的,也就是說,哪怕你底層資料庫不支援事務也沒關系,反正 prepare、commit 以及 rollback 三個方法都是開發者自己寫的,我們自己将這三個方法對應的流程捋順就行了。
在上篇文章的中,我們講 AT 模式,每個資料庫都需要有一個 undo log 表,這個表用來記錄一條資料更改之前和更改之後的狀态(前鏡像和後鏡像),如果所有分支事務最終都送出成功,那麼記錄在 undo log 表中的資料就會自動删除;如果有一個分支事務執行失敗,導緻所有事務都需要復原,那麼就會以 undo log 表中的資料會依據,生成反向補償語句,利用反向補償語句将資料複原,執行完成後也會删除 undo log 表中的記錄。
在這個流程中,大家看到,undo log 表扮演了非常重要的角色。TCC 和 AT 最大的差別在于,TCC 中的送出和復原邏輯都是開發者自己寫的,而 AT 都是架構自動完成的。
為了友善大家了解,本文我就不重新搞案例了,咱們還用上篇文章那個下訂單的案例來示範。
2. 案例回顧
這是一個商品下單的案例,一共有五個服務,我來和大家稍微解釋下:
- eureka:這是服務注冊中心。
- account:這是賬戶服務,可以查詢/修改使用者的賬戶資訊(主要是賬戶餘額)。
- order:這是訂單服務,可以下訂單。
- storage:這是一個倉儲服務,可以查詢/修改商品的庫存數量。
- bussiness:這是業務,使用者下單操作将在這裡完成。
這個案例講了一個什麼事呢?
當使用者想要下單的時候,調用了 bussiness 中的接口,bussiness 中的接口又調用了它自己的 service,在 service 中,首先開啟了全局分布式事務,然後通過 feign 調用 storage 中的接口去扣庫存,然後再通過 feign 調用 order 中的接口去建立訂單(order 在建立訂單的時候,不僅會建立訂單,還會扣除使用者賬戶的餘額),在這個過程中,如果有任何一個環節出錯了(餘額不足、庫存不足等導緻的問題),就會觸發整體的事務復原。
本案例具體架構如下圖:
這個案例就是一個典型的分布式事務問題,storage、order 以及 account 中的事務分屬于不同的微服務,但是我們希望他們同時成功或者同時失敗。
這個案例的基本架構我這裡就不重複搭建了,小夥伴們可以參考上篇文章,這裡我們主要來看 TCC 事務如何添加進來。
3. 重新設計資料庫
首先我們将上篇文章中的資料庫來重新設計一下,友善我們本文的使用。
賬戶表增加一個當機金額的字段,如下:
訂單表和前文保持一緻,不變。
庫存表也增加一個當機庫存數量的字段,如下:
另外,由于我們這裡不再使用 AT 模式,是以可以删除之前的 undo_log 表了(可能有小夥伴删除 undo_log 表之後,會報錯,那是因為你 TCC 模式使用不對,注意看松哥後面的講解哦)。
相關的資料庫腳本小夥伴們可以在文末下載下傳,這裡我就不列出來了。
4. 重新設計 Feign 接口
在 TCC 模式中,我們的 Feign 換一種方式來配置。
小夥伴們都知道,在上篇文章的案例中,我們有一個 common 子產品,用來存放一些公共内容(實際上我們隻是存儲了 RespBean),現在我們把這裡涉及到的 OpenFeign 接口也存儲進來,一共是三個 OpenFeign 接口,因為還要用到 seata 中的注解,是以我們在 common 中引入 OpenFeign 和 seata 的依賴,如下:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
然後在這裡定義 OpenFeign 的三個接口,如下:
@LocalTCC
public interface AccountServiceApi {
@PostMapping("/account/deduct/prepare")
@TwoPhaseBusinessAction(name = "accountServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("money") @BusinessActionContextParameter(paramName = "money") Double money);
@RequestMapping("/account/deduct/commit")
boolean commit(@RequestBody BusinessActionContext actionContext);
@RequestMapping("/account/deduct/rollback")
boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface OrderServiceApi {
@PostMapping("/order/create/prepare")
@TwoPhaseBusinessAction(name = "orderServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("productId") @BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);
@RequestMapping("/order/create/commit")
boolean commit(@RequestBody BusinessActionContext actionContext);
@RequestMapping("/order/create/rollback")
boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface StorageServiceApi {
@PostMapping("/storage/deduct/prepare")
@TwoPhaseBusinessAction(name = "storageServiceApi",commitMethod = "commit",rollbackMethod = "rollback")
boolean deduct(@RequestBody BusinessActionContext actionContext, @RequestParam("productId")@BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);
@RequestMapping("/storage/deduct/commit")
boolean commit(@RequestBody BusinessActionContext actionContext);
@RequestMapping("/storage/deduct/rollback")
boolean rollback(@RequestBody BusinessActionContext actionContext);
}
這裡一共有三個接口,但是隻要大家搞懂其中一個,另外兩個都很好懂了。我這裡就以 AccountServiceApi 為例來和大家講解吧。
- 首先接口的定義上,需要加一個注解 @LocalTCC,這個表示開啟 seata 中的 TCC 模式。
- 然後就是 @TwoPhaseBusinessAction 注解,兩階段送出的注解,這個注解有三個屬性,第一個 name 就是處理兩階段送出的 bean 的名字,其實就是目前 bean 的名字,目前類名首字母小寫。兩階段第一階段就是 prepare 階段,也就是執行 @TwoPhaseBusinessAction 注解所在的方法,第二階段則分為兩種情況,送出或者復原,分别對應了兩個不同的方法,commitMethod 和 rollbackMethod 就指明了相應的方法。
- 一階段的 prepare 需要開發者手動調用,二階段的 commit 或者 rollback 則是系統自動調用。prepare 中的方法是由開發者來傳遞的,而在二階段的方法中,相關的參數我們需要從 BusinessActionContext 中擷取,@BusinessActionContextParameter 注解就是将對應的參數放入到 BusinessActionContext 中(注意需要給每一個參數取一個名字),将來可以從 BusinessActionContext 中取出對應的參數。
- 另外需要注意,接口的傳回值設計成 boolean,用以表示相應的操作執行成功還是失敗,傳回 false 表示執行失敗,預設會有重試機制進行重試。
這是 AccountServiceApi,另外兩個接口的設計也是大同小異,這裡我就不再贅述。
接下來看接口的實作。
5. Account
首先我們來看看 Account 服務。AccountController 實作 AccountServiceApi。
我們來看下 AccountController 的定義:
@RestController
public class AccountController implements AccountServiceApi {
@Autowired
AccountService accountService;
@Override
public boolean prepare(BusinessActionContext actionContext, String userId, Double money) {
return accountService.prepareDeduct(userId, money);
}
@Override
public boolean commit(BusinessActionContext actionContext) {
return accountService.commitDeduct(actionContext);
}
@Override
public boolean rollback(BusinessActionContext actionContext) {
return accountService.rollbackDeduct(actionContext);
}
}
因為接口的路徑都定義在 AccountServiceApi 中了,是以這裡隻需要簡單實作即可,核心的處理邏輯在 AccountService 中,我們來看下 AccountService:
@Service
public class AccountService {
private static final Logger logger = LoggerFactory.getLogger(AccountService.class);
@Autowired
AccountMapper accountMapper;
/**
* 預扣款階段
* 檢查賬戶餘額
*
* @param userId
* @param money
* @return
*/
@Transactional(rollbackFor = Exception.class)
public boolean prepareDeduct(String userId, Double money) {
Account account = accountMapper.getAccountByUserId(userId);
if (account == null) {
throw new RuntimeException("賬戶不存在");
}
if (account.getMoney() < money) {
throw new RuntimeException("餘額不足,預扣款失敗");
}
account.setFreezeMoney(account.getFreezeMoney() + money);
account.setMoney(account.getMoney() - money);
Integer i = accountMapper.updateAccount(account);
logger.info("{} 賬戶預扣款 {} 元", userId, money);
return i == 1;
}
/**
* 實際扣款階段
*
* @param actionContext
* @return
*/
@Transactional(rollbackFor = Exception.class)
public boolean commitDeduct(BusinessActionContext actionContext) {
String userId = (String) actionContext.getActionContext("userId");
Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
Account account = accountMapper.getAccountByUserId(userId);
if (account.getFreezeMoney() < money) {
throw new RuntimeException("餘額不足,扣款失敗");
}
account.setFreezeMoney(account.getFreezeMoney() - money);
Integer i = accountMapper.updateAccount(account);
logger.info("{} 賬戶扣款 {} 元", userId, money);
return i == 1;
}
/**
* 賬戶復原階段
*
* @param actionContext
* @return
*/
@Transactional(rollbackFor = Exception.class)
public boolean rollbackDeduct(BusinessActionContext actionContext) {
String userId = (String) actionContext.getActionContext("userId");
Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
Account account = accountMapper.getAccountByUserId(userId);
if (account.getFreezeMoney() >= money) {
account.setMoney(account.getMoney() + money);
account.setFreezeMoney(account.getFreezeMoney() - money);
Integer i = accountMapper.updateAccount(account);
logger.info("{} 賬戶釋放當機金額 {} 元", userId, money);
return i == 1;
}
logger.info("{} 賬戶資金已釋放",userId);
//說明prepare中抛出異常,未當機資金
return true;
}
}
- AccountService 裡一共有三個方法,在整個兩階段送出中,一階段執行 prepareDeduct 方法,二階段執行 commitDeduct 或者 rollbackDeduct 方法。
- 在 prepareDeduct 中,我們主要檢查一下賬戶是否存在,賬戶餘額是否充足,餘額充足就将本次消費的金額當機起來,當機的邏輯就是給 freezeMoney 字段增加本次消費金額,從 money 字段減少本次消費金額。
- 等到其他幾個服務的一階段方法都執行完成後,都沒有抛出異常,此時就執行二階段的送出方法,對應這裡就是 commitDeduct 方法;如果其他服務的一階段執行過程中,抛出了異常,那麼就執行二階段的復原方法,對應這裡的 rollbackDeduct。
- 在 commitDeduct 方法中,首先從 BusinessActionContext 中提取出來我們需要的參數(因為這個方法是系統自動調用的,不是我們手動調用,是以沒法自己傳參數進來,隻能通過 BusinessActionContext 來擷取),然後再檢查一下餘額是否充足,沒問題就把當機的資金劃掉,就算扣款完成了。
- 在 rollbackDeduct 方法中,也是先從 BusinessActionContext 中擷取相應的參數,檢查一下當機的金額,沒問題就把當機的金額恢複到 money 字段上(如果沒進入 if 分支,則說明 prepare 中抛出異常,未當機資金)。
好了,這就是從賬戶扣錢的兩階段操作,資料庫操作比較簡單,我這裡就不列出來了,文末可以下載下傳源碼。
6. Order
再來看訂單服務。
由于我們是在 order 中調用 account 完成賬戶扣款的,是以需要先在 order 中加入 account 的 OpenFeign 調用,如下:
@FeignClient("account")
public interface AccountServiceApiImpl extends AccountServiceApi {
}
這應該沒啥好解釋的。
接下來我們來看 OrderController:
@RestController
public class OrderController implements OrderServiceApi {
@Autowired
OrderService orderService;
@Override
public boolean prepare(BusinessActionContext actionContext, String userId, String productId, Integer count) {
return orderService.prepareCreateOrder(actionContext,userId, productId, count);
}
@Override
public boolean commit(BusinessActionContext actionContext) {
return orderService.commitOrder(actionContext);
}
@Override
public boolean rollback(BusinessActionContext actionContext) {
return orderService.rollbackOrder(actionContext);
}
}
這個跟 AccountService 也基本一緻,實作了 OrderServiceApi 接口,接口位址啥的都定義在 OrderServiceApi 中,這個類重點還是在 OrderService 中,如下:
@Service
public class OrderService {
private static final Logger logger = LoggerFactory.getLogger(OrderService.class);
@Autowired
AccountServiceApi accountServiceApi;
@Autowired
OrderMapper orderMapper;
@Transactional(rollbackFor = Exception.class)
public boolean prepareCreateOrder(BusinessActionContext actionContext, String userId, String productId, Integer count) {
//先去扣款,假設每個産品100塊錢
boolean resp = accountServiceApi.prepare(actionContext, userId, count * 100.0);
logger.info("{} 使用者購買的 {} 商品共計 {} 件,預下單成功", userId, productId, count);
return resp;
}
@Transactional(rollbackFor = Exception.class)
public boolean commitOrder(BusinessActionContext actionContext) {
String userId = (String) actionContext.getActionContext("userId");
String productId = (String) actionContext.getActionContext("productId");
Integer count = (Integer) actionContext.getActionContext("count");
int i = orderMapper.addOrder(userId, productId, count, count * 100.0);
logger.info("{} 使用者購買的 {} 商品共計 {} 件,下單成功", userId, productId, count);
return i==1;
}
@Transactional(rollbackFor = Exception.class)
public boolean rollbackOrder(BusinessActionContext actionContext) {
String userId = (String) actionContext.getActionContext("userId");
String productId = (String) actionContext.getActionContext("productId");
Integer count = (Integer) actionContext.getActionContext("count");
logger.info("{} 使用者購買的 {} 商品共計 {} 件,訂單復原成功", userId, productId, count);
return true;
}
}
跟之前的 AccountService 一樣,這裡也是三個核心方法:
- prepareCreateOrder:這裡主要是調用了一下賬戶的方法,去檢查下看下錢夠不。一階段就做個這事。
- commitOrder:二階段如果是送出的話,就向資料庫中添加一條訂單記錄。
- rollbackOrder:二階段如果是復原的話,就什麼事情都不做,打個日志就行了。
好了,這就是下單的操作。
7. Storage
最後我們再來看看扣庫存的操作,這個跟扣款比較像,一起來看下:
@RestController
public class StorageController implements StorageServiceApi {
@Autowired
StorageService storageService;
@Override
public boolean deduct(BusinessActionContext actionContext, String productId, Integer count) {
return storageService.prepareDeduct(productId, count);
}
@Override
public boolean commit(BusinessActionContext actionContext) {
return storageService.commitDeduct(actionContext);
}
@Override
public boolean rollback(BusinessActionContext actionContext) {
return storageService.rollbackDeduct(actionContext);
}
}
核心邏輯在 StorageService 中,如下:
@Service
public class StorageService {
private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
@Autowired
StorageMapper storageMapper;
/**
* 預扣庫存
*
* @param productId
* @param count
* @return
*/
@Transactional(rollbackFor = Exception.class)
public boolean prepareDeduct(String productId, Integer count) {
Storage storage = storageMapper.getStorageByProductId(productId);
if (storage == null) {
throw new RuntimeException("商品不存在");
}
if (storage.getCount() < count) {
throw new RuntimeException("庫存不足,預扣庫存失敗");
}
storage.setFreezeCount(storage.getFreezeCount() + count);
storage.setCount(storage.getCount() - count);
int i = storageMapper.updateStorage(storage);
logger.info("{} 商品庫存當機 {} 個", productId, count);
return i == 1;
}
/**
* 扣庫存
*
* @param actionContext
* @return
*/
@Transactional(rollbackFor = Exception.class)
public boolean commitDeduct(BusinessActionContext actionContext) {
String productId = (String) actionContext.getActionContext("productId");
Integer count = (Integer) actionContext.getActionContext("count");
Storage storage = storageMapper.getStorageByProductId(productId);
if (storage.getFreezeCount() < count) {
throw new RuntimeException("庫存不足,扣庫存失敗");
}
storage.setFreezeCount(storage.getFreezeCount() - count);
int i = storageMapper.updateStorage(storage);
logger.info("{} 商品庫存扣除 {} 個", productId, count);
return i == 1;
}
@Transactional(rollbackFor = Exception.class)
public boolean rollbackDeduct(BusinessActionContext actionContext) {
String productId = (String) actionContext.getActionContext("productId");
Integer count = (Integer) actionContext.getActionContext("count");
Storage storage = storageMapper.getStorageByProductId(productId);
if (storage.getFreezeCount() >= count) {
storage.setFreezeCount(storage.getFreezeCount() - count);
storage.setCount(storage.getCount() + count);
int i = storageMapper.updateStorage(storage);
logger.info("{} 商品釋放庫存 {} 個", productId, count);
return i == 1;
}
//說明 prepare 階段就沒有當機
return true;
}
}
這個跟 AccountService 的邏輯基本上是一樣的,我就不多做解釋了。
8. Business
最後再來看看調用的入口 Business。Business 中要調用 storage 和 order,是以先把這兩個的 OpenFeign 整進來:
@FeignClient("order")
public interface OrderServiceApiImpl extends OrderServiceApi {
}
@FeignClient("storage")
public interface StorageServiceApiImpl extends StorageServiceApi {
}
然後看下接口調用:
@RestController
public class BusinessController {
@Autowired
BusinessService businessService;
@PostMapping("/order")
public RespBean order(String account, String productId, Integer count) {
try {
businessService.purchase(account, productId, count);
return RespBean.ok("下單成功");
} catch (Exception e) {
e.printStackTrace();
return RespBean.error("下單失敗", e.getMessage());
}
}
}
@Service
public class BusinessService {
@Autowired
StorageServiceApi storageServiceApi;
@Autowired
OrderServiceApi orderServiceApi;
@GlobalTransactional
public void purchase(String account, String productId, Integer count) {
String xid = RootContext.getXID();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
storageServiceApi.deduct(actionContext, productId, count);
orderServiceApi.prepare(actionContext, account, productId, count);
}
}
BusinessService 中通過 RootContext 擷取全局事務 ID,然後構造一個 BusinessActionContext 對象,開始整個流程的調用。
好啦,大功告成。
9. 測試
最後再來個簡單測試,成功的測試:
調用失敗的測試: