前言
不啰嗦,直奔主題。。。
分布式事務解決方案目前主要有五種:
1. XA 方案
2. TCC 方案
3. 本地消息表
4. 可靠消息最終一致性方案
5. 最大努力通知方案
今天主要記錄下代碼實測基于XA方案解決分布式事務的過程。
必要知識背景
事務四大特性:具備原子性、一致性、隔離性和持久性,簡稱 ACID。
分布式概念:分布式事務可以理解為在分布式系統中實現事務,它其實是由多個本地事務組合而成。
XA方案:又稱2PC(Two-phase commit protocol),中文叫兩階段提交,是基於資料庫層面實現的事務控制,分為兩個階段:第一個階段是準備階段,第二個階段是提交或回滾。
JTA(Java Transaction API):允許應用程式執行分布式事務處理——在兩個或多個網絡計算機資源上存取并且更新資料。
JTA模型圖:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPB1kejpXT31kaNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1kDOxIjMwMjMxIDNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
spring-boot 整合 JTA(Atomikos + Druid + MyBatis)代碼示例(工程名沿用 jpaxa):
- 工程概覽:
(工程概覽圖:分布式事務_spring-boot整合jpa)
- 測試sql準備
-- Test schema for the XA demo.
-- NOTE(review): the application config points the "system" data source at order-db and the
-- "business" data source at product-db, so test_order presumably belongs in order-db and
-- test_product in product-db — confirm against the mapper XML files.

-- Order table: one row per created order.
DROP TABLE IF EXISTS `test_order`;
CREATE TABLE `test_order`
(
    `o_id`   bigint      NOT NULL AUTO_INCREMENT,
    `o_name` varchar(50) NOT NULL COMMENT '訂單名稱',
    PRIMARY KEY (`o_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='訂單表';

-- Product table: p_num holds the product quantity.
DROP TABLE IF EXISTS `test_product`;
CREATE TABLE `test_product`
(
    `p_id`   bigint      NOT NULL AUTO_INCREMENT,
    `p_name` varchar(50) NOT NULL COMMENT '商品名稱',
    `p_num`  bigint      NOT NULL COMMENT '商品數量',
    PRIMARY KEY (`p_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='商品表';

-- Seed one product with a quantity of 100.
-- Single quotes are used for the string literal: double quotes are parsed as an identifier
-- when the server runs with the ANSI_QUOTES SQL mode.
INSERT INTO test_product (p_name, p_num) VALUES ('測試商品1', 100);
- 資料源的注冊;分布式事務管理器的注冊;druid的注冊
package com.example.jpaxa.config;
import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;
/**
 * Registers the two XA data sources, the distributed (JTA) transaction manager and the
 * Druid monitoring components (stat servlet, web-stat filter, stat filter, wall filter).
 *
 * <p>Created on 2021/4/21.
 *
 * @author 一毛錢的魅力
 */
@Configuration
public class DruidConfig {

    /** Pool size applied to both Atomikos data source beans. */
    private static final int POOL_SIZE = 5;

    /** Class name handed to Atomikos as the underlying XA data source implementation. */
    private static final String XA_DATA_SOURCE_CLASS = "com.alibaba.druid.pool.xa.DruidXADataSource";

    /** Optional settings copied as strings when present. */
    private static final String[] OPTIONAL_STRING_KEYS = {
        "username", "password", "validationQuery", "filters"
    };

    /** Optional settings converted to {@link Integer} when present. */
    private static final String[] OPTIONAL_INTEGER_KEYS = {
        "initialSize", "maxActive", "minIdle", "maxWait",
        "maxPoolPreparedStatementPerConnectionSize", "validationQueryTimeout",
        "timeBetweenEvictionRunsMillis", "minEvictableIdleTimeMillis"
    };

    /** Optional settings converted to {@link Boolean} when present. */
    private static final String[] OPTIONAL_BOOLEAN_KEYS = {
        "poolPreparedStatements", "testOnBorrow", "testOnReturn", "testWhileIdle"
    };

    /**
     * Primary data source, configured from {@code spring.datasource.druid.systemDB.*} and
     * registered with Atomikos under the resource name {@code systemDB}.
     *
     * @param env environment the settings are read from
     * @return the Atomikos-wrapped XA data source
     */
    @Bean(name = "systemDataSource")
    @Primary
    public DataSource systemDataSource(Environment env) {
        return newXaDataSource(env, "spring.datasource.druid.systemDB.", "systemDB");
    }

    /**
     * Second data source, configured from {@code spring.datasource.druid.businessDB.*} and
     * registered with Atomikos under the resource name {@code businessDB}.
     *
     * @param env environment the settings are read from
     * @return the Atomikos-wrapped XA data source
     */
    @Bean(name = "businessDataSource")
    public AtomikosDataSourceBean businessDataSource(Environment env) {
        return newXaDataSource(env, "spring.datasource.druid.businessDB.", "businessDB");
    }

    /**
     * Registers the JTA transaction manager under the bean name {@code xatx}.
     *
     * <p>NOTE(review): the {@link UserTransactionManager} is created inline and is not itself a
     * bean, so the container does not manage its lifecycle — confirm this is intended.
     *
     * @return a {@link JtaTransactionManager} backed by Atomikos
     */
    @Bean(name = "xatx")
    public JtaTransactionManager regTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }

    /** Builds one Atomikos data source bean; shared by both data source factory methods. */
    private AtomikosDataSourceBean newXaDataSource(Environment env, String prefix, String resourceName) {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, prefix);
        ds.setXaDataSourceClassName(XA_DATA_SOURCE_CLASS);
        ds.setUniqueResourceName(resourceName);
        ds.setPoolSize(POOL_SIZE);
        ds.setXaProperties(prop);
        return ds;
    }

    /**
     * Collects the XA data source settings found under {@code prefix}.
     *
     * <p>{@link Properties} rejects null values, so a setting that is absent from the
     * environment is skipped instead of being stored. Only {@code url} is mandatory.
     *
     * @param env    environment the settings are read from
     * @param prefix property prefix, including the trailing dot
     * @return the settings that were present, keyed by their unprefixed name
     * @throws IllegalStateException if {@code prefix + "url"} is not defined
     */
    private Properties build(Environment env, String prefix) {
        Properties prop = new Properties();
        // Fail fast with a message naming the missing key rather than a bare NullPointerException.
        prop.put("url", env.getRequiredProperty(prefix + "url"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
        for (String key : OPTIONAL_STRING_KEYS) {
            putIfPresent(prop, key, env.getProperty(prefix + key));
        }
        for (String key : OPTIONAL_INTEGER_KEYS) {
            putIfPresent(prop, key, env.getProperty(prefix + key, Integer.class));
        }
        for (String key : OPTIONAL_BOOLEAN_KEYS) {
            putIfPresent(prop, key, env.getProperty(prefix + key, Boolean.class));
        }
        return prop;
    }

    /** Stores {@code value} under {@code key} unless it is null. */
    private static void putIfPresent(Properties target, String key, Object value) {
        if (value != null) {
            target.put(key, value);
        }
    }

    /**
     * Exposes Druid's {@link StatViewServlet} under {@code /druid/*}.
     *
     * @return the servlet registration
     */
    @Bean
    public ServletRegistrationBean druidServlet() {
        ServletRegistrationBean servletRegistrationBean =
                new ServletRegistrationBean(new StatViewServlet(), "/druid/*");
        // Console user: enabling the two lines below makes the Druid console require a login.
        //servletRegistrationBean.addInitParameter("loginUsername", "admin");
        //servletRegistrationBean.addInitParameter("loginPassword", "admin");
        return servletRegistrationBean;
    }

    /**
     * Registers Druid's {@link WebStatFilter} on every URL except static resources and the
     * Druid console itself.
     *
     * @return the filter registration
     */
    @Bean
    public FilterRegistrationBean filterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        filterRegistrationBean.addUrlPatterns("/*");
        filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
        filterRegistrationBean.addInitParameter("profileEnable", "true");
        return filterRegistrationBean;
    }

    /**
     * Druid stat filter with slow-SQL logging and SQL merging switched on.
     *
     * @return the configured filter
     */
    @Bean
    public StatFilter statFilter() {
        StatFilter statFilter = new StatFilter();
        // Statements running longer than slowSqlMillis are logged as slow.
        statFilter.setLogSlowSql(true);
        // Merge SQL statements in the statistics.
        statFilter.setMergeSql(true);
        // Threshold of 1000 ms; the original author notes the default is 3000 ms (3 seconds).
        statFilter.setSlowSqlMillis(1000);
        return statFilter;
    }

    /**
     * Druid wall filter configured to allow multiple statements per execution.
     *
     * @return the configured filter
     */
    @Bean
    public WallFilter wallFilter() {
        WallFilter wallFilter = new WallFilter();
        // Allow several SQL statements in one execution.
        WallConfig config = new WallConfig();
        config.setMultiStatementAllow(true);
        wallFilter.setConfig(config);
        return wallFilter;
    }
}
- SqlSessionFactoryConfig
package com.example.jpaxa.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
/**
 * MyBatis wiring for the mappers in {@code com.example.jpaxa.mapper}, backed by the
 * {@code systemDataSource} bean.
 *
 * <p>Created on 2021/4/21.
 *
 * @author 一毛錢的魅力
 */
@Configuration
@MapperScan(basePackages = "com.example.jpaxa.mapper", sqlSessionFactoryRef = "sqlSessionFactory")
public class SqlSessionFactoryConfig {

    @Autowired
    @Qualifier("systemDataSource")
    private DataSource dataSource;

    /**
     * Builds the session factory from the injected data source and the mapper XML files
     * found under {@code classpath:mapper/}.
     *
     * @return the session factory produced by {@link SqlSessionFactoryBean}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // Location of the mapper XML files.
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        return bean.getObject();
    }

    /**
     * Session template built on the factory declared above.
     *
     * @return a template bound to {@link #sqlSessionFactory()}
     * @throws Exception propagated from {@link #sqlSessionFactory()}
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate() throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory());
    }

    // About transaction managers: JPA, JDBC and the rest all implement PlatformTransactionManager.
    // With the spring-boot-starter-jdbc dependency the framework injects a
    // DataSourceTransactionManager instance by default.
    // A @Bean declared by hand takes precedence, and the framework will not instantiate another
    // PlatformTransactionManager implementation.
    /*@Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        // MyBatis joins Spring transaction management automatically with no extra configuration,
        // provided the data source referenced by org.mybatis.spring.SqlSessionFactoryBean is the
        // same one referenced by DataSourceTransactionManager; otherwise transactions do not apply.
        return new DataSourceTransactionManager(dataSource);
    }*/
}
- SqlSessionFactory2Config
package com.example.jpaxa.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
/**
 * MyBatis wiring for the mappers in {@code com.example.jpaxa.mapper2}, backed by the
 * {@code businessDataSource} bean.
 *
 * <p>Created on 2021/4/21.
 *
 * @author 一毛錢的魅力
 */
@Configuration
@MapperScan(basePackages = "com.example.jpaxa.mapper2", sqlSessionFactoryRef = "sqlSessionFactory2")
public class SqlSessionFactory2Config {

    @Autowired
    @Qualifier("businessDataSource")
    private DataSource dataSource;

    /**
     * Builds the session factory from the injected data source and the mapper XML files
     * found under {@code classpath:mapper2/}.
     *
     * @return the session factory produced by {@link SqlSessionFactoryBean}
     * @throws Exception if the mapper resources cannot be resolved or the factory cannot be built
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory2() throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        // Location of the mapper XML files.
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:mapper2/*.xml"));
        return bean.getObject();
    }

    /**
     * Session template built on the factory declared above.
     *
     * @return a template bound to {@link #sqlSessionFactory2()}
     * @throws Exception propagated from {@link #sqlSessionFactory2()}
     */
    @Bean
    public SqlSessionTemplate sqlSessionTemplate2() throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory2());
    }

    // About transaction managers: JPA, JDBC and the rest all implement PlatformTransactionManager.
    // With the spring-boot-starter-jdbc dependency the framework injects a
    // DataSourceTransactionManager instance by default.
    // A @Bean declared by hand takes precedence, and the framework will not instantiate another
    // PlatformTransactionManager implementation.
    /*@Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        // MyBatis joins Spring transaction management automatically with no extra configuration,
        // provided the data source referenced by org.mybatis.spring.SqlSessionFactoryBean is the
        // same one referenced by DataSourceTransactionManager; otherwise transactions do not apply.
        return new DataSourceTransactionManager(dataSource);
    }*/
}
- OrderService
package com.example.jpaxa.service;
import com.example.jpaxa.mapper.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Service wrapper around {@link OrderMapper}.
 *
 * <p>Created on 2021/4/21.
 *
 * @author 一毛錢的魅力
 */
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    /**
     * Forwards to {@code OrderMapper.addOrder}.
     *
     * @param name value passed unchanged to the mapper
     * @return the mapper's result (presumably the affected row count — TODO confirm against the
     *         mapper XML)
     */
    public int addOrder(String name) {
        final int mapperResult = this.orderMapper.addOrder(name);
        return mapperResult;
    }
}
- ProductService
package com.example.jpaxa.service;
import com.example.jpaxa.mapper2.ProductMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Service wrapper around {@link ProductMapper}.
 *
 * <p>Created on 2021/4/21.
 *
 * @author 一毛錢的魅力
 */
@Service
public class ProductService {

    @Autowired
    private ProductMapper productMapper;

    /**
     * Forwards to {@code ProductMapper.updProduct}.
     *
     * @return the mapper's result (presumably the affected row count — TODO confirm against the
     *         mapper XML)
     */
    public int updProduct() {
        final int mapperResult = this.productMapper.updProduct();
        return mapperResult;
    }
}
- TestJpaService
package com.example.jpaxa.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.Random;
/**
 * Demo service that calls {@link OrderService} and {@link ProductService} inside one
 * transaction managed by the {@code xatx} transaction manager.
 *
 * <p>Created on 2021/4/21.
 *
 * @author 一毛錢的魅力
 */
@Slf4j
@Service
public class TestJpaService {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ProductService productService;

    /**
     * Creates an order, then updates the product quantity, then fails.
     *
     * <p>The division by zero always throws {@link ArithmeticException}, so the log statement
     * after it is never reached. NOTE(review): this looks like deliberate fault injection to
     * exercise the rollback of both writes — confirm before reusing this method.
     */
    @Transactional(
            transactionManager = "xatx",
            propagation = Propagation.REQUIRED,
            rollbackFor = RuntimeException.class)
    public void createOrder() {
        // Create the order under a random name.
        final String orderName = "商品" + new Random().nextInt();
        orderService.addOrder(orderName);

        // Deduct the product quantity.
        productService.updProduct();

        // Always throws ArithmeticException.
        int forcedFailure = 1 / 0;
        log.info("單應用多資料庫,分布式事務同步成功");
    }
}
- JpaXaApplicationTests
package com.example.jpaxa;
import com.example.jpaxa.service.TestJpaService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Starts the full application context and runs {@link TestJpaService#createOrder()} once.
 */
@SpringBootTest
class JpaXaApplicationTests {

    @Autowired
    private TestJpaService testJpaService;

    /**
     * Calls {@code createOrder()} with no assertions; any exception it throws fails the test.
     */
    @Test
    void contextLoads() {
        this.testJpaService.createOrder();
    }
}
- application.yml
spring:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:
      # Settings read by DruidConfig under the prefix "spring.datasource.druid.systemDB.".
      systemDB:
        name: systemDB
        url: jdbc:mysql://localhost:3306/order-db?serverTimezone=UTC&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
        username: root
        password: 123456
        # Connection-pool settings: initial size, minimum idle, maximum active.
        initialSize: 5
        minIdle: 5
        maxActive: 20
        # Maximum time to wait for a connection.
        maxWait: 60000
        # Interval, in milliseconds, between checks for idle connections that should be closed.
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time, in milliseconds, a connection stays idle in the pool before it may be evicted.
        # Was 30 (i.e. 30 ms); Druid reports an error for values below 30000.
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1
        # NOTE(review): Druid documents this timeout in seconds; 10000 looks like a
        # milliseconds mix-up — confirm the intended value.
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # Enable PSCache and set its size per connection.
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        # Enables mergeSql and slow-SQL recording through connectionProperties.
        # NOTE(review): connectionProperties and useGlobalDataSourceStat are not among the keys
        # DruidConfig.build copies — confirm whether anything else consumes them.
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        # Merge the monitoring data of multiple DruidDataSource instances.
        useGlobalDataSourceStat: true
      # Settings read by DruidConfig under the prefix "spring.datasource.druid.businessDB.".
      businessDB:
        name: businessDB
        url: jdbc:mysql://localhost:3306/product-db?serverTimezone=UTC&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
        username: root
        password: 123456
        # Connection-pool settings: initial size, minimum idle, maximum active.
        initialSize: 5
        minIdle: 5
        maxActive: 20
        # Maximum time to wait for a connection.
        maxWait: 60000
        # Interval, in milliseconds, between checks for idle connections that should be closed.
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time, in milliseconds, a connection stays idle in the pool before it may be evicted.
        # Was 30 (i.e. 30 ms); Druid reports an error for values below 30000.
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1
        # NOTE(review): Druid documents this timeout in seconds; 10000 looks like a
        # milliseconds mix-up — confirm the intended value.
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # Enable PSCache and set its size per connection.
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        # Enables mergeSql and slow-SQL recording through connectionProperties.
        # NOTE(review): connectionProperties and useGlobalDataSourceStat are not among the keys
        # DruidConfig.build copies — confirm whether anything else consumes them.
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        # Merge the monitoring data of multiple DruidDataSource instances.
        useGlobalDataSourceStat: true
  # JTA settings.
  # NOTE(review): placed under "spring" on the assumption these are Spring Boot's spring.jta.*
  # properties; the original nesting was lost. The "classpath:" prefix in log-dir is presumably
  # not resolved as a resource location — confirm the directory actually used.
  jta:
    log-dir: classpath:tx-logs
    transaction-manager-id: txManager