天天看點

Spring + Atomikos 分布式事務實作方式

             前段時間發現對分布式事務了解的不夠清晰,最近又重新看了一下分布式事務,簡單做個記錄,以後友善檢視

Java規範對分布式事務定義了标準的規範Java事務API和Java事務服務,分别是JTA和JTS

一個分布式事務必須包括一個事務管理器和多個資料總管,

資料總管是任意類型的持久化資料存儲,而事務管理器則是承擔着所有事務參與單元者的互相通訊的責任

JTA的規範制定了分布式事務的實作的整套流程架構,定義了各個接口且隻有接口,而實作分别交給事務管理器的實作方和資料總管的實作方

對于資料總管而言,主要包括資料庫連接配接,JMS等,還有很多了解的不清楚

對于事務管理器而言,從網上了解主要是應用伺服器,包括JBOSS,WEBLOGIC等應用伺服器,也就是說事務管理器的實作方是應用伺服器,用來管理事務的通訊和協調

對于大多數談的資料庫了解,事務管理器需要從資料庫獲得XAConnection , XAResource等對象,而這些對象是資料庫驅動程式需要提供的

是以如果要實作分布式事務還必須有支援分布式事務的資料庫伺服器以及資料庫驅動程式

對Mysql而言,在mysql5.0以上的版本已經支援了分布式事務,另外常用的mysql-connector-java-5.1.25-bin.jar也是支援分布式事務的

可以在jar包的com.mysql.jdbc.jdbc2.optional中找到XA對象的實作

上面介紹了事務管理器和資料總管的實作方式,在學習研究過程中發現對于事務管理器,特别強調了tomcat等伺服器是不支援的,這句話的意思應該是在tomcat容器内

并沒有分布式事務管理器的實作對象。而在JBOSS或者WEBLOGIC等商業伺服器應該内置了分布式事務管理器的實作對象,應用程式可以通過JNDI方式擷取UserTransaction

和TransactionManager等分布式事務環境中所需要用到的對象

Spring + Atomikos 分布式事務實作方式

事務管理器作為管理和協調分布式事務的關鍵進行中心非常重要,是以應用伺服器可以單獨隻用過事務管理器。

上圖具體文章連結為http://blog.csdn.net/xiaol_zhong/article/details/7983863

上面主要是一些基本的概念,在學習研究中總結出來的,可能不太全面,下面主要介紹一下在使用Spring使用分布式事務中的心得,這種做法也是将事務管理器嵌入應用中。

開始準備Spring的時候,網上介紹了Jotm以及Atomikos等工具,實際上這些工具都是取代應用伺服器對事務管理器的支援,負責實作事務管理器對象

Jotm需要使用特定資料庫連接配接池enhydra,而且網上說因為維護時間久遠,問題不少,是以直接使用Atomikos進行測試

在Maven上下載下傳了Atomikos的3.9.0版本的相關需要jar包。主要包括

atomikos-util-3.9.0.jar   transactions-3.9.0.jar  transactions-api-3.9.0.jar  transactions-jdbc-3.9.0.jar  transactions-jta-3.9.0.jar  jta-1.1.jar 

下面看一下主要的配置檔案的配置方式:

<?xml version="1.0" encoding="UTF-8"?>


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<bean id="dataSource0" class="com.mchange.v2.c3p0.ComboPooledDataSource">
<property name="driverClass" value="com.mysql.jdbc.Driver" />
<property name="jdbcUrl" value="jdbc:mysql://172.17.2.5:3003/jta" />
<property name="user" value="root" />
<property name="password" value="ems" />
<property name="autoCommitOnClose" value="true" />
</bean>
 
<bean id="dataSource1" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="ds1" />
<property name="xaDataSourceClassName"
value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="url">jdbc:mysql://172.17.2.5:3003/jta</prop>
<prop key="user">root</prop>
<prop key="password">ems</prop>
</props>
</property>
<property name="minPoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="borrowConnectionTimeout" value="30" />
<property name="testQuery" value="select 1" />
<property name="maintenanceInterval" value="60" />
</bean>


<bean id="dataSource2" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="ds2" />
<property name="xaDataSourceClassName"
value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="xaProperties">
<props>
<prop key="url">jdbc:mysql://172.17.2.5:3306/jta</prop>
<prop key="user">root</prop>
<prop key="password">ems</prop>
</props>
</property>
<property name="minPoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="borrowConnectionTimeout" value="30" />
<property name="testQuery" value="select 1" />
<property name="maintenanceInterval" value="60" />
</bean>

<bean id="dataSource3" class="com.mchange.v2.c3p0.ComboPooledDataSource">
<property name="driverClass" value="com.mysql.jdbc.Driver" />
<property name="jdbcUrl" value="jdbc:mysql://172.17.2.5:3306/jta" />
<property name="user" value="root" />
<property name="password" value="ems" />
<property name="autoCommitOnClose" value="true" />
</bean>


<!--SqlMap setup for MyBatis Database Layer -->
<bean id="sqlSessionFactoryForD1" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource1" />
<property name="mapperLocations" value="classpath:jtaatomikos/jta.xml" />
</bean>
<bean id="sqlSessionTemplateForD1" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg index="0" ref="sqlSessionFactoryForD1" />
</bean>


<bean id="sqlSessionFactoryForD2" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource2" />
<property name="mapperLocations" value="classpath:jtaatomikos/jta.xml" />
</bean>
<bean id="sqlSessionTemplateForD2" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg index="0" ref="sqlSessionFactoryForD2" />
</bean>


<!-- Config JTA UserTransactionManager Impl -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<property name="forceShutdown">
<value>true</value>
</property>
</bean>




<!-- Config JTA UserTransaction Impl -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<property name="transactionTimeout">
<value>300</value>
</property>
</bean>


<!-- Spring JtaTransactionManager Config -->
<bean id="springJTATransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="atomikosUserTransaction" />
</property>
</bean>

<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource1" />
</bean>


    <!-- Aop Config -->
<aop:config>
<aop:pointcut id="jtaServiceOperation"
expression="execution(* jtaatomikos.*Service.*(..))"></aop:pointcut>
<aop:advisor pointcut-ref="jtaServiceOperation"
advice-ref="txAdvice"></aop:advisor>
</aop:config>


    <!-- Transacation Advice Handle -->
<tx:advice id="txAdvice" transaction-manager="springJTATransactionManager">
<tx:attributes>
<tx:method name="update*" rollback-for="Exception" />
</tx:attributes>
</tx:advice>




<context:component-scan base-package="jtaatomikos"></context:component-scan>




</beans>
           

下面分别對每段配置進行部分記錄

分别定義了dataSource0 dataSource1 dataSource2 dataSource3 

其中0和3都是普通的C3P0資料庫連接配接池,而1和2是AtomikosDataSourceBean

定義0和3的目的是在後面的測試中測試如果不是AtomikosDataSourceBean的連接配接是不能加入到分布式事務中的

接着定義了兩個sqlSessionTemplate,分别對應3003和3306兩個資料庫。對應程式中的兩個Dao

下面是定義分布式事務最重要的兩個實作UserTransactionManager和UserTransactionImpl,均使用Atomikos中的實作

接着是Spring的AOP代理所使用的事務處理器springJTATransactionManager,這是Spring自帶的JTA實作類,但是Spring隻負責提供接口,真正内部實作分布式事務的上面定義

的兩個對象,是以需要将上面定義的兩個對象進行注入,是以Spring架構負責提供接口,Atomikos負責實作

另外再定義一個transactionManager是為了測試在傳統的Spring事務方式下,為什麼不能支援分布式事務

後面就是為了測試定義的AOP配置,不再多說

再來看看具體測試實作,隻貼出核心測試方法

存在Service如下

public interface D1Service {
    
    void updateAccount(Integer account);


}
           

實作如下:

public void updateAccount(Integer account) {
        int userAId = 1;
        int userBId = 2;
        int userA_Account = d1Dao.getAccount(userAId);
        int userB_Account = d2Dao.getAccount(userBId);
        d1Dao.saveAccount(userAId, userA_Account + account);
        d2Dao.saveAccount(userBId, userB_Account - account);
        if(userB_Account - account < 0){
            throw new AccountNotEnoughException();
        }
    }
           

分别在3003和3306資料庫上建立相同的資料庫表,簡單測試需要字段userId和account,初始化定義資料為1 10000 ; 2 10000;

代表使用者1和2分别賬戶有10000。

測試程式如下:

<pre name="code" class="java">public static void main(String[] args) {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("jtaatomikos/application-jta-atomikos.xml");
        D1Service service = (D1Service) appContext.getBean("d1Service");
        service.updateAccount(1000);
        service.updateAccount(9100);
    }
           

很明顯在執行轉賬1000的時候,是沒有問題的,在第二部執行轉賬9100的時候,由于userB_Account-account< 0 成立是以會有異常抛出

此時就是測試分布式事務的關鍵

假設以下幾種情況

在使用springJTATransactionManager的情況下

1  均使用正确的AtomikosDataSourceBean,此時兩個事務均能正确復原

2  如果分别使用AtomikosDataSourceBean和C3P0,則隻有前者對應資料庫會復原,而後者則不會復原,猜想這是因為 springJTATransactionManager在處理事務的時候, 内部的atomikosTransactionManager隻會将AtomokosDataSourceBean加入到分布式事務中,而不考慮其他連接配接方式

3  如果均使用C3P0,根據上面的解釋,很清楚的可以猜到兩個資料庫的資料均不會復原,測試結果也符合該預期

再來談談分布式事務為什麼需要使用springJTATransactionManager

Spring傳統的事務管理均使用

org.springframework.jdbc.datasource.DataSourceTransactionManager,那這種事務管理器為什麼不能支援分布式事務呢?

從配置中可以看出該對象需要注入dataSource屬性,注意隻能注入單一的dataSource,顯然這是不符合分布式事務的必須使用多個資料庫這一基礎的,是以在使用傳統的該事務管理器,隻能選擇一個資料連接配接進行事務管理,本身來說Spring的事務管理也是基于這點實作的,保證事務管理内的所有資料庫操作均使用同一個Connection,例如Connection的begin和commit以及rollback控制事務。

當使用org.springframework.jdbc.datasource.DataSourceTransactionManager 

測試結果如下:

第一個結論:無論使用DataSource0 - DataSource3 中的任何一個,均能正常復原,也就是說該事務管理器不依賴DataSource的具體實作,不論是Atomikos的實作或者是其他的資料庫連接配接實作,均能夠被傳統的事務管理器管理

第二個結論:因為該事務管理器隻能配置單一的DataSource,是以隻能保證配置的DataSource能被事務管理,其它的DataSource都不受事務控制,其原理也很顯而易見,因為傳統的事務管理器使用單一Connection進行事務管理,在分布式事務多個不同資料庫的Connection條件下,顯然這種實作方式不能成立。是以需要Atimikos提供實作了JTA規範标準的事務管理器

關于JTA的兩段送出方案,網上也很多教程,後面自己也會進行一步步實踐,後面會跟進進行記錄