本文章僅作為seata接入文檔,seata原理和源碼請自行轉至github https://github.com/seata/seata 官方文檔位址 https://github.com/seata/seata/wiki/Home_Chinese
1 由于系統演進,大佬覺得 需要做微服務,腦子一拍開始對原來的系統進行微服務改造,
在改造過程中,分布式事務不可避免,tcc mq等等概念研究一遍後,結合時間成本,發現阿裡gts 比較适合,無奈需要接入外網遂放棄,後來偶然發現seata 開源版gts 嘗試接入項目
先放一張流程圖

接入流程
1 首先去
官網git下載下傳一份源碼,我下載下傳的是0.5.2版本。
2 在本地解壓加載到idea下載下傳jar包後直接啟動server項目中的啟動類Server.java ,在調試過程中發現netty存在有時記憶體不夠問題,遂增加啟動參數-XX:MaxDirectMemorySize=1024m
3 server 基于netty開發目前隻支援單節點啟動,記憶體大小沒有進行壓力測試,seata配置檔案為registry.conf 附上關鍵配置
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul
type = "file"
file {
name = "file.conf"
}
}
type 指定配置方式 目前支援file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
預設配置為file 在接入過程中也使用file 配置類型
file.conf 和registry.conf 目前都在resource目錄下
附上file.conf 關鍵配置
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "default"
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
}
接入過程 vgroup_mapping.my_test_tx_group = "default" 修改配置為自定義配置vgroup_mapping.my_group= "default"
具體原則不清楚
4 架構圖中的tc搭建完成,下一步搭建RM 也就是微服務的原子系統 引入seata 的jar包
<!--架構問題,指定durid版本-->
<properties>
<druid.version>1.1.10</druid.version>
<seata.version>0.5.0</seata.version>
</properties>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring</artifactId>
<version>${seata.version}</version>
</dependency>
5 ,按照官方文檔,需要将資料源替換為seata資料源,本項目是springboot+durid+mybitas 直接上代碼 關鍵代碼 DataSourceProxy proxy = new DataSourceProxy(datasource);
@Configuration
public class DruidConfig {
@Value("${spring.datasource.url}")
private String dbUrl;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Value("${spring.datasource.driver-class-name}")
private String driverClassName;
@Value("${spring.datasource.initialSize}")
private int initialSize;
@Value("${spring.datasource.minIdle}")
private int minIdle;
@Value("${spring.datasource.maxActive}")
private int maxActive;
@Value("${spring.datasource.maxWait}")
private int maxWait;
@Value("${spring.datasource.timeBetweenEvictionRunsMillis}")
private int timeBetweenEvictionRunsMillis;
@Value("${spring.datasource.minEvictableIdleTimeMillis}")
private int minEvictableIdleTimeMillis;
@Value("${spring.datasource.validationQuery}")
private String validationQuery;
@Value("${spring.datasource.testWhileIdle}")
private boolean testWhileIdle;
@Value("${spring.datasource.testOnBorrow}")
private boolean testOnBorrow;
@Value("${spring.datasource.testOnReturn}")
private boolean testOnReturn;
@Value("${spring.datasource.poolPreparedStatements}")
private boolean poolPreparedStatements;
@Value("${spring.datasource.filters}")
private String filters;
@Value("${mybatis.mapper-locations}")
private String mapperLocation;
@Bean
@Primary
public DataSource druidDataSource() {
DruidDataSource datasource = new DruidDataSource();
datasource.setUrl(this.dbUrl);
datasource.setUsername(username);
datasource.setPassword(password);
datasource.setDriverClassName(driverClassName);
datasource.setInitialSize(initialSize);
datasource.setMinIdle(minIdle);
datasource.setMaxActive(maxActive);
datasource.setMaxWait(maxWait);
datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
datasource.setValidationQuery(validationQuery);
datasource.setTestWhileIdle(testWhileIdle);
datasource.setTestOnBorrow(testOnBorrow);
datasource.setTestOnReturn(testOnReturn);
datasource.setPoolPreparedStatements(poolPreparedStatements);
DataSourceProxy proxy = new DataSourceProxy(datasource);
return proxy;
}
@Bean(name="sqlSessionFactory")
public SqlSessionFactoryBean sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
Resource[] mapperXmlResource = resolver.getResources(mapperLocation);
sqlSessionFactory.setDataSource(dataSource);
sqlSessionFactory.setMapperLocations(mapperXmlResource);
return sqlSessionFactory;
}
}
6 新增seata 掃描器配置 直接上代碼
@Configuration
public class SeataConfiguration {
@Value("${spring.application.name}")
private String applicationId;
/**
* 注冊一個StatViewServlet
*
* @return global transaction scanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
"my_group");
return globalTransactionScanner;
}
}
7
新增攔截器過濾器或者切面等,在業務執行前,攔截每個請求,并擷取XID并綁定,本人是在service前增加切面,并處理資料
seata目前支援dubbo和springcloud 預設XID放在headers中,由于我們的項目使用的自有的rpc架構,是以需要自己手動擷取XID,為了友善
我将XID寫在了body中,自己接入的時候,需要按照需要自行設定
上代碼
String xid = RootContext.getXID();
String restXid = StringUtil.getStringValue(esbInput.getParams().get("Seata-Xid"));
boolean bind = false;
if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(restXid)) {
RootContext.bind(restXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + restXid + "] to RootContext");
}
}
try{
//執行方法體
object = joinPoint.proceed(args);
}catch (GeneralException e){//對外接口統一異常捕獲解析
}finally {
if (bind) {
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!restXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during http rest from " + restXid + " to " + unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
8 增加配置檔案 file.conf 和registry.conf 并按照自己的實際情況進行配置,同時按照官方文檔,在原子服務資料庫新增日志回退表
undo_log 建表語句為
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=94 DEFAULT CHARSET=utf8;
9 建立TM服務
引入jar包
`
js
<seata.version>0.5.0</seata.version>
<groupId>io.seata</groupId>
<artifactId>seata-spring</artifactId>
<version>${seata.version}</version>
`
10
新增scanner 配置 和rm 一緻
@Configuration
public class SeataConfiguration {
@Value("${spring.application.name}")
private String applicationId;
/**
* 注冊一個StatViewServlet
*
* @return global transaction scanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
"nguc_tx_group");
return globalTransactionScanner;
}
}
11 開啟事務和事務的送出復原
private void transactionalRollBack(GlobalTransaction globalTransaction,String xid){
LOGGER.error("分布式事務中斷,事務開始復原");
try {
globalTransaction.rollback();
} catch (TransactionException txe) {
LOGGER.error("分布式事務復原失敗,全局事務XID : " + xid);
}
}
public XX doTransaction(){
GlobalTransaction globalTransaction = GlobalTransactionContext.getCurrentOrCreate();
//begin GlobalTransactional
try {
globalTransaction.begin(20000, "test");
} catch (TransactionException e) {
LOGGER.error("全局事務開啟失敗")
return outObject;
}
String xid = RootContext.getXID();
//組合服務标示
try {
call(input)
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
transactionalRollBack(globalTransaction, xid);
}
try {
globalTransaction.rollback();
LOGGER.error("全局事務送出成功,全局事務XID : " + xid);
} catch (TransactionException txe) {
LOGGER.error("全局事務送出失敗,全局事務XID : " + xid);
}
return xx;
}
12 通過TM調用rm服務,并測試復原,可以在commit前添加斷點檢視undo_log中的資料