天天看點

分布式事務_spring-boot整合jpa

前言

不啰嗦,直奔主題。。。

分布式事務解決方案目前主要有五種:

1. XA 方案

2. TCC 方案

3. 本地消息表

4. 可靠消息最終一緻性方案

5. 最大努力通知方案

今天主要記錄下代碼實測基于XA方案解決分布式事務的過程。

必要知識背景

事務四大特性:具備原子性、一緻性、隔離性和持久性,簡稱 ACID。

分布式概念:分布式事務可以了解為在分布式系統中實作事務,它其實是由多個本地事務組合而成。

XA方案:又稱2PC(Two-phase commit protocol),中文叫二階段送出,是基于資料庫層面實作的事務控制,分為兩個階段,第一個階段是準備階段,第二個階段是送出或復原。

JTA(java transaction api):JTA,即Java Transaction API,JTA允許應用程式執行分布式事務處理——在兩個或多個網絡計算機資源上通路并且更新資料。

JTA模型圖:

分布式事務_spring-boot整合jpa

spring-boot 整合jpa代碼示例:

  1. 工程概覽:
    分布式事務_spring-boot整合jpa
  2. 測試sql準備
DROP TABLE IF EXISTS `test_order`;
CREATE TABLE `test_order`
(`o_id`   bigint      NOT NULL AUTO_INCREMENT,
  `o_name` varchar(50) NOT NULL COMMENT '訂單名稱',
  PRIMARY KEY (`o_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='訂單表';

DROP TABLE IF EXISTS `test_product`;
CREATE TABLE `test_product`
(
  `p_id`   bigint      NOT NULL AUTO_INCREMENT,
  `p_name` varchar(50) NOT NULL COMMENT '商品名稱',
  `p_num`  bigint      NOT NULL COMMENT '商品數量',
  PRIMARY KEY (`p_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='商品表';
insert into test_product (p_name, p_num) values ("測試商品1", 100);
           
  1. 資料源的注冊;分布式事務管理器的注冊;druid的注冊
package com.example.jpaxa.config;

import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;


/**
 * created on 2021/4/21
 *資料源的注冊;分布式事務管理器的注冊;druid的注冊;
 * @author 一毛錢的魅力
 **/
@Configuration
public class DruidConfig {
    @Bean(name = "systemDataSource")
    @Primary
    @Autowired
    public DataSource systemDataSource(Environment env) {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.systemDB.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("systemDB");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);
        return ds;

    }

    @Autowired
    @Bean(name = "businessDataSource")
    public AtomikosDataSourceBean businessDataSource(Environment env) {

        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.businessDB.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("businessDB");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);

        return ds;
    }


    /**
     * 注入事物管理器
     * @return
     */
    @Bean(name = "xatx")
    public JtaTransactionManager regTransactionManager () {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }


    private Properties build(Environment env, String prefix) {

        Properties prop = new Properties();
        prop.put("url", env.getProperty(prefix + "url"));
        prop.put("username", env.getProperty(prefix + "username"));
        prop.put("password", env.getProperty(prefix + "password"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
        prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
        prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
        prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
        prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
        prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));

        prop.put("maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));

        prop.put("maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
        prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
        prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
        prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
        prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
        prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
        prop.put("timeBetweenEvictionRunsMillis",
                env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
        prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
        prop.put("filters", env.getProperty(prefix + "filters"));

        return prop;
    }

    @Bean
    public ServletRegistrationBean druidServlet() {
        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*");

        //控制台管理使用者,加入下面2行 進入druid背景就需要登入
        //servletRegistrationBean.addInitParameter("loginUsername", "admin");
        //servletRegistrationBean.addInitParameter("loginPassword", "admin");
        return servletRegistrationBean;
    }

    @Bean
    public FilterRegistrationBean filterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        filterRegistrationBean.addUrlPatterns("/*");
        filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
        filterRegistrationBean.addInitParameter("profileEnable", "true");
        return filterRegistrationBean;
    }

    @Bean
    public StatFilter statFilter(){
        StatFilter statFilter = new StatFilter();
        statFilter.setLogSlowSql(true); //slowSqlMillis用來配置SQL慢的标準,執行時間超過slowSqlMillis的就是慢。
        statFilter.setMergeSql(true); //SQL合并配置
        statFilter.setSlowSqlMillis(1000);//slowSqlMillis的預設值為3000,也就是3秒。
        return statFilter;
    }

    @Bean
    public WallFilter wallFilter(){
        WallFilter wallFilter = new WallFilter();
        //允許執行多條SQL
        WallConfig config = new WallConfig();
        config.setMultiStatementAllow(true);
        wallFilter.setConfig(config);
        return wallFilter;
    }

}

           
  1. SqlSessionFactoryConfig
package com.example.jpaxa.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;


/**
 * created on 2021/4/21
 *
 * @author 一毛錢的魅力
 **/

@Configuration
@MapperScan(basePackages = "com.example.jpaxa.mapper", sqlSessionFactoryRef = "sqlSessionFactory")
public class SqlSessionFactoryConfig {

    @Autowired
    @Qualifier("systemDataSource")
    private DataSource ds;

    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds);
        //指定mapper xml目錄
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
        return factoryBean.getObject();

    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate() throws Exception {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory()); // 使用上面配置的Factory
        return template;
    }

    //關于事務管理器,不管是JPA還是JDBC等都實作自接口 PlatformTransactionManager
    // 如果你添加的是 spring-boot-starter-jdbc 依賴,架構會預設注入 DataSourceTransactionManager 執行個體。
    //在Spring容器中,我們手工注解@Bean 将被優先加載,架構不會重新執行個體化其他的 PlatformTransactionManager 實作類。
    /*@Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        //MyBatis自動參與到spring事務管理中,無需額外配置,隻要org.mybatis.spring.SqlSessionFactoryBean引用的資料源
        // 與DataSourceTransactionManager引用的資料源一緻即可,否則事務管理會不起作用。
        return new DataSourceTransactionManager(ds);
    }*/
}

           
  1. SqlSessionFactory2Config
package com.example.jpaxa.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;

/**
 * created on 2021/4/21
 *
 * @author 一毛錢的魅力
 **/
@Configuration
@MapperScan(basePackages = "com.example.jpaxa.mapper2", sqlSessionFactoryRef = "sqlSessionFactory2")
public class SqlSessionFactory2Config {
    @Autowired
    @Qualifier("businessDataSource")
    private DataSource ds;

    @Bean
    public SqlSessionFactory sqlSessionFactory2() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds);
        //指定mapper xml目錄
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml"));
        return factoryBean.getObject();

    }

    @Bean
    public SqlSessionTemplate sqlSessionTemplate2() throws Exception {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory2()); // 使用上面配置的Factory
        return template;
    }

    //關于事務管理器,不管是JPA還是JDBC等都實作自接口 PlatformTransactionManager
    // 如果你添加的是 spring-boot-starter-jdbc 依賴,架構會預設注入 DataSourceTransactionManager 執行個體。
    //在Spring容器中,我們手工注解@Bean 将被優先加載,架構不會重新執行個體化其他的 PlatformTransactionManager 實作類。
    /*@Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        //MyBatis自動參與到spring事務管理中,無需額外配置,隻要org.mybatis.spring.SqlSessionFactoryBean引用的資料源
        // 與DataSourceTransactionManager引用的資料源一緻即可,否則事務管理會不起作用。
        return new DataSourceTransactionManager(ds);
    }*/
}

           
  1. OrderService
package com.example.jpaxa.service;

import com.example.jpaxa.mapper.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * created on 2021/4/21
 *
 * @author 一毛錢的魅力
 **/
@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;

    public int addOrder(String name) {
        return orderMapper.addOrder(name);
    }
}

           
  1. ProductService
package com.example.jpaxa.service;

import com.example.jpaxa.mapper2.ProductMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * created on 2021/4/21
 *
 * @author 一毛錢的魅力
 **/
@Service
public class ProductService {
    @Autowired
    private ProductMapper productMapper;

    public int updProduct() {
        return productMapper.updProduct();
    }
}

           
  1. TestJpaService
package com.example.jpaxa.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Random;

/**
 * created on 2021/4/21
 *
 * @author 一毛錢的魅力
 **/
@Slf4j
@Service
public class TestJpaService {
    @Autowired
    private OrderService orderService;
    @Autowired
    private ProductService productService;

    //生成訂單并扣減商品資料量
    @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })
    public void createOrder(){
        //建立訂單
        orderService.addOrder("商品"+ new Random().nextInt());
        //扣減商品資料量
        productService.updProduct();
        int i=1/0;
        log.info("單應用多資料庫,分布式事務同步成功");

    }
}

           
  1. JpaXaApplicationTests
package com.example.jpaxa;

import com.example.jpaxa.service.TestJpaService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class JpaXaApplicationTests {
    @Autowired
    private TestJpaService testJpaService;

    @Test
    void contextLoads() {
        testJpaService.createOrder();
    }

}

           
  1. application.yml
spring:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:

      systemDB:
        name: systemDB
        url: jdbc:mysql://localhost:3306/order-db?serverTimezone=UTC&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
        username: root
        password: 123456
        # 下面為連接配接池的補充設定,應用到上面所有資料源中
        # 初始化大小,最小,最大
        initialSize: 5
        minIdle: 5
        maxActive: 20
        # 配置擷取連接配接等待逾時的時間
        maxWait: 60000
        # 配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接配接,機關是毫秒
        timeBetweenEvictionRunsMillis: 60000
        # 配置一個連接配接在池中最小生存的時間,機關是毫秒
        minEvictableIdleTimeMillis: 30
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # 打開PSCache,并且指定每個連接配接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        # 通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        # 合并多個DruidDataSource的監控資料
        useGlobalDataSourceStat: true

      businessDB:
        name: businessDB

        url: jdbc:mysql://localhost:3306/product-db?serverTimezone=UTC&autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
        username: root
        password: 123456
        # 下面為連接配接池的補充設定,應用到上面所有資料源中
        # 初始化大小,最小,最大
        initialSize: 5
        minIdle: 5
        maxActive: 20
        # 配置擷取連接配接等待逾時的時間
        maxWait: 60000
        # 配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接配接,機關是毫秒
        timeBetweenEvictionRunsMillis: 60000
        # 配置一個連接配接在池中最小生存的時間,機關是毫秒
        minEvictableIdleTimeMillis: 30
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # 打開PSCache,并且指定每個連接配接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        # 通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        # 合并多個DruidDataSource的監控資料
        useGlobalDataSourceStat: true

  #jta相關參數配置
  jta:
    log-dir: classpath:tx-logs
    transaction-manager-id: txManager