聲明:以下關于“JTA規範事務模型”、“Spring JTA分布式事務的實作”等内容均來源于其他大佬的部落格内容,并已經表明出處。
1、JTA規範事務模型
Java Transaction API,通常稱為JTA,是用于管理 Java中的事務的API 。它允許我們以資源無關的方式啟動,送出和復原事務。
JTA為J2EE平台提供了分布式事務服務(distributed transaction)的能力。 某種程度上,可以認為JTA規範是XA規範的Java版,其把XA規範中規定的DTP模型互動接口抽象成Java接口中的方法,并規定每個方法要實作什麼樣的功能。
在DTP模型中,規定了模型的五個組成元素:應用程式(Application)、資料總管(Resource Manager)、事務管理器(Transaction Manager)、通信資料總管(Communication Resource Manager)、 通信協定(Communication Protocol)。
而在JTA規範中,模型中又多了一個元素Application Server,如下所示:
-
事務管理器(transaction manager)
處于圖中最為核心的位置,其他的事務參與者都是與事務管理器進行互動。事務了管理器提供事務聲明,事務資源管理,同步,事務上下文傳播等功能。JTA規範定義了事務管理器與其他事務參與者互動的接口,而JTS規範定義了事務管理器的實作要求,是以我們看到事務管理器底層是基于JTS的。
-
應用伺服器(application server):
顧名思義,是應用程式運作的容器。JTA規範規定,事務管理器的功能應該由application server提供,如上圖中的EJB Server。一些常見的其他web容器,如:jboss、weblogic、websphere等,都可以作為application server,這些web容器都實作了JTA規範。特别需要注意的是,并不是所有的web容器都實作了JTA規範,如tomcat并沒有實作JTA規範,是以并不能提供事務管理器的功能。
-
應用程式(application):
簡單來說,就是我們自己編寫的應用,部署到了實作了JTA規範的application server中,之後我們就可以我們JTA規範中定義的UserTransaction類來聲明一個分布式事務。通常情況下,application server為了簡化開發者的工作量,并不一定要求開發者使用UserTransaction來聲明一個事務,開發者可以在需要使用分布式事務的方法上添加一個注解,就像spring的聲明式事務一樣,來聲明一個分布式事務。
特别需要注意的是,JTA規範規定事務管理器的功能由application server提供。但是如果我們的應用不是一個web應用,而是一個本地應用,不需要被部署到application server中,無法使用application server提供的事務管理器功能。又或者我們使用的web容器并沒有事務管理器的功能,如tomcat。對于這些情況,我們可以直接使用一些第三方的事務管理器類庫,如JOTM和Atomikos。将事務管理器直接整合進應用中,不再依賴于application server。
-
資料總管(resource manager):
理論上任何可以存儲資料的軟體,都可以認為是資料總管RM。最典型的RM就是關系型資料庫了,如mysql,另外一種比較常見的資料總管是消息中間件,如ActiveMQ、RabbitMQ等, 這些都是真正的資料總管。
事實上,将資料總管(resource manager)稱為資源擴充卡(resource adapter)似乎更為合适。因為在java程式中,我們都是通過client來于RM進行互動的,例如:我們通過mysql-connector-java-x.x.x.jar驅動包,擷取Conn、執行sql,與mysql服務端進行通信;通過ActiveMQ、RabbitMQ等的用戶端,來發送消息等。
正常情況下,一個資料庫驅動供應商隻需要實作JDBC規範即可,一個消息中間件供應商隻需要實作JMS規範即可。 而引入了分布式事務的概念後,DB、MQ等在DTP模型中的作用都是RM,二者是等價的,需要由TM統一進行協調。
為此,JTA規範定義了一個XAResource接口,其定義RM必須要提供給TM調用的一些方法。之後,不管這個RM是DB,還是MQ,TM并不關心,因為其操作的是XAResource接口。而其他規範(如JDBC、JMS)的實作者,同時也對此接口進行實作。如MysqlXAConnection,就實作了XAResource接口。
-
通信資料總管(Communication Resource Manager):
這個是DTP模型中就已經存在的概念,對于需要跨應用的分布式事務,事務管理器彼此之間需要通信,這是就是通過CRM這個元件來完成的。JTA規範中,規定CRM需要實作JTS規範定義的接口。
下圖更加直覺的示範了JTA規範中各個模型元件之間是如何互動的:
上述内容主要來自《3.0 JTA規範》這一篇部落格中。
2、Spring JTA分布式事務的實作
根據用于管理事務的底層實作,Spring中的事務政策可以分為兩個主要部分:
- 單連接配接器政策(相當于本地事務管理器) - 底層技術使用單連接配接器。例如,JDBC使用連接配接級事務、Hibernate以及JDO使用會話級事務。可以應用使用AOP和攔截器的聲明式事務管理。
- 多連接配接器政策(相當于全局事務管理器) - 底層技術具有使用多個連接配接器的能力。當有這方面需求時,JTA是最好的選擇。此政策需要啟用JTA的資料源執行個體。JBossTS、Atomikos、Bitronix都是開源的JTA實作。
JTA的真正強大之處在于它能夠在單個事務中管理多個資源(如資料庫,消息服務)。
上面内容來自《Spring JTA分布式事務實作》。
3、Atomikos簡介
Atomikos是一個非常流行的開源事務管理器,并且可以嵌入到你的Spring Boot應用中。
在前面我們提到了application server,而Tomcat應用伺服器沒有實作JTA規範,是以當使用Tomcat作為應用伺服器的時候,需要使用第三方的事務管理器類來作為全局的事務管理器,而Atomikos架構就是這個作用,即将事務管理整合到應用中,而不依賴于application server。
關于Atomikos可以參考《atomikos JTA/XA全局事務》。
4、基于Atomikos實作分布式事務
我們這裡主要使用Spring Boot + Mybatis + MySql + Atomikos實作一個分布式事務的示例,具體實作過程如下:
4.1、pom.xml檔案
引入Atomikos依賴(mybatis、mysql依賴省略了),如下所示:
<!--atomikos 依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
<version>2.4.1</version>
</dependency>
4.2、application.properties檔案
application.properties檔案,定義資料源資訊,後續通過指定的配置類進行讀取配置參數。
server.port=8080
#配置first資料源
spring.datasource.first.username=root
spring.datasource.first.password=123456
spring.datasource.first.url=jdbc:mysql://192.168.1.8:3306/db_8?useSSL=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
#配置second資料源
spring.datasource.second.username=root
spring.datasource.second.password=123456
spring.datasource.second.url=jdbc:mysql://192.168.1.9:3306/db_9?useSSL=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
logging.level.com.qriver.distributed.transaction=debug
4.3、配置檔案讀取類FirstDbProperties、SecondDbProperties
這兩個配置類的參數一樣,隻是掃描配置檔案參數的字首不一樣,是以我們先定義一個基類DbProperties,具體如下:
public class DbProperties {
private String type;
private String driverClassName;
private String url;
private String username;
private String password;
//省略setter 和 getter
}
FirstDbProperties、SecondDbProperties兩個類定義如下:
//讀取first資料源的配置
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.first")
public class FirstDbProperties extends DbProperties{
}
//讀取second資料源的配置
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.second")
public class SecondDbProperties extends DbProperties{
}
4.4、配置XA資料源和mybatis的SqlSessionFactoryBean
每個資料源都需要配置,我們這裡分别配置first、second兩個資料源。這裡主要以first為例進行分析。
@Configuration
//basePackages 定義了掃描的包路徑。sqlSessionFactoryRef 主要引入SqlSessionFactoryBean 對象
@MapperScan(basePackages = "com.qriver.distributed.transaction.mapper.first", sqlSessionFactoryRef = "firstSqlSessionFactoryBean")
public class FirstDataSourceConfig {
/**
* 建立資料源
* @param firstDbProperties
* @return
*/
@Bean("firstDataSource")
public DataSource firstDataSource(FirstDbProperties firstDbProperties){
//建立Mysql的xa資料源
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setURL(firstDbProperties.getUrl());
xaDataSource.setUser(firstDbProperties.getUsername());
xaDataSource.setPassword(firstDbProperties.getPassword());
//建立atomikos資料源
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
return atomikosDataSourceBean;
}
@Bean("firstSqlSessionFactoryBean")
public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier("firstDataSource") DataSource dataSource) throws IOException {
//建立SqlSessionFactoryBean對象
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
//設定mapper映射檔案路徑
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
sqlSessionFactoryBean.setMapperLocations(resourceResolver.getResources("classpath:/mappings/first/*Mapper.xml"));
return sqlSessionFactoryBean;
}
}
4.5、全局事務管理器配置 JtaTransactionConfig
前面配置的多個資料源相當于XA模型中的RM,我們還需要配置一個全局事務管理器,即TM,進行協調處理全局事務,配置如下:
@Configuration
public class JtaTransactionConfig {
@Bean("jtaTransaction")
public JtaTransactionManager jtaTransactionManager(){
UserTransaction userTransaction = new UserTransactionImp();
UserTransactionManager userTransactionManager = new UserTransactionManager();
return new JtaTransactionManager(userTransaction,userTransactionManager);
}
}
上述UserTransaction 接口,是jta規範中提供的, UserTransactionImp實作類由Atomikos提供,UserTransactionManager 也是由Atomikos提供。
4.6、實作業務邏輯
前面已經完成了Atomikos的配置,後續我們按照前面的配置建立對應的資料庫操作邏輯即可。
業務代碼的編寫和普通的spring boot + mybatis的方式沒有差別,這裡隻需要注意以下兩點即可:
- mapper配置檔案的路徑,因為我們在配置資料源的時候,在代碼中設定了讀取映射檔案的位置,是以業務代碼中的檔案路徑應該相比對。
- 再一個就是注解@MapperScan中定義了基礎掃描路徑,是以Mapper接口包名也應該比對。
4.7、測試
完成了業務代碼的編寫,我們這裡編寫一個測試的Service類,在insertData()方法中,我們操作了兩個資料庫,注意,這裡需要添加@Transactional注解,引入使用的transactionManager 對象。
@Service
public class DemoTestService {
@Autowired
private FirstMapper firstMapper;
@Autowired
private SecondMapper secondMapper;
@Transactional(transactionManager = "jtaTransaction")
public void insertData(){
//插入first
FirstEntity firstEntity = new FirstEntity();
firstEntity.setId(1);
firstEntity.setName("test");
firstMapper.insert(firstEntity);
//插入second
SecondEntity secondEntity = new SecondEntity();
secondEntity.setId(1);
secondEntity.setName("test2");
secondMapper.insert(secondEntity);
}
}
然後,再編寫單元測試類,如下:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DistributedTransactionAtomikosApplication.class)
public class DemoTest {
@Autowired
private DemoTestService demoTestService;
@Test
public void insertTest(){
demoTestService.insertData();
}
}
我們可以通過執行單元測試方法進行測試,通過修改資料庫表的字段讓插入操作失敗,進而驗證Atomikos跨資料的分布式事務。這裡不再示範。