天天看點

多資料源事務處理-涉及分布式事務

一. 資料源跨庫但是不跨 MySql 執行個體

這個形式就是資料源在同一個 MySQL 下,但是 jdbc-url 上的資料庫配置不同,涉及多個資料庫時,如果方法中發生異常,隻有開啟事務的資料源會發生復原,其他資料源不會復原。看到這裡可能有點迷惑,什麼是 隻有開啟事務的資料源會發生復原,其他資料源不會復原?

下面給出代碼驗證:

主資料源配置

@Slf4j
@EnableTransactionManagement
@EnableAspectJAutoProxy
@Configuration
@MapperScan(basePackages = "ltd.newbee.mall.core.dao", sqlSessionFactoryRef = "masterSqlSessionFactory")
public class Db1DataSourceConfig {

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource masterDataSource(DruidProperties druidProperties) {
        DruidDataSource build = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(build);
    }

    /**
     * @param datasource 資料源
     * @return SqlSessionFactory
     * @Primary 預設SqlSessionFactory
     */
    @Primary
    @Bean(name = "masterSqlSessionFactory")
    public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource datasource,
                                                     Interceptor interceptor) throws Exception {
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(datasource);
        // mybatis掃描xml所在位置
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:mapper/*.xml"));
        bean.setTypeAliasesPackage("ltd.**.core.entity");
        bean.setPlugins(interceptor);
        GlobalConfig globalConfig = new GlobalConfig();
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("isDeleted");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);
        bean.setGlobalConfig(globalConfig);
        log.info("masterDataSource 配置成功");
        return bean.getObject();
    }

    @Primary
    @Bean(name = "masterTransactionManager")
    public DataSourceTransactionManager masterTransactionManager(@Qualifier("masterDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}
複制代碼           

從資料源配置

@Slf4j
@ConditionalOnProperty(value = "transactional.mode", havingValue = "seata")
@EnableTransactionManagement
@EnableAspectJAutoProxy
@Configuration
@MapperScan(basePackages = "ltd.newbee.mall.slave.dao", sqlSessionFactoryRef = "slaveSqlSessionFactory")
public class Db2DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave")
    public DataSource slaveDataSource(DruidProperties druidProperties) {
        DruidDataSource build = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(build);
    }


    /**
     * @param datasource 資料源
     * @return SqlSessionFactory
     * @Primary 預設SqlSessionFactory
     */
    @Bean(name = "slaveSqlSessionFactory")
    public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource datasource,
                                                    Interceptor interceptor) throws Exception {
        MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
        bean.setDataSource(datasource);
        // mybatis掃描xml所在位置
        bean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath*:slavemapper/*.xml"));
        bean.setTypeAliasesPackage("ltd.**.slave.entity");
        bean.setPlugins(interceptor);
        GlobalConfig globalConfig = new GlobalConfig();
        GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
        dbConfig.setLogicDeleteField("isDeleted");
        dbConfig.setLogicDeleteValue("1");
        dbConfig.setLogicNotDeleteValue("0");
        globalConfig.setDbConfig(dbConfig);
        bean.setGlobalConfig(globalConfig);
        log.info("slaveDataSource 配置成功");
        return bean.getObject();
    }
    
    @Bean(name = "slaveTransactionManager")
    public DataSourceTransactionManager slaveTransactionManager(@Qualifier("slaveDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}
複制代碼           

劃重點-上述代碼在每個資料源中都配置了 DataSourceTransactionManager(事務管理器),并且在主配置中添加 @Primary 注解,表示預設事務管理器優先使用主資料源的事務管理器。 下面給出測試代碼:

/**
 *  Springboot測試類
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void testRollback() {
        multiDataService.testRollback();
    }
}
/**
 *  MultiDataService實作類
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Override
    public void testRollback() {
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
        try {
            TbTable1 tbTable1 = new TbTable1();
            tbTable1.setName("test1");
            // 插入table1表
            boolean save1 = tbTable1Service.save(tbTable1);
            TbTable2 tbTable2 = new TbTable2();
            tbTable2.setName("test2");
            // 插入table2表
            boolean save2 = tbTable2Service.save(tbTable2);
            int i = 1 / 0;
            transactionManager.commit(transaction);
            Assert.isTrue(save1 && save2);
        } catch (Exception e) {
            log.info(e.getMessage(), e);
            transactionManager.rollback(transaction);
        }
    }
}
複制代碼           

執行結果:table1表復原成功,table2表復原失敗。由此結果,對于 隻有開啟事務的資料源會發生復原,其他資料源不會復原? 我們的解釋就是 Spring 中預設使用的事務管理器是使用主資料源配置還是從資料源配置由我們通過 @Primary 決定,當我們把 @Primary 切換在從資料源配置上,執行結果:table2表復原成功,table1表復原失敗。那怎麼解決這個問題?

當涉及到跨庫或者跨 MySQL 執行個體,想要保證事務操作,我們這裡先給出XA事務解決方案。附 XA 事務的說明:

XA 是由 X/Open 組織提出的分布式事務規範,XA 規範主要定義了事務協調者(Transaction Manager)和資料總管(Resource Manager)之間的接口。

事務協調者(Transaction Manager),因為 XA 事務是基于兩階段送出協定的,是以需要有一個協調者,來保證所有的事務參與者都完成了準備工作,也就是 2PC 的第一階段。如果事務協調者收到所有參與者都準備好的消息,就會通知所有的事務都可以送出,也就是 2PC 的第二階段。

資料總管(Resource Manager),負責控制和管理實際資源,比如資料庫。

(劃重點)XA 的 MySQL 實作使 MySQL 伺服器能夠充當資料總管,在全局事務中處理 XA 事務。連接配接到 MySQL 伺服器的用戶端程式充當事務協調者

XA 事務的執行流程

XA 事務是兩階段送出的一種實作方式,根據 2PC 的規範,XA 将一次事務分割成了兩個階段,即 Prepare 和 Commit 階段。

Prepare 階段,TM 向所有 RM 發送 prepare 指令,RM 接受到指令後,執行資料修改和日志記錄等操作,然後傳回可以送出或者不送出的消息給 TM。如果事務協調者 TM 收到所有參與者都準備好的消息,會通知所有的事務送出,然後進入第二階段。

Commit 階段,TM 接受到所有 RM 的 prepare 結果,如果有 RM 傳回是不可送出或者逾時,那麼向所有 RM 發送 Rollback 指令;如果所有 RM 都傳回可以送出,那麼向所有 RM 發送 Commit 指令,完成一次事務操作。

下面給出兩種基于 XA 事務的解決方案:

  • Springboot 項目中可以使用 jta,完成對 XA 協定的支援,缺點就是 jta 需要改造資料源配置
  • Springboot 項目引入 seata ,seata 支援 XA 協定,且引入 seata-spring-boot-starter 依賴對業務無侵入,缺點需要引入 seata-server 降低了系統可用性

Springboot 項目中可以啟用 jta

  1. 引入 spring-boot-starter-jta-atomikos
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
複制代碼           
  1. 修改主從資料源 DataSource 配置,進行包裝添加 XA 資料源支援,如下;
@Primary
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource dataSource(DruidProperties druidProperties) {
        DruidXADataSource dataSource = druidProperties.dataSource(new DruidXADataSource());
        dataSource.setUrl("jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8");
        dataSource.setUsername("root");
        dataSource.setPassword("");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setUniqueResourceName("master-xa");
        atomikosDataSourceBean.setXaDataSource(dataSource);
        return atomikosDataSourceBean;
    }
複制代碼           
  1. 添加 JtaTransactionManager
@Bean
public JtaTransactionManager transactionManager() throws Exception {
    JtaTransactionManager transactionManager = new JtaTransactionManager();
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(true);
    userTransactionManager.setTransactionTimeout(3000);
    transactionManager.setUserTransaction(userTransactionManager);
    transactionManager.setAllowCustomIsolationLevels(true);
    return transactionManager;
}
複制代碼           
  1. 完成測試,代碼如下:
/**
 *  Springboot測試類
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void jtaTestRollback() {
        multiDataService.jtaTestRollback();
    }
}
/**
 *  MultiDataService實作類
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @Autowired
    private JtaTransactionManager jtaTransactionManager;
    @Override
    public void jtaTestRollback() {
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transaction = jtaTransactionManager.getTransaction(transactionDefinition);
        try {
            TbTable1 tbTable1 = new TbTable1();
            tbTable1.setName("test1");
            boolean save1 = tbTable1Service.save(tbTable1);
            TbTable2 tbTable2 = new TbTable2();
            tbTable2.setName("test2");
            boolean save2 = tbTable2Service.save(tbTable2);
            int i = 1 / 0;
            jtaTransactionManager.commit(transaction);
            Assert.isTrue(save1 && save2);
        } catch (Exception e) {
            log.info(e.getMessage(), e);
            jtaTransactionManager.rollback(transaction);
        }
    }
}
複制代碼           

可以看到我們使用的是 JtaTransactionManager, 執行結果:table1表復原成功,table2表復原成功。驗證OK

引入 seata,添加XA協定支援

  1. 下載下傳安裝啟動 seata-server,這裡給出官網教程:https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html

在 Springboot中引入seata最新依賴

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.5.2</version>
</dependency>
複制代碼           
  1. 在yml檔案中添加 seata 配置
seata:
  config:
    type: file
  registry:
    type: file
  application-id: newbeemall # Seata 應用編号,預設為 ${spring.application.name}
  tx-service-group: newbeemall-group # Seata 事務組編号,用于 TC 叢集名
  # 服務配置項,對應 ServiceProperties 類
  service:
    # 虛拟組和分組的映射
    vgroup-mapping:
      newbeemall-group: default
    # 分組和 Seata 服務的映射
    grouplist:
      default: 127.0.0.1:8091
  data-source-proxy-mode: XA
  enabled: true
複制代碼           
  1. 完成測試,代碼如下:
/**
 *  Springboot測試類
 */
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class MultiDataSourceTest {
    @Autowired
    private MultiDataService multiDataService;
    @Test
    public void seataTestRollback() {
        multiDataService.seataTestRollback();
    }
}
/**
 *  MultiDataService實作類
 */
@Slf4j
@Service
public class MultiDataServiceImpl implements MultiDataService {
    @Autowired
    private TbTable1Service tbTable1Service;
    @Autowired
    private TbTable2Service tbTable2Service;
    @GlobalTransactional
    @Override
    public void seataTestRollback() {
        log.info("目前 XID: {}", RootContext.getXID());
        TbTable1 tbTable1 = new TbTable1();
        tbTable1.setName("test1");
        boolean save1 = tbTable1Service.save(tbTable1);
        TbTable2 tbTable2 = new TbTable2();
        tbTable2.setName("test2");
        boolean save2 = tbTable2Service.save(tbTable2);
        int i = 1 / 0;
    }
}
複制代碼           

如上代碼,使用 seata 時需要啟用 @GlobalTransactional 注解,并且在事務中傳遞 XID( RootContext.getXID()),執行結果:table1表復原成功,table2表復原成功。驗證OK

二. 資料源分布在不同 MySql 執行個體

當資料源分布在不同 MySql 執行個體時,這時候其實已經進入分布式事務的範疇,由上可知,XA 事務可以解決分布式環境下事務問題,也就是說上述最後兩種解決方案都可以解決分布式事務問題,但是實際使用過程中,我們建議使用 seata,理由是他不僅支援 XA 事務還支援 AT、Saga、TCC事務模型。引入 seata 官網介紹

Seata 是一款開源的分布式事務解決方案,緻力于提供高性能和簡單易用的分布式事務服務。Seata 将為使用者提供了 AT、TCC、SAGA 和 XA 事務模式,為使用者打造一站式的分布式解決方案。

總結

關于多資料源事務的問題,不管跨不跨庫其實都屬于分布式事務的問題。推薦使用 seata 解決。

多資料源事務處理-涉及分布式事務

繼續閱讀