分布式事務産生的背景
分布式架構演進之 - 資料庫的水準拆分
業務資料庫起初是單庫單表,但随着業務資料規模的快速發展,資料量越來越大,單庫單表逐漸成為瓶頸。是以我們對資料庫進行了水準拆分,将原單庫單表拆分成資料庫分片。
如下圖所示,分庫分表之後,原來在一個資料庫上就能完成的寫操作,可能就會跨多個資料庫,這就産生了跨資料庫事務問題。
分布式架構演進之 - 業務服務化拆分
在業務發展初期,“一塊大餅”的單業務系統架構,能滿足基本的業務需求。但是随着業務的快速發展,系統的通路量和業務複雜程度都在快速增長,單系統架構逐漸成為業務發展瓶頸,解決業務系統的高耦合、可伸縮問題的需求越來越強烈。
按照面向服務架構(SOA)的設計原則,将單業務系統拆分成多個業務系統,降低了各系統之間的耦合度,使不同的業務系統專注于自身業務,更有利于業務的發展和系統容量的伸縮。
業務系統按照服務拆分之後,一個完整的業務往往需要調用多個服務,如何保證多個服務間的資料一緻性成為一個難題。
分布式事務 Seata 介紹
Seata(Simple Extensible Autonomous Transaction Architecture,簡單可擴充自治事務架構)是 2019 年 1 月份阿裡巴巴共同開源的分布式事務解決方案。Seata 開源半年左右,目前已經有超過 1.1 萬 star,社群非常活躍。我們熱忱歡迎大家參與到 Seata 社群建設中,一同将 Seata 打造成開源分布式事務标杆産品。
Seata:https://github.com/seata/seata
分布式事務 Seata 産品子產品
如下圖所示,Seata 中有三大子產品,分别是 TM、RM 和 TC。其中 TM 和 RM 是作為 Seata 的用戶端與業務系統內建在一起,TC 作為 Seata 的服務端獨立部署。
在 Seata 中,分布式事務的執行流程:
- TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);
- 按業務場景,編排資料庫、服務等事務内資源(RM 向 TC 彙報資源準備狀态 );
- TM 結束分布式事務,事務一階段結束(TM 通知 TC 送出/復原分布式事務);
- TC 彙總事務資訊,決定分布式事務是送出還是復原;
- TC 通知所有 RM 送出/復原 資源,事務二階段結束;
分布式事務 Seata 解決方案
Seata 會有 4 種分布式事務解決方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。
在本地搭建一個TC服務(事務協調者).
下載下傳seata的安裝包
官網(http://seata.io/zh-cn/)
seata-server-1.1.0.zip
,解壓即可使用.
配置
打開解壓目錄下的conf/registry.conf檔案如下
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# 可以把seata-server了解為一個服務,它需要把自己注冊到某個注冊中心上去,友善使用seata的服務來找到自己
#在這裡就是指定注冊中心的類型,由于我們項目用的是eureka,是以這裡我選擇eureka,即這一堆配置就下面一個eureka生效了
#這裡預設的是file,即檔案,選了檔案就可以不用搭注冊中心,直接從檔案裡讀取服務清單
#複制之後一定要改一改
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka { #"隻有我生效啦"
serviceUrl = "http://localhost:10086/eureka" #eureka位址
application = "seata_tc_server" #在eureka裡顯示的名字
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
#在這裡選擇配置中心,這裡我們選擇file
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
#由于選擇了file,是以這裡生效了
name = "file.conf"
}
}
是以接下來看一下
file.conf
檔案
## transaction log store, only used in seata-server
store {
## store mode: file、db
#選擇配置中心的存儲模式,由于選擇file存到檔案裡(性能高)會變為二進制流不好觀察,是以選擇資料庫
#複制之後一定要改一改
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
#選擇了資料庫必定要做出一些配置,資料庫裡一定要有這3張表
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://192.168.206.99:3306/seata"
user = "root"
password = "root"
minConn = 1
maxConn = 10
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}
建表SQL如下:
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
啟動
如果是linux環境(要有JRE),執行
seata-server.sh
如果是windows環境,執行
seata-server.bat
改造微服務
隻要是需要用到seata(分布式事務)的服務,都要做類似的配置.
引入依賴
我這裡是springboot項目,是以我先在父pom中聲明了.如下
<properties>
<alibaba.seata.version>2.1.0.RELEASE</alibaba.seata.version>
<seata.version>1.1.0</seata.version>
</properties>
<dependencyManagement>
<dependencies>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${alibaba.seata.version}</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
<version>${seata.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
接下來隻要在需要seata的微服務裡添加依賴就好了.
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</dependency>
添加配置
spring:
cloud:
alibaba:
seata:
tx-service-group: test_tx_group # 定義事務組的名稱
在resources目錄下添加2個檔案 file.conf
和 registry.conf
file.conf
registry.conf
registry.conf
和前面的一樣,直接複制過來就好.
file.conf裡的内容不同了,新的内容如下:
transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true # the client batch send request enable enableClientBatchSendRequest = true #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT bossThreadSize = 1 #auto default pin or 8 workerThreadSize = "default" } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none"}service {#這裡注意,等号前後都是配置,前面是yml裡配置的事務組,後面是register.conf裡定義的seata-server vgroupMapping.test_tx_group = "seata_tc_server" #only support when registry.type=file, please don't set multiple addresses seata_tc_server.grouplist = "127.0.0.1:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false}client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" logTable = "undo_log" } log { exceptionRate = 100 }}
配置解讀:
-
:與TC互動的一些配置transport
-
:client和server通信心跳檢測開關heartbeat
-
:用戶端事務消息請求是否批量合并發送enableClientBatchSendRequest
-
:TC的位址配置,用于擷取TC的位址service
-
:是事務組名稱,要與application.yml中配置一緻,test_tx_group
-
:是TC服務端叢集的名稱,将來通過注冊中心擷取TC位址seata_tc_server
-
:服務降級開關,預設關閉。如果開啟,當業務重試多次失敗後會放棄全局事務enableDegrade
-
:全局事務開關,預設false。false為開啟,true為關閉disableGlobalTransaction
-
:vgroupMapping.test_tx_group = "seata_tc_server"
-
:這個當注冊中心為file的時候,才用到default.grouplist
-
:用戶端配置client
-
:出現復原異常時的日志記錄頻率,預設100,百分之一機率。復原失敗基本是髒資料,無需輸出堆棧占用硬碟空間exceptionRate
-
:是否開啟二階段復原鏡像校驗,預設truedataValidation
-
:undo序列化方式,預設JacksonlogSerialization
-
:自定義undo表名,預設是logTable
undo_log
-
:一階段全局送出結果上報TC重試次數,預設1commitRetryCount
-
:一階段全局復原結果上報TC重試次數,預設1rollbackRetryCount
-
:二階段送出預設是異步執行,這裡指定異步隊列的大小asynCommitBufferLimit
-
:全局鎖配置lock
-
:一階段結果上報TC失敗後重試次數,預設5次reportRetryCount
-
:校驗或占用全局鎖重試間隔,預設10,機關毫秒retryInterval
-
:校驗或占用全局鎖重試次數,預設30次retryTimes
-
:分支事務與其它全局復原事務沖突時鎖政策,預設true,優先釋放本地鎖讓復原成功retryPolicyBranchRollbackOnConflict
-
:資料總管配rm
-
:事務管理器配置tm
-
:undo_log的配置undo
-
:日志配置log
代理DataSource
由于在一階段是通過攔截sql分析語義來生成復原政策,原來的資料源已經不夠用了,得換個牛逼的.在服務裡建立一個配置類.
- 如果是使用的是mybatis
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
// 因為使用的是mybatis,這裡定義SqlSessionFactoryBean
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
// 配置資料源代理
sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
return sqlSessionFactoryBean.getObject();
}
}
如果使用的是mybatis-plus
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceProxyConfig {
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
// 訂單服務中引入了mybatis-plus,是以要使用特殊的SqlSessionFactoryBean
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
// 代理資料源
sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
// 生成SqlSessionFactory
return sqlSessionFactoryBean.getObject();
}
}
加上注解
給事務發起者的方法上加上
@GlobalTransactional
即可,其它的參與者隻要加
@Transactional
就好了.