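Preface:
Integrating multiple data sources with Spring Boot is familiar territory, and there are many ways to do it. Once transactions are involved, though, it becomes a real headache: transactions across data sources either conflict or silently stop working. Most of the blog posts online are outdated near-copies of each other, and very few actually solve the problem. So I decided to share a working integration, covered from start to finish.
In this post I use annotation-driven AOP to switch data sources dynamically, and integrate jta-atomikos along the way to take care of the transaction problem. The persistence layer uses MyBatis and the connection pool is Druid. Both are widely used at the moment, so you can extend this scaffold further to fit your own project's needs. (Extend it with what? Take a look at my Spring Boot column; you may find something useful there.)
This post is fairly long, but if you follow the whole code walkthrough, you can reuse the result as a scaffold for multiple data sources plus distributed transactions, and transaction issues across data sources should stop being a problem for you.
Let's start the integration.
First, a look at the project directory structure gives a rough idea of what this hands-on integration involves.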

First prepare two data sources (the databases mydbone and mydbtwo),
and create a user table in each of them for later use:
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- distributed transactions -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.10</version>
    </dependency>
    <!-- Spring Boot 2.1.x defaults to a fairly recent mysql-connector-java (8.0.x), so the version is lowered here -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
    <!-- Druid connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.9</version>
    </dependency>
    <!-- aop starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <!-- MyBatis integration -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Next is the data source configuration in application.yml:
server:
  port: 8077
spring:
  application:
    name: jta-dbsource
  datasource:
    druid:
      mydbone:
        url: jdbc:mysql://localhost:3306/mydbone?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&pinGlobalTxToPhysicalConnection=true&autoReconnect=true
        username: root
        password: root
        driver-class-name: com.mysql.cj.jdbc.Driver
        # Number of physical connections created at initialization. Initialization happens on an explicit init() call or on the first getConnection().
        initialSize: 5
        # Minimum number of connections in the pool
        minIdle: 5
        # Maximum number of connections in the pool
        maxActive: 10
        # Maximum wait time when acquiring a connection, in milliseconds. Once maxWait is set, a fair lock is enabled by default, which lowers concurrency somewhat; set useUnfairLock to true to use an unfair lock if needed.
        maxWait: 60000
        # Interval at which the Destroy thread checks connections; a physical connection is closed if its idle time is greater than or equal to minEvictableIdleTimeMillis.
        timeBetweenEvictionRunsMillis: 60000
        # Minimum time a connection may stay idle without being evicted
        minEvictableIdleTimeMillis: 300000
        # SQL used to validate connections; it varies by database dialect, e.g. Oracle would use SELECT 1 FROM DUAL
        validationQuery: SELECT 1
        # Recommended to be true; no performance impact and it keeps things safe. When a connection is borrowed, validationQuery is run if the idle time exceeds timeBetweenEvictionRunsMillis.
        testWhileIdle: true
        # Run validationQuery when borrowing a connection; enabling this lowers performance.
        testOnBorrow: false
        # Run validationQuery when returning a connection; enabling this lowers performance.
        testOnReturn: false
        # Whether to automatically reclaim connections that have timed out
        removeAbandoned: false
        # Timeout, in seconds
        remove-abandoned-timeout: 1800
        logAbandoned: true
        pinGlobalTxToPhysicalConnection: true
      mydbtwo:
        url: jdbc:mysql://localhost:3306/mydbtwo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&pinGlobalTxToPhysicalConnection=true&autoReconnect=true
        username: root
        password: root
        driver-class-name: com.mysql.cj.jdbc.Driver
        initialSize: 6
        minIdle: 6
        maxActive: 10
        maxWait: 60000
        timeBetweenEvictionRunsMillis: 60000
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        removeAbandoned: false
        remove-abandoned-timeout: 1800
        logAbandoned: true
        pinGlobalTxToPhysicalConnection: true
      # WebStatFilter collects web-to-JDBC correlation monitoring data.
      web-stat-filter:
        # Whether to enable WebStatFilter; default is true
        enabled: true
        # URL pattern to intercept
        url-pattern: /*
        # Exclude requests for static resources
        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"
      # Druid ships a built-in StatViewServlet that displays Druid's statistics.
      stat-view-servlet:
        # Whether to enable StatViewServlet; default is true
        enabled: true
        # URL pattern to intercept
        url-pattern: /druid/*
        # Allow resetting the statistics
        reset-enable: true
        login-username: myname
        login-password: mypwd
-----Now for the code-----
Pay attention to the comments; much of the key information is explained briefly there:
First create a custom annotation, DataSource.java:
import java.lang.annotation.*;
/**
* @Author : JCccc
* @CreateTime : 2019/8/28
* @Description :
**/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataSource {
    String value() default DataSourceNames.ONE;
}
Then create DataSourceNames.java, which simply names the data sources:
/**
* @Author : JCccc
* @CreateTime : 2019/8/28
* @Description :
**/
public interface DataSourceNames {
    String ONE = "ONE";
    String TWO = "TWO";
}
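PS: Most of this is code I wrote back around August, when I first did AOP-based data source switching. This time I build on it and focus on solving the transaction problem.
Next, use the custom annotation as the pointcut and fill in the AOP switching logic. Create DynamicDataSourceAspect.java: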
import com.test.jtadbsource.dbConfig.DataSourceContextHolder;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* @Author : JCccc
* @CreateTime : 2019/12/10
* @Description :
**/
@Aspect
@Component
public class DynamicDataSourceAspect {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Pointcut: every method annotated with DataSource
     */
    @Pointcut("@annotation(com.test.jtadbsource.dbAop.DataSource)")
    public void dataSourcePointCut() {}

    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        // Read the custom annotation
        DataSource ds = method.getAnnotation(DataSource.class);
        if (ds == null) {
            // No custom annotation found: fall back to the default data source, mydbone
            DataSourceContextHolder.setDataSourceKey(DataSourceNames.ONE);
            logger.info("set default datasource is " + DataSourceNames.ONE);
        } else {
            // Annotation present: switch to the data source named by its value
            DataSourceContextHolder.setDataSourceKey(ds.value());
            logger.info("set datasource is " + ds.value());
        }
        return point.proceed();
    }

    // Runs whether the method returns normally or throws, so the key is always cleared
    @After(value = "dataSourcePointCut()")
    public void afterSwitchDS(JoinPoint point) {
        DataSourceContextHolder.clearDataSourceKey();
        logger.info("clean datasource");
    }
}
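To see what the aspect does without starting Spring, here is a minimal plain-Java sketch. It is not the aspect above: UseDb, DbKeyHolder, DemoService, DemoServiceImpl and SwitchingProxy are made-up names for illustration, and a JDK dynamic proxy stands in for Spring AOP. The idea is the same, though: read the annotation, put its value into a thread-local key, run the method, and always clear the key afterwards.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class AspectSketch {
    public static void main(String[] args) {
        DemoService service = SwitchingProxy.wrap(new DemoServiceImpl(), DemoService.class);
        System.out.println(service.insertDefault());  // ran on ONE
        System.out.println(service.insertIntoTwo());  // ran on TWO
        System.out.println(DbKeyHolder.get());        // null, the key was cleared
    }
}

// Hypothetical stand-in for the custom DataSource annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface UseDb {
    String value() default "ONE";
}

// Hypothetical stand-in for DataSourceContextHolder.
final class DbKeyHolder {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();
    static void set(String key) { KEY.set(key); }
    static String get() { return KEY.get(); }
    static void clear() { KEY.remove(); }
    private DbKeyHolder() {}
}

interface DemoService {
    String insertDefault();
    String insertIntoTwo();
}

class DemoServiceImpl implements DemoService {
    public String insertDefault() { return "ran on " + DbKeyHolder.get(); }

    @UseDb("TWO")
    public String insertIntoTwo() { return "ran on " + DbKeyHolder.get(); }
}

final class SwitchingProxy {
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (proxy, method, args) -> {
                // The annotation sits on the implementation, so look the method up there.
                Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                UseDb ann = impl.getAnnotation(UseDb.class);
                DbKeyHolder.set(ann == null ? "ONE" : ann.value());
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause();
                } finally {
                    // Same role as the @After advice: clear on success and on failure.
                    DbKeyHolder.clear();
                }
            });
    }
    private SwitchingProxy() {}
}
```

In this sketch the set and the clear live in one try/finally, which is one way to make sure the two can never get out of step.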
DataSourceContextHolder.java, used above:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
/**
* @Author : JCccc
* @CreateTime : 2019/12/10
* @Description :
**/
public class DataSourceContextHolder extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    // Set the data source name for the current thread
    public static void setDataSourceKey(String dbName) {
        contextHolder.set(dbName);
    }

    // Get the data source name for the current thread
    public static String getDataSourceKey() {
        return contextHolder.get();
    }

    // Clear the data source name for the current thread
    public static void clearDataSourceKey() {
        contextHolder.remove();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSourceKey();
    }
}
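Why bother clearing the key at all? Web containers reuse their worker threads, and a ThreadLocal value stays on the thread until it is removed. The following is a small sketch of that effect using only the JDK; StaleKeySketch and its KEY field are made-up names, and a single-thread executor plays the role of a pooled request thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleKeySketch {
    // Hypothetical thread-local key, same idea as contextHolder above.
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Runs two tasks back to back on the same pooled thread
    // and returns the key that the second task observes.
    static String keySeenBySecondTask(boolean clearAfterFirst) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                KEY.set("TWO");
                if (clearAfterFirst) {
                    KEY.remove();
                }
            };
            Callable<String> second = KEY::get;
            pool.submit(first).get();
            return pool.submit(second).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(keySeenBySecondTask(false)); // TWO: the old key leaked into the next task
        System.out.println(keySeenBySecondTask(true));  // null: the next task starts clean
    }
}
```

Without the clear step, a later request handled by the same thread could quietly run against the wrong database.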
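OK, at this point the basic plumbing for dynamic switching is done. Next come the core pieces:
1. DataSourceFactory.java :
It configures each data source, creating them with DruidXADataSource so that JTA transactions are supported;
it wraps each data source in its own AtomikosDataSourceBean, so that its transactions are picked up by the JTA transaction manager;
it defines a separate SqlSessionFactory for each data source;
and it configures an overridden sqlSessionTemplate, which ties the SqlSession of whichever data source is actually in use to Spring's transaction mechanism.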
import com.alibaba.druid.pool.xa.DruidXADataSource;
import com.test.jtadbsource.dbAop.DataSourceNames;
import org.apache.ibatis.logging.stdout.StdOutImpl;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description : multiple data source configuration
 **/
@Configuration
@MapperScan(basePackages = DataSourceFactory.BASE_PACKAGES, sqlSessionTemplateRef = "sqlSessionTemplate")
public class DataSourceFactory {
    static final String BASE_PACKAGES = "com.test.jtadbsource.mapper";
    private static final String MAPPER_LOCATION = "classpath:mybatis/mapper/*.xml";

    /**
     * Creates the DruidXADataSource for mydbone; @ConfigurationProperties binds its properties automatically
     */
    @Bean
    @ConfigurationProperties("spring.datasource.druid.mydbone")
    public DataSource druidDataSourceOne() {
        return new DruidXADataSource();
    }

    /**
     * Creates the DruidXADataSource for mydbtwo
     */
    @Bean
    @ConfigurationProperties("spring.datasource.druid.mydbtwo")
    public DataSource druidDataSourceTwo() {
        return new DruidXADataSource();
    }

    /**
     * Creates the XA-capable Atomikos data source for mydbone
     */
    @Bean
    public DataSource dataSourceOne(DataSource druidDataSourceOne) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceOne);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        // Each data source must be given a unique resource name
        sourceBean.setUniqueResourceName("mydbone");
        return sourceBean;
    }

    /**
     * Creates the XA-capable Atomikos data source for mydbtwo
     */
    @Bean
    public DataSource dataSourceTwo(DataSource druidDataSourceTwo) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceTwo);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        sourceBean.setUniqueResourceName("mydbtwo");
        return sourceBean;
    }

    /**
     * @param dataSourceOne data source mydbone
     * @return the session factory for data source mydbone
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryOne(DataSource dataSourceOne)
            throws Exception {
        return createSqlSessionFactory(dataSourceOne);
    }

    /**
     * @param dataSourceTwo data source mydbtwo
     * @return the session factory for data source mydbtwo
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryTwo(DataSource dataSourceTwo)
            throws Exception {
        return createSqlSessionFactory(dataSourceTwo);
    }

    /**
     * sqlSessionTemplate works together with Spring transaction management to make sure the SqlSession
     * actually used is the one bound to the current Spring transaction. It also manages the session
     * lifecycle, including closing, committing or rolling back the session as the Spring transaction
     * configuration requires.
     * @param sqlSessionFactoryOne session factory for data source mydbone
     * @param sqlSessionFactoryTwo session factory for data source mydbtwo
     */
    @Bean
    public CustomSqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactoryOne, SqlSessionFactory sqlSessionFactoryTwo) {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
        sqlSessionFactoryMap.put(DataSourceNames.ONE, sqlSessionFactoryOne);
        sqlSessionFactoryMap.put(DataSourceNames.TWO, sqlSessionFactoryTwo);
        CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(sqlSessionFactoryOne);
        customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
        return customSqlSessionTemplate;
    }

    /**
     * Builds a customized session factory
     * @param dataSource the data source
     * @return the customized session factory
     */
    private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        // Map underscore column names to camel-case properties
        configuration.setMapUnderscoreToCamelCase(true);
        // Log SQL to stdout
        configuration.setLogImpl(StdOutImpl.class);
        factoryBean.setConfiguration(configuration);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        // Location of the mapper XML files
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));
        return factoryBean.getObject();
    }
}
The custom CustomSqlSessionTemplate used above (it overrides SqlSessionTemplate):
import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
import static org.mybatis.spring.SqlSessionUtils.getSqlSession;
import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.MyBatisExceptionTranslator;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.util.Assert;
public class CustomSqlSessionTemplate extends SqlSessionTemplate {
    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;
    private Map<Object, SqlSessionFactory> targetSqlSessionFactories;
    private SqlSessionFactory defaultTargetSqlSessionFactory;

    /**
     * The target session factories are passed in as a Map keyed by data source name
     * @param targetSqlSessionFactories
     */
    public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {
        this.targetSqlSessionFactories = targetSqlSessionFactories;
    }

    public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
        this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
        this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
                .getEnvironment().getDataSource(), true));
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
            PersistenceExceptionTranslator exceptionTranslator) {
        super(sqlSessionFactory, executorType, exceptionTranslator);
        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;
        this.sqlSessionProxy = (SqlSession) newProxyInstance(
                SqlSessionFactory.class.getClassLoader(),
                new Class[] { SqlSession.class },
                new SqlSessionInterceptor());
        this.defaultTargetSqlSessionFactory = sqlSessionFactory;
    }

    // Pick the session factory for the current thread via DataSourceContextHolder
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        String dataSourceKey = DataSourceContextHolder.getDataSourceKey();
        // Guard against the map not having been set yet, which would otherwise throw a NullPointerException
        SqlSessionFactory targetSqlSessionFactory =
                targetSqlSessionFactories == null ? null : targetSqlSessionFactories.get(dataSourceKey);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        } else {
            Assert.notNull(targetSqlSessionFactories, "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");
            Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactories' are required");
        }
        return this.sqlSessionFactory;
    }
    @Override
    public Configuration getConfiguration() {
        return this.getSqlSessionFactory().getConfiguration();
    }

    public ExecutorType getExecutorType() {
        return this.executorType;
    }

    public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
        return this.exceptionTranslator;
    }

    /** {@inheritDoc} */
    public <T> T selectOne(String statement) {
        return this.sqlSessionProxy.<T> selectOne(statement);
    }

    /** {@inheritDoc} */
    public <T> T selectOne(String statement, Object parameter) {
        return this.sqlSessionProxy.<T> selectOne(statement, parameter);
    }

    /** {@inheritDoc} */
    public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
    }

    /** {@inheritDoc} */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
    }

    /** {@inheritDoc} */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
    }

    /** {@inheritDoc} */
    public <E> List<E> selectList(String statement) {
        return this.sqlSessionProxy.<E> selectList(statement);
    }

    /** {@inheritDoc} */
    public <E> List<E> selectList(String statement, Object parameter) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter);
    }

    /** {@inheritDoc} */
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
    }

    /** {@inheritDoc} */
    public void select(String statement, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, handler);
    }

    /** {@inheritDoc} */
    public void select(String statement, Object parameter, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, handler);
    }

    /** {@inheritDoc} */
    public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
    }

    /** {@inheritDoc} */
    public int insert(String statement) {
        return this.sqlSessionProxy.insert(statement);
    }

    /** {@inheritDoc} */
    public int insert(String statement, Object parameter) {
        return this.sqlSessionProxy.insert(statement, parameter);
    }

    /** {@inheritDoc} */
    public int update(String statement) {
        return this.sqlSessionProxy.update(statement);
    }

    /** {@inheritDoc} */
    public int update(String statement, Object parameter) {
        return this.sqlSessionProxy.update(statement, parameter);
    }

    /** {@inheritDoc} */
    public int delete(String statement) {
        return this.sqlSessionProxy.delete(statement);
    }

    /** {@inheritDoc} */
    public int delete(String statement, Object parameter) {
        return this.sqlSessionProxy.delete(statement, parameter);
    }

    /** {@inheritDoc} */
    public <T> T getMapper(Class<T> type) {
        return getConfiguration().getMapper(type, this);
    }

    /** {@inheritDoc} */
    public void commit() {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void commit(boolean force) {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void rollback() {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void rollback(boolean force) {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void close() {
        throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
    }

    /** {@inheritDoc} */
    public void clearCache() {
        this.sqlSessionProxy.clearCache();
    }

    /** {@inheritDoc} */
    public Connection getConnection() {
        return this.sqlSessionProxy.getConnection();
    }

    /**
     * {@inheritDoc}
     * @since 1.0.2
     */
    public List<BatchResult> flushStatements() {
        return this.sqlSessionProxy.flushStatements();
    }
     * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager. It also
     * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
     * the {@code PersistenceExceptionTranslator}.
     */
    private class SqlSessionInterceptor implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final SqlSession sqlSession = getSqlSession(
                    CustomSqlSessionTemplate.this.getSqlSessionFactory(),
                    CustomSqlSessionTemplate.this.executorType,
                    CustomSqlSessionTemplate.this.exceptionTranslator);
            try {
                Object result = method.invoke(sqlSession, args);
                if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {
                    sqlSession.commit(true);
                }
                return result;
            } catch (Throwable t) {
                Throwable unwrapped = unwrapThrowable(t);
                if (CustomSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                    Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator
                            .translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            } finally {
                closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }
}
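The part of this class that matters most is getSqlSessionFactory(): look up the current key in a map and fall back to a default when nothing matches. Stripped of MyBatis, that routing rule can be sketched like this. KeyedRouter and RoutingSketch are made-up names, and plain strings stand in for the SqlSessionFactory instances.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class RoutingSketch {
    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("ONE", "factoryOne");
        targets.put("TWO", "factoryTwo");
        KeyedRouter<String> router = new KeyedRouter<>(targets, "factoryOne");

        System.out.println(router.resolve("TWO"));   // factoryTwo
        System.out.println(router.resolve(null));    // factoryOne, no key set on this thread
        System.out.println(router.resolve("THREE")); // factoryOne, unknown key
    }
}

// Hypothetical helper: routes a key to a target and falls back to a default.
final class KeyedRouter<T> {
    private final Map<String, T> targets;
    private final T fallback;

    KeyedRouter(Map<String, T> targets, T fallback) {
        this.targets = new HashMap<>(targets);
        this.fallback = Objects.requireNonNull(fallback, "fallback");
    }

    T resolve(String key) {
        T target = (key == null) ? null : targets.get(key);
        return target != null ? target : fallback;
    }
}
```

Requiring the fallback up front is a design choice of this sketch; it means a method without the annotation always resolves to something.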
Next is the XA distributed transaction manager, XATransactionManagerConfig.java:
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
 * @Author : JCccc
 * @CreateTime : 2019/12/10
 * @Description : JTA transaction configuration
 **/
@Configuration
@EnableTransactionManagement
public class XATransactionManagerConfig {
    @Bean
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // Note: JTA defines this timeout in seconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }

    @Bean
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
            TransactionManager transactionManager) {
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}
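What does a JTA/XA transaction manager buy us across two databases? Roughly, a two-phase commit: every resource is first asked to prepare, and the commit only goes ahead if all of them agree. The toy model below illustrates that idea only. It is not the Atomikos API and leaves out recovery, timeouts and logging; ToyResource and ToyCoordinator are made-up names.

```java
import java.util.Arrays;
import java.util.List;

class TwoPhaseSketch {
    public static void main(String[] args) {
        ToyResource one = new ToyResource("mydbone", true);
        ToyResource two = new ToyResource("mydbtwo", false); // this one refuses to prepare
        boolean committed = ToyCoordinator.run(Arrays.asList(one, two));
        System.out.println(committed + " " + one.state + " " + two.state);
        // false ROLLED_BACK ROLLED_BACK
    }
}

// Hypothetical resource that either can or cannot prepare its work.
final class ToyResource {
    final String name;
    final boolean canPrepare;
    String state = "ACTIVE";

    ToyResource(String name, boolean canPrepare) {
        this.name = name;
        this.canPrepare = canPrepare;
    }

    boolean prepare() {
        if (canPrepare) {
            state = "PREPARED";
        }
        return canPrepare;
    }

    void commit() { state = "COMMITTED"; }

    void rollback() { state = "ROLLED_BACK"; }
}

// Hypothetical coordinator: phase 1 asks everyone to prepare, phase 2 commits or rolls back all.
final class ToyCoordinator {
    static boolean run(List<ToyResource> resources) {
        for (ToyResource resource : resources) {
            if (!resource.prepare()) {
                for (ToyResource each : resources) {
                    each.rollback();
                }
                return false;
            }
        }
        for (ToyResource resource : resources) {
            resource.commit();
        }
        return true;
    }

    private ToyCoordinator() {}
}
```

If both resources can prepare, run(...) returns true and both end up COMMITTED, which is the all-or-nothing behavior we want from the two data sources.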
Then, on the application class, exclude the auto-configured data source configuration class:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class JtadbsourceApplication {
    public static void main(String[] args) {
        SpringApplication.run(JtadbsourceApplication.class, args);
    }
}
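At this point, the annotation-driven AOP integration of multiple data sources with JTA distributed transactions is complete!
Next comes testing: inserting data and rolling back a transaction on a single data source, then inserting with data source switching and rolling back across multiple data sources.
First create the entity class, User.java: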
import lombok.Data;
import lombok.ToString;
/**
* @Author : JCccc
* @CreateTime : 2019/10/22
* @Description :
**/
@Data
@ToString
public class User {
    private Integer id;
    private String username;
    private Integer age;
}
Then UserMapper.java:
import com.test.jtadbsource.pojo.User;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;
/**
* @Author : JCccc
* @CreateTime : 2019/12/10
* @Description :
**/
@Mapper
public interface UserMapper {
    int insert(User user);
}
Then create TestJtaService.java:
import com.test.jtadbsource.pojo.User;
/**
* @Author : JCccc
* @CreateTime : 2019/12/9
* @Description :
**/
public interface TestJtaService {
    void testInsertUser(User user);
    void testInsertUser2(User user);
}
Then TestJtaServiceImpl.java. Here the custom annotation created earlier marks which service methods use which data source:
import com.test.jtadbsource.dbAop.DataSource;
import com.test.jtadbsource.dbAop.DataSourceNames;
import com.test.jtadbsource.mapper.UserMapper;
import com.test.jtadbsource.pojo.User;
import com.test.jtadbsource.service.TestJtaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Author : JCccc
* @CreateTime : 2019/12/9
* @Description :
**/
@Service
public class TestJtaServiceImpl implements TestJtaService {
    @Autowired
    UserMapper userMapper;

    // No annotation: runs against the default data source (mydbone)
    public void testInsertUser(User user){
        int insertNum = userMapper.insert(user);
        System.out.println("Insert succeeded, rows: "+insertNum);
    }

    @DataSource(DataSourceNames.TWO)
    public void testInsertUser2(User user){
        int insertNum = userMapper.insert(user);
        System.out.println("Insert succeeded, rows: "+insertNum);
    }
}
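Finally, write an endpoint and first test the data source side, checking that operations against the different databases work:
Call the endpoint,
and the data is inserted as expected:
Next, test transaction rollback on a single data source directly.
Testing it this way, without a manual rollback, also works:
Call the endpoint; the transaction rolls back as expected:
Next, insert into both data sources at the same time:
Call the endpoint; the data is inserted as expected:
Then roll back the transaction across both data sources together:
Call the endpoint: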