前言
在上一篇博文(分布式事務與Seate架構(1)——分布式事務理論)中了解了足夠的分布式事務的理論知識後,到了實踐部分,在工作中雖然用到了Seata,但是自己卻并沒有完全實踐過,是以自己私下花點時間實踐以加深了解,實際上在實踐過程中遇到了很多的坑(比如Seata與SpringCloudAlibaba的整合中版本相容性問題,是個很讓人頭疼的一塊,甚至專門去github提過issue),有時候甚至得跟蹤源碼進行分析,這也使我加強了對閱讀源碼的能力。總之都是要code的。本篇博文主要結合實踐深入講解Seata AT模式!
參考資料《Spring Cloud Alibaba 微服務原理與實戰》(PDF電子書資源,有需要的小夥伴可以評論私信我)、官方wiki
博文中源碼已上傳至github(https://github.com/Jian0110/learning-cloudalibaba),歡迎小夥伴們star...
一、實踐準備工作
1、架構介紹
實踐主要是以“訂單-庫存-賬戶”系統示範,主要的架構圖如下,圖中各個部分充當的分布式事務角色已标明。
具體流程:
1)使用者登入XXX商品購物系統(假設已有賬戶),
2)點選購買某個商品,發起建立訂單請求;
3)檢查購買商品的庫存量,如果不夠則建立訂單失敗提示庫存不足;否則鎖定該商品---->減少庫存--->建立訂單;
4)訂單建立成功後點選付款(或直接付款無需點選,實際上整個Demo中下單之後模拟立馬支付,并不會點選付款);
5)如果購買成功則對賬戶進行餘額進行判斷,餘額足夠則進行減扣,餘額不夠則進行提示說明
6)傳回購買成功失敗提示說明。
2、項目結構
項目結構如下:
mvn package打包運作seata服務,即運作TC伺服器(這裡隻展示單機)
初始化Seata庫,導入sql腳本
二、代碼實踐
這裡隻展示關鍵代碼,全部代碼已送出gituhb:,有需要的小夥伴可以自行擷取
1、“訂單-庫存-賬戶”服務
訂單服務:
TM(microService):seata-order-service
RM(DB Resources):jdbc:mysql://127.0.0.1:3306/order
OrderService:
@GlobalTransactional // TM開啟全局事務
@Transactional(rollbackFor = Exception.class)
public void createOrder(Long productId, BigDecimal price){
// 這裡模拟擷取的是使用者的賬戶ID
// 通過上下文擷取userId再擷取accountId(單個賬戶)
Long accountId = 1L; // 假設已經擷取到了賬戶ID
// 1.rpc調用庫存微服務檢查庫存并減庫存操作
Boolean deductStorageSuccess = storageFeignClient.deduct(productId);
if (!deductStorageSuccess) {
throw new RuntimeException("storage deduct failed!");
}
// 2.建立訂單
ProductOrder order = ProductOrder.builder()
.productId(productId)
.accountId(accountId)
.payAmount(price)
.build();
log.info("create order : {}", order);
// 這裡為了模拟復原,是以先對價格的判斷放到了建立訂單之後,抛出runtime exception
if (price.compareTo(BigDecimal.ZERO) < 0) {
throw new NumberFormatException("product price must greater than zero!");
}
orderMapper.insertSelective(order);
// 3.rpc調用賬戶微服務對餘額檢查并扣款操作
Boolean deductAccountSuccess = accountFeignClient.deduct(accountId, price);
if (!deductAccountSuccess) {
throw new RuntimeException("account deduct failed!");
}
// 4. 回報結果
}
OrderController:
/**
* 模拟建立訂單
* @param productId
* @param price
* @return
*/
@PostMapping("/create")
public String create(Long productId, BigDecimal price){
try {
orderService.createOrder(productId, price);
} catch (Exception e) {
log.error("order failed: ", e);
return "order failed";
}
return "order success";
}
調用的Feign:
@FeignClient(name="seata-account-service")
public interface AccountFeignClient {
@PostMapping("/account/deduct")
Boolean deduct(@RequestParam("accountId") Long accountId, @RequestParam("payAmount") BigDecimal payAmount);
}
@FeignClient(name="seata-storage-service")
public interface StorageFeignClient {
@PostMapping("/storage/deduct")
Boolean deduct(@RequestParam("productId") Long productId);
}
庫存服務:
microService:seata-storage-service
RM(DB Resources):jdbc:mysql://127.0.0.1:3306/storage
StorageService
public Boolean deduct(Long productId){
// 這裡先檢查有沒有庫存了, 生産環境下這裡是需要for update資料庫鎖,或者分布式鎖
Repo repoFromDB = repoMapper.selectByPrimaryKey(productId);
if (repoFromDB == null) {
throw new RuntimeException("product not exist!");
}
// 對庫存減一
int afterCount = repoFromDB.getAmount()-1;
// 沒有庫存剩餘了
if (afterCount < 0) {
throw new RuntimeException("product storage is no remaining!");
}
Repo repo = Repo.builder()
.id(productId)
.amount(afterCount)
.build();
repoMapper.updateAmount(repo);
log.info("deduct product[{}] storage, current amount is {}", productId, afterCount);
return true;
}
StorageController:
/**
* 模拟對商品庫存減一
* @param productId
* @return
*/
@PostMapping("/deduct")
public Boolean deduct(Long productId){
try {
storageService.deduct(productId);
} catch (Exception e) {
return false;
}
return true;
}
賬戶服務:
microService:seata-account-service
RM(DB Resources):jdbc:mysql:127.0.0.1/account
AccountService:
public void deduct(Long accountId, BigDecimal payAmount){
// 這裡先檢查有沒有賬戶存在, 生産環境下這裡是需要for update資料庫鎖,或者分布式鎖
UserAccount userAccountFromDB = userAccountMapper.selectByPrimaryKey(accountId);
if (userAccountFromDB == null) {
throw new RuntimeException("account not exist!");
}
// 檢查餘額是否足夠
BigDecimal afterBalance = userAccountFromDB.getBalance().subtract(payAmount);
if (afterBalance.compareTo(BigDecimal.ZERO) < 0) {
throw new RuntimeException("the balance is not enough!");
}
UserAccount userAccount = UserAccount.builder()
.id(accountId)
.balance(afterBalance)
.build();
log.info("deduct account[{}] , current balance is {}", accountId, afterBalance);
userAccountMapper.updateBalance(userAccount);
}
AccountController:
/**
* 模拟賬戶扣款
* @param accountId
* @param payAmount
* @return
*/
@PostMapping("/deduct")
public Boolean deduct(Long accountId, BigDecimal payAmount){
try {
accountService.deduct(accountId, payAmount);
} catch (Exception e) {
return false;
}
return true;
}
2、Seata伺服器,即TC角色
首先初始化seata的sql腳本(sql腳本參考官方wiki),并開啟seata庫,之後開啟Seata Server(具體的配置與啟動前nacos配置,事務分組等相關概念請參考官方wiki)
3、檢查Nacos服務與配置清單
微服務子產品啟動後快速注冊到dev命名空間下的SEATA_GROUP分組,此時TM、RM、TC都已經具備
啟動微服務子產品後可以看到日志輸出,說明啟動成功并且已經成功注冊
RM will register :jdbc:mysql://127.0.0.1:3306/account
nacos registry, SEATA_GROUP seata-account-service 192.168.99.1:6009 register finished
Started SeataAccountApplication in 30.115 seconds (JVM running for 33.158)
.......
NettyPool create channel to transactionRole:TMROLE,address:169.254.6.29:8091,msg:< RegisterTMRequest{applicationId='seata-account-service', transactionServiceGroup='my_test_tx_group'} >
register TM success. client version:1.4.0, server version:1.4.0,channel:[id: 0xa77dc065, L:/169.254.6.29:52794 - R:/169.254.6.29:8091]
register success, cost 4 ms, version:1.4.0,role:TMROLE,channel:[id: 0xa77dc065, L:/169.254.6.29:52794 - R:/169.254.6.29:8091]
三、運作測試
1、模拟購買支付成功情況
運作啟動所有的微服務後,在TC Serve的日志可以看到所有的TM、RM都已經注冊了
此時productId=1庫存還剩998
accountId=1的使用者餘額還剩1000元
接下來就是模拟使用者購買商品環節,調用http://localhost:6008/order/create,表示使用者想買商品ID=1,價格為12.25的商品
清空日志,并發起請求檢視日志:
16:10:45.167 INFO --- [rverHandlerThread_1_4_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: seata-order-service,transactionServiceGroup: my_test_tx_group, transactionName: createOrder(java.lang.Long, java.math.BigDecimal),timeout:60000,xid:169.254.6.29:8091:136139747123908608
16:10:45.964 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=169.254.6.29:8091:136139747123908608,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/storage,lockKey=repo:1
,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:10:46.086 INFO --- [rverHandlerThread_1_5_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136139747123908608, branchId = 136139750928142336, resourceId = jdbc:mysql://127.0.0.1:3306/storage ,lockKeys = repo:1
16:10:46.788 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=169.254.6.29:8091:136139747123908608,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/account,lockKey=user_account:1
,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:10:46.918 INFO --- [rverHandlerThread_1_6_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136139747123908608, branchId = 136139754342305793, resourceId = jdbc:mysql://127.0.0.1:3306/account ,lockKeys = user_account:1
16:10:47.015 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136139747123908608,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/order,lockKey=product_order:6,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:10:47.073 INFO --- [rverHandlerThread_1_7_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136139747123908608, branchId = 136139755294412801, resourceId = jdbc:mysql://127.0.0.1:3306/order ,lockKeys = product_order:6
16:10:47.184 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136139747123908608,extraData=null,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:10:48.084 INFO --- [ AsyncCommitting_1_1] io.seata.server.coordinator.DefaultCore : Committing global transaction is successfully done, xid = 169.254.6.29:8091:136139747123908608.
16:10:53.908 INFO --- [ TxTimeoutCheck_1_1] i.s.s.coordinator.DefaultCoordinator : Global transaction[169.254.6.29:8091:136139530647490560] is timeout and will be rollback.
16:10:54.947 INFO --- [ RetryRollbacking_1_1] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 169.254.6.29:8091:136139530647490560.
從日志中我們可以看到:
1)全局事務XID已經生成,各個分支注冊成功,
2)branchId也已經生成并在全局事務XID下,資源已被鎖住
3)全局事務送出成功
檢視此時的庫存與餘額,都已經進行了減扣
2、模拟庫存不足情況
修改productId=1的商品庫存為0:
再次發起請求,檢視TC Server日志,可以查出明顯發生了全局事務的復原
16:20:24.258 INFO --- [verHandlerThread_1_12_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: seata-order-service,transactionServiceGroup: my_test_tx_group, transactionName: createOrder(java.lang.Long, java.math.BigDecimal),timeout:60000,xid:169.254.6.29:8091:136142176250875904
16:20:24.279 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136142176250875904,extraData=null,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:20:24.420 INFO --- [verHandlerThread_1_13_500] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 169.254.6.29:8091:136142176250875904.
檢視庫存與餘額情況,庫存仍然是0,餘額仍然是987.75
3、模拟餘額不足情況
修改accountId=1的賬戶餘額小于12.25
再次發起請求,檢視日志
16:27:41.811 INFO --- [verHandlerThread_1_14_500] i.s.s.coordinator.DefaultCoordinator : Begin new global transaction applicationId: seata-order-service,transactionServiceGroup: my_test_tx_group, transactionName: createOrder(java.lang.Long, java.math.BigDecimal),timeout:60000,xid:169.254.6.29:8091:136144011456008192
16:27:41.836 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : SeataMergeMessage xid=169.254.6.29:8091:136144011456008192,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/storage,lockKey=repo:1
,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:27:41.889 INFO --- [verHandlerThread_1_15_500] i.seata.server.coordinator.AbstractCore : Register branch successfully, xid = 169.254.6.29:8091:136144011456008192, branchId = 136144011762192385, resourceId = jdbc:mysql://127.0.0.1:3306/storage ,lockKeys = repo:1
16:27:42.088 INFO --- [ batchLoggerPrint_1_1] i.s.c.r.p.server.BatchLogHandler : xid=169.254.6.29:8091:136144011456008192,extraData=null,clientIp:169.254.6.29,vgroup:my_test_tx_group
16:27:42.632 INFO --- [verHandlerThread_1_16_500] io.seata.server.coordinator.DefaultCore : Rollback branch transaction successfully, xid = 169.254.6.29:8091:136144011456008192 branchId = 136144011762192385
16:27:42.754 INFO --- [verHandlerThread_1_16_500] io.seata.server.coordinator.DefaultCore : Rollback global transaction successfully, xid = 169.254.6.29:8091:136144011456008192.
不同于庫存不足的情況的是,這裡庫存服務分支事務是先注冊TC Server的,因為有異常的并不是庫存服務,需要注意的是因為我模拟的是下單之後立馬支付,支付失敗的話訂單也是不會存在,實際生活中應該是訂單顯示“支付失敗”。
檢視庫存與餘額情況,庫存仍然是997,餘額仍然是10.75