天天看點

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

本文目錄:

  • 需求
  • 什麼是分布式事務
  • 分布式事務解決方案
  • Seata 是什麼?
  • 準備工作
  • 代碼實戰示範
  • 啟動服務功能示範
  • Seata 事務分組說明
  • Seata 分布式事務原了解釋
  • 項目源碼位址

後端工具和環境

  • IDE:IDEA
  • 注冊中心:nacos 1.1.3
  • Spring Cloud:Greenwich.SR3
  • Speing Alibaba Cloud:2.1.1.RELEASE
  • Seata:0.9.0
  • MybatisPlus:3.2.0

一. 需求

在開發我的開源項目 prex 時,加入工作流,解決工作流使用者與目前系統使用者同步問題時,涉及到遠端調用操作兩個資料庫所産生的事務問題,比如系統使用者在增加使用者同步工作流使用者時,系統使用者添加成功,工作流使用者沒有添加成功,則造成資料不一緻問題,本地事務無法復原,那麼則使用分布式事務解決方案。

二. 什麼是分布式事務?

指一次大的操作由不同的小操作組成的,這些小的操作分布在不同的伺服器上,分布式事務需要保證這些小操作要麼全部成功,要麼全部失敗。從本質上來說,分布式事務就是為了保證不同資料庫的資料一緻性。

通俗一點說就是單體應用被拆分成微服務應用,原來的一個子產品被拆分成三個獨立的應用,分别使用獨立的資料源,業務操作需要調用三個服務來完成。

三. 分布式事務解決方案

分布式事務作為微服務應用中的大難題,在現有的解決方案中,個人認為

Seata

是目前最輕量的解決方案

四. Seata 是什麼?

Seata

是一款開源的分布式事務解決方案,緻力于提供高性能和簡單易用的分布式事務服務。

Seata

将為使用者提供了 AT、TCC、SAGA 和 XA 事務模式,為使用者打造一站式的分布式解決方案。

AT 模式

前提

  • 基于支援本地 ACID 事務的關系型資料庫。
  • Java 應用,通過 JDBC 通路資料庫。

整體機制

兩階段送出協定的演變:

  • 一階段:業務資料和復原日志記錄在同一個本地事務中送出,釋放本地鎖和連接配接資源。
  • 二階段:
    • 送出異步化,非常快速地完成。
    • 復原通過一階段的復原日志進行反向補償。

寫隔離

  • 一階段本地事務送出前,需要確定先拿到全局鎖 。
  • 拿不到 全局鎖 ,不能送出本地事務。
  • 拿 全局鎖 的嘗試被限制在一定範圍内,超出範圍将放棄,并復原本地事務,釋放本地鎖。

以一個示例來說明:

兩個全局事務 tx1 和 tx2,分别對 a 表的 m 字段進行更新操作,m 的初始值 1000。

tx1 先開始,開啟本地事務,拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務送出前,先拿到該記錄的 全局鎖 ,本地送出釋放本地鎖。 tx2 後開始,開啟本地事務,拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務送出前,嘗試拿該記錄的 全局鎖 ,tx1 全局送出前,該記錄的全局鎖被 tx1 持有,tx2 需要重試等待 全局鎖 。

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

tx1 二階段全局送出,釋放 全局鎖 。tx2 拿到 全局鎖 送出本地事務。

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

如果 tx1 的二階段全局復原,則 tx1 需要重新擷取該資料的本地鎖,進行反向補償的更新操作,實作分支的復原。

此時,如果 tx2 仍在等待該資料的 全局鎖,同時持有本地鎖,則 tx1 的分支復原會失敗。分支的復原會一直重試,直到 tx2 的 全局鎖 等鎖逾時,放棄 全局鎖 并復原本地事務釋放本地鎖,tx1 的分支復原最終成功。

因為整個過程 全局鎖 在 tx1 結束前一直是被 tx1 持有的,是以不會發生 髒寫 的問題。

讀隔離

在資料庫本地事務隔離級别 讀已送出(Read Committed) 或以上的基礎上,Seata(AT 模式)的預設全局隔離級别是 讀未送出(Read Uncommitted) 。

如果應用在特定場景下,必需要求全局的 讀已送出 ,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

SELECT FOR UPDATE 語句的執行會申請 全局鎖 ,如果 全局鎖 被其他事務持有,則釋放本地鎖(復原 SELECT FOR UPDATE 語句的本地執行)并重試。這個過程中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關資料是 已送出 的,才傳回。

出于總體性能上的考慮,Seata 目前的方案并沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

工作機制

以一個示例來說明整個 AT 分支的工作過程。

業務表:product

Field Type Key
id bigint(20) PRI
name varchar(100)
since varchar(100)

AT 分支事務的業務邏輯:

update product set name = ‘GTS’ where name = ‘TXC’;

一階段

過程:

  1. 解析 SQL:得到 SQL 的類型(UPDATE),表(product),條件(where name = ‘TXC’)等相關的資訊。
  2. 查詢前鏡像:根據解析得到的條件資訊,生成查詢語句,定位資料。
select id, name, since from product where name = 'TXC';
           

得到前鏡像:

id name since

1 TXC 2014

  1. 執行業務 SQL:更新這條記錄的 name 為 ‘GTS’。
  2. 查詢後鏡像:根據前鏡像的結果,通過 主鍵 定位資料。
select id, name, since from product where id = 1`;
           

得到後鏡像:

id name since

1 GTS 2014

  1. 插入復原日志:把前後鏡像資料以及業務 SQL 相關的資訊組成一條復原日志記錄,插入到 UNDO_LOG 表中。
{
	"branchId": 641789253,
	"undoItems": [{
		"afterImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "GTS"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		"beforeImage": {
			"rows": [{
				"fields": [{
					"name": "id",
					"type": 4,
					"value": 1
				}, {
					"name": "name",
					"type": 12,
					"value": "TXC"
				}, {
					"name": "since",
					"type": 12,
					"value": "2014"
				}]
			}],
			"tableName": "product"
		},
		"sqlType": "UPDATE"
	}],
	"xid": "xid:xxx"
}
           
  1. 送出前,向 TC 注冊分支:申請 product 表中,主鍵值等于 1 的記錄的 全局鎖 。
  2. 本地事務送出:業務資料的更新和前面步驟中生成的 UNDO LOG 一并送出。
  3. 将本地事務送出的結果上報給 TC。

二階段-復原

  1. 收到 TC 的分支復原請求,開啟一個本地事務,執行如下操作。
  2. 通過 XID 和 Branch ID 查找到相應的 UNDO LOG 記錄。
  3. 資料校驗:拿 UNDO LOG 中的後鏡與目前資料進行比較,如果有不同,說明資料被目前全局事務之外的動作做了修改。這種情況,4. 需要根據配置政策來做處理,詳細的說明在另外的文檔中介紹。

    根據 UNDO LOG 中的前鏡像和業務 SQL 的相關資訊生成并執行復原的語句:

update product set name = 'TXC' where id = 1;
           
  1. 送出本地事務。并把本地事務的執行結果(即分支事務復原的結果)上報給 TC。

二階段-送出

  1. 收到 TC 的分支送出請求,把請求放入一個異步任務的隊列中,馬上傳回送出成功的結果給 TC。
  2. 異步任務階段的分支送出請求将異步和批量地删除相應 UNDO LOG 記錄。

附錄

復原日志表

UNDO_LOG Table:不同資料庫在類型上會略有差别。

以 MySQL 為例:

Field Type
branch_id bigint PK
xid varchar(100)
context varchar(128)
rollback_info longblob
log_status tinyint
log_created datetime
log_modified datetime
-- 注意此處0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
           

TCC 模式

回顧總覽中的描述:一個分布式的全局事務,整體是 兩階段送出 的模型。全局事務是由若幹分支事務組成的,分支事務要滿足 兩階段送出 的模型要求,即需要每個分支事務都具備自己的:

  • 一階段 prepare 行為
  • 二階段 commit 或 rollback 行為
Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

根據兩階段行為模式的不同,我們将分支事務劃分為 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.

AT 模式(參考連結 TBD)基于 支援本地 ACID 事務 的 關系型資料庫:

  • 一階段 prepare 行為:在本地事務中,一并送出業務資料更新和相應復原日志記錄。
  • 二階段 commit 行為:馬上成功結束,自動 異步批量清理復原日志。
  • 二階段 rollback 行為:通過復原日志,自動 生成補償操作,完成資料復原。

相應的,TCC 模式,不依賴于底層資料資源的事務支援:

  • 一階段 prepare 行為:調用 自定義 的 prepare 邏輯。
  • 二階段 commit 行為:調用 自定義 的 commit 邏輯。
  • 二階段 rollback 行為:調用 自定義 的 rollback 邏輯。

    所謂 TCC 模式,是指支援把 自定義 的分支事務納入到全局事務的管理中

Saga 模式

Saga 模式是 SEATA 提供的長事務解決方案,在 Saga 模式中,業務流程中每個參與者都送出本地事務,當出現某一個參與者失敗則補償前面已經成功的參與者,一階段正向服務和二階段補償服務都由業務開發實作。

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

理論基礎:Hector & Kenneth 發表論⽂ Sagas (1987)

适用場景:

  • 業務流程長、業務流程多
  • 參與者包含其它公司或遺留系統服務,無法提供 - TCC 模式要求的三個接口

優勢:

  • 一階段送出本地事務,無鎖,高性能
  • 事件驅動架構,參與者可異步執行,高吞吐
  • 補償服務易于實作

缺點:

  • 不保證隔離性(應對方案見使用者文檔)

五. 準備工作

  • 這裡我們使用

    Nacos

    作為注冊中心,Nacos 的安裝及使用可以參考
  • 我們從官網下載下傳

    seata-server

    ,這裡下載下傳的是 seata-server-0.9.0.zip,下載下傳位址:https://github.com/seata/seata/releases

    github 位址下載下傳速度很慢,可以在(李浩東的部落格)公衆号背景回複

    seata安裝包

    快速擷取百度雲下載下傳連結
  • 下載下傳完成後解壓 seata-server 安裝包到指定目錄

解壓完成後我們得到了幾個檔案夾

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
  • bin

    存放各個系統的 seata server 啟動腳本

  • conf

    存在 seata server 啟動時所需要的配置資訊、資料庫模式下所需要的建表語句

  • lib

    運作 seata server 所需要的依賴包清單

配置 Seata Server

seata server

所有的配置都在 conf 檔案夾内,該檔案夾内有兩個檔案我們必須要詳細介紹下。

seata server

預設使用 file(檔案方式)進行存儲事務日志、事務運作資訊,我們可以通過-m db 腳本參數的形式來指定,目前僅支援 file、db 這兩種方式。

  • file.conf

    該檔案用于配置存儲方式、透傳事務資訊的 NIO 等資訊,預設對應 registry.conf 檔案内的 file 方式配置

  • registry.conf

    seata server 核心配置檔案,可以通過該檔案配置服務注冊方式、配置讀取方式。

    注冊方式目前支援 file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 等方式,預設為 file,對應讀取 file.conf 内的注冊方式資訊。

    讀取配置資訊的方式支援 file、nacos 、apollo、zk、consul、etcd3 等方式,預設為 file,對應讀取 file.conf 檔案内的配置。

修改 conf 目錄下的 file.conf 配置檔案,主要修改自定義事務組名稱,事務日志存儲模式及資料庫連接配接資訊

transport {
  ...省略
}
service {
  #vgroup->rgroup
  vgroup_mapping.prex_tx_group = "default" #修改事務組名稱為:prex_tx_group,和用戶端自定義的名稱對應
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

## transaction log store
store {
  ## store mode: file、db
  mode = "db" #修改此處将事務資訊存儲到db資料庫中

  ## database store
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/seat" #修改資料庫連接配接位址
    user = "root" #修改資料庫使用者名
    password = "root" #修改資料庫密碼
    min-conn = 1
    max-conn = 3
    global.table = "global_table"
    branch.table = "branch_table"
    lock-table = "lock_table"
    query-limit = 100
  }
}
           

說明:

  • 存儲事務日志可以使用 file 檔案和 db 資料庫兩種方式
  • 由于我們使用了 db 模式存儲事務日志,是以我們需要建立一個 seat 資料庫,建表 sql 在 seata-server 的/conf/db_store.sql 中
Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
  • 修改 conf 目錄下的

    registry.conf

    配置檔案,指明注冊中心為 nacos,及修改 nacos 連接配接資訊即可;
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
  }
  ... 省略
}
}

           

配置完成後啟動 Seata

啟動 seata server 的腳本位于 bin 檔案内,

Linux/Mac

環境使用 seata-server.sh 腳本啟動,Windows 環境使用 seata-server.bat 腳本啟動。

Linux/Mac

啟動方式示例如下所示:

nohup sh seata-server.sh -p 8091 -h 127.0.0.1 -m db &> seata.log &
           

通過 nohup 指令讓 seata server 在系統背景運作。

腳本參數:

  • -p

    指定啟動 seata server 的端口号。

  • -h

    指定 seata server 所綁定的主機,這裡配置要注意指定的主機 IP 要與業務服務内的配置檔案保持一緻,如:-h 192.168.1.10,業務服務配置檔案内應該配置 192.168.1.10,即使在同一台主機上也要保持一緻。

  • -m

    事務日志、事務執行資訊存儲的方式,目前支援 file(檔案方式)、db(資料庫方式,建表語句請檢視 config/db_store.sql、config/db_undo_log.sql)

檢視啟動日志

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

當我們看到-Server started 時并未發現其他錯誤資訊,我們的 seata server 已經啟動成功

六. 實戰示範

讓我們從一個微服務示例開始

使用者購買商品的業務邏輯。整個業務邏輯由 3 個微服務提供支援:

  • 倉儲服務:對給定的商品扣除倉儲數量。
  • 訂單服務:根據采購需求建立訂單。
  • 帳戶服務:從使用者帳戶中扣除餘額。

架構圖

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

資料庫

建立業務資料庫

db-order:存儲訂單的資料庫

db-storage:存儲庫存的資料庫

db-account:存儲賬戶資訊的資料庫

order 訂單表:

DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
  `id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵Id',
  `user_id` int(20) DEFAULT NULL COMMENT '使用者Id',
  `pay_money` decimal(11,0) DEFAULT NULL COMMENT '付款金額',
  `product_id` int(20) DEFAULT NULL COMMENT '商品Id',
  `status` int(11) DEFAULT NULL COMMENT '狀态',
  `count` int(11) DEFAULT NULL COMMENT '商品數量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='訂單表';

SET FOREIGN_KEY_CHECKS = 1;
           

product 商品表:

DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` int(20) NOT NULL COMMENT '主鍵',
  `product_id` int(11) DEFAULT NULL COMMENT '商品Id',
  `price` decimal(11,0) DEFAULT NULL COMMENT '價格',
  `count` int(11) DEFAULT NULL COMMENT '庫存數量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC COMMENT='倉儲服務';

-- ----------------------------
-- Records of product
-- ----------------------------
BEGIN;
INSERT INTO `product` VALUES (1, 1, 50, 100);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
           

account 賬戶表:

DROP TABLE IF EXISTS `account`;
CREATE TABLE `account` (
  `id` int(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵Id',
  `user_id` int(20) DEFAULT NULL COMMENT '使用者Id',
  `balance` decimal(11,0) DEFAULT NULL COMMENT '餘額',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1 ROW_FORMAT=DYNAMIC;

-- ----------------------------
-- Records of account
-- ----------------------------
BEGIN;
INSERT INTO `account` VALUES (1, 1, 100);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
           

建立日志復原表

需要在每個資料庫中建立日志復原表,建表 sql 在 seata-server 的/conf/db_undo_log.sql 中。

分布式事務問題産生

三個服務,一個訂單服務,一個倉儲服務,一個賬戶服務。當使用者下單時,會在訂單服務中建立一個訂單,然後通過遠端調用庫存服務來扣減下單商品的庫存,再通過遠端調用賬戶服務來扣減使用者賬戶裡面的餘額,最後在訂單服務中修改訂單狀态為已完成。該操作跨越三個資料庫,有兩次遠端調用,很明顯會有分布式事務問題
           

工程結構

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

nacos-seata-account-server 賬戶服務

nacos-seata-order-server 訂單服務

nacos-seata-storage-server 倉儲服務

用戶端配置

  • 對 nacos-seata-account-server、nacos-seata-order-server 和 nacos-seata-storage-server 三個 seata 的用戶端進行配置,它們配置大緻相同,我們下面以 nacos-seata-account-server 的配置為例;
  • 修改 application.yml 檔案,自定義事務組的名稱
spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: prex_tx_group #自定義事務組名稱需要與seata-server中的對應
           
  • 添加并修改 file.conf 配置檔案,主要是修改自定義事務組名稱
service {
  #vgroup->rgroup
  vgroup_mapping.prex_tx_group = "default" #修改自定義事務組名稱
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
  disableGlobalTransaction = false
}
           

添加并修改 registry.conf 配置檔案,主要是将注冊中心改為 nacos

registry {
  # file 、nacos 、eureka、redis、zk
  type = "nacos" #修改為nacos

  nacos {
    serverAddr = "localhost:8848" #修改為nacos的連接配接位址
    namespace = ""
    cluster = "default"
  }
}
           

代碼隻展示核心代碼 具體代碼文章尾部連結

  • 在啟動類中取消資料源的自動建立
@EnableDiscoveryClient
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan("com.xd.example.seata.mapper")
public class NacosSeataAccountServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(NacosSeataAccountServerApplication.class, args);
	}

}
           
  • 配置 MybatisPlus 使用 Seata 對資料源進行代理

MyBatisPlusConfig:

/**
 * @Classname MyBatisPlusConfig
 * @Description 配置MybatisPlus使用Seata對資料源進行代理
 * @Author Created by Lihaodong (alias:小東啊) [email protected]
 * @Date 2019-11-25 11:21
 * @Version 1.0
 */
@Configuration
public class MyBatisPlusConfig {

    @Value("${mybatis-plus.mapper-locations}")
    private String mapperLocations;

    /**
     * @param sqlSessionFactory SqlSessionFactory
     * @return SqlSessionTemplate
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

    /**
     * 從配置檔案擷取屬性構造datasource,注意字首,這裡用的是druid,根據自己情況配置,
     * 原生datasource字首取"spring.datasource"
     *
     * @return
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public DataSource hikariDataSource() {
        return new HikariDataSource();
    }

    /**
     * 構造datasource代理對象,替換原來的datasource
     *
     * @param hikariDataSource
     * @return
     */
    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSourceProxy(DataSource hikariDataSource) {
        return new DataSourceProxy(hikariDataSource);
    }

    @Bean(name = "sqlSessionFactory")
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(dataSourceProxy);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        bean.setMapperLocations(resolver.getResources(mapperLocations));

        SqlSessionFactory factory = null;
        try {
            factory = bean.getObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return factory;
    }

    /**
     * MP 自帶分頁插件
     *
     * @return
     */
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor page = new PaginationInterceptor();
        page.setDialectType("mysql");
        return page;
    }
}

           
  • 使用@GlobalTransactional 注解開啟分布式事務
package com.xd.example.seata.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Order;
import com.xd.example.seata.mapper.OrderMapper;
import com.xd.example.seata.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.xd.example.seata.service.RemoteAccountService;
import com.xd.example.seata.service.RemoteStorageService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * <p>
 * 訂單表 服務實作類
 * </p>
 *
 * @author lihaodong
 * @since 2019-11-25
 */
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    @Autowired
    private RemoteStorageService remoteStorageService;

    @Autowired
    private RemoteAccountService remoteAccountService;

    @GlobalTransactional(rollbackFor = Exception.class)
    @Override
    public void createOrder(Order order) {
        log.info("下單開始,使用者:{},商品:{},數量:{},金額:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
        //建立訂單
        order.setStatus(0);
        boolean save = save(order);
        log.info("儲存訂單{}", save ? "成功" : "失敗");
        log.info("目前 XID: {}", RootContext.getXID());
        //遠端調用庫存服務扣減庫存
        log.info("扣減庫存開始");
        remoteStorageService.decrease(order.getProductId(), order.getCount());
        log.info("扣減庫存結束");

        //遠端調用賬戶服務扣減餘額
        log.info("扣減餘額開始");
        remoteAccountService.decrease(order.getUserId(), order.getPayMoney());
        log.info("扣減餘額結束");

        //修改訂單狀态為已完成
        log.info("修改訂單狀态開始");
        update(Wrappers.<Order>lambdaUpdate().set(Order::getStatus, 1).eq(Order::getUserId, order.getUserId()));
        log.info("修改訂單狀态結束");

        log.info("下單結束");
    }
}

           

七. 啟動服務功能示範

  1. 分别運作 nacos-seata-order-server、nacos-seata-storage-server 和 nacos-seata-account-server 三個服務
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
    可以看到 seata 注冊成功
  2. 查詢資料庫初始資料資訊
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
  3. 打開浏覽器/Postman 調用接口進行下單操作:http://localhost:8081/order/create?userId=1&productId=1&count=1&payMoney=50

    結果:

    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

    檢視控制台列印:

    訂單服務:

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

倉儲服務:

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

賬戶服務:

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
  1. 再次資料庫查詢
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
  2. 我們在 nacos-seata-account-server 中制造一個逾時異常後(其他異常也行),調用下單接口
package com.xd.example.seata.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xd.example.seata.domain.Account;
import com.xd.example.seata.mapper.AccountMapper;
import com.xd.example.seata.service.IAccountService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.Optional;

/**
 * <p>
 * 服務實作類
 * </p>
 *
 * @author lihaodong
 * @since 2019-11-25
 */
@Slf4j
@Service
public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> implements IAccountService {

    @Override
    public boolean reduceBalance(Integer userId, BigDecimal balance) throws Exception {

        log.info("目前 XID: {}", RootContext.getXID());
        checkBalance(userId, balance);

        log.info("開始扣減使用者 {} 餘額", userId);
        //模拟逾時異常
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Integer record = baseMapper.reduceBalance(userId, balance);
        log.info("結束扣減使用者 {} 餘額結果:{}", userId, record > 0 ? "操作成功" : "扣減餘額失敗");
        return record > 0;
    }

    private void checkBalance(Integer userId, BigDecimal price) throws Exception {
        log.info("檢查使用者 {} 餘額", userId);

        Optional<Account> account = Optional.ofNullable(baseMapper.selectOne(Wrappers.<Account>lambdaQuery().eq(Account::getUserId, userId)));
        if (account.isPresent()) {
            BigDecimal balance = account.get().getBalance();
            if (balance.compareTo(price) == -1) {
                log.warn("使用者 {} 餘額不足,目前餘額:{}", userId, balance);
                throw new Exception("餘額不足");
            }
        }
    }
}
           

修改完會重新開機賬戶服務,再次發送請求

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

訂單服務控制台:

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

可以看到訂單正常,扣除庫存正常,賬戶服務讀取逾時異常

  1. 發現下單後資料庫資料并沒有任何改變
    Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)
  2. 我們在 seata-order-service 中注釋掉@GlobalTransactional 來看看會發生什麼
//    @GlobalTransactional(name = "prex-create-order",rollbackFor = Exception.class)
    @Override
    public void createOrder(Order order) {
        log.info("目前 XID: {}", RootContext.getXID());
        log.info("下單開始,使用者:{},商品:{},數量:{},金額:{}", order.getUserId(), order.getProductId(), order.getCount(), order.getPayMoney());
        //建立訂單
        order.setStatus(0);
        boolean save = save(order);
        log.info("儲存訂單{}", save ? "成功" : "失敗");

		... 省略代碼
}
           

儲存重新開機訂單服務,再次請求接口

由于 nacos-seata-account-server 的逾時會導緻當庫存和賬戶金額扣減後訂單狀态并沒有設定為已經完成

Spring Alibaba Cloud使用Seata實作分布式事務,Nacos作為配置中心(一)

八. Seata 事務分組

下一篇更新

Seata 分布式事務原了解釋

下一篇更新

項目源碼位址

https://gitee.com/li_haodong/SpringCloudAlibabaLearn

參考資料:

http://seata.io/zh-cn/

https://url.cn/5pxtYQ4

https://url.cn/5QaYJWh

繼續閱讀