天天看點

分布式事務解決方案——基于Atomikos的實作

聲明:以下關于“JTA規範事務模型”、“Spring JTA分布式事務的實作”等内容均來源于其他大佬的部落格内容,并已經表明出處。

1、JTA規範事務模型

  Java Transaction API,通常稱為JTA,是用于管理 Java中的事務的API 。它允許我們以資源無關的方式啟動,送出和復原事務。

  JTA為J2EE平台提供了分布式事務服務(distributed transaction)的能力。 某種程度上,可以認為JTA規範是XA規範的Java版,其把XA規範中規定的DTP模型互動接口抽象成Java接口中的方法,并規定每個方法要實作什麼樣的功能。

  在DTP模型中,規定了模型的五個組成元素:應用程式(Application)、資料總管(Resource Manager)、事務管理器(Transaction Manager)、通信資料總管(Communication Resource Manager)、 通信協定(Communication Protocol)。

   而在JTA規範中,模型中又多了一個元素Application Server,如下所示:

分布式事務解決方案——基于Atomikos的實作
  • 事務管理器(transaction manager)

    處于圖中最為核心的位置,其他的事務參與者都是與事務管理器進行互動。事務了管理器提供事務聲明,事務資源管理,同步,事務上下文傳播等功能。JTA規範定義了事務管理器與其他事務參與者互動的接口,而JTS規範定義了事務管理器的實作要求,是以我們看到事務管理器底層是基于JTS的。

  • 應用伺服器(application server):

    顧名思義,是應用程式運作的容器。JTA規範規定,事務管理器的功能應該由application server提供,如上圖中的EJB Server。一些常見的其他web容器,如:jboss、weblogic、websphere等,都可以作為application server,這些web容器都實作了JTA規範。特别需要注意的是,并不是所有的web容器都實作了JTA規範,如tomcat并沒有實作JTA規範,是以并不能提供事務管理器的功能。

  • 應用程式(application):

    簡單來說,就是我們自己編寫的應用,部署到了實作了JTA規範的application server中,之後我們就可以我們JTA規範中定義的UserTransaction類來聲明一個分布式事務。通常情況下,application server為了簡化開發者的工作量,并不一定要求開發者使用UserTransaction來聲明一個事務,開發者可以在需要使用分布式事務的方法上添加一個注解,就像spring的聲明式事務一樣,來聲明一個分布式事務。

    特别需要注意的是,JTA規範規定事務管理器的功能由application server提供。但是如果我們的應用不是一個web應用,而是一個本地應用,不需要被部署到application server中,無法使用application server提供的事務管理器功能。又或者我們使用的web容器并沒有事務管理器的功能,如tomcat。對于這些情況,我們可以直接使用一些第三方的事務管理器類庫,如JOTM和Atomikos。将事務管理器直接整合進應用中,不再依賴于application server。

  • 資料總管(resource manager):

    理論上任何可以存儲資料的軟體,都可以認為是資料總管RM。最典型的RM就是關系型資料庫了,如mysql,另外一種比較常見的資料總管是消息中間件,如ActiveMQ、RabbitMQ等, 這些都是真正的資料總管。

    事實上,将資料總管(resource manager)稱為資源擴充卡(resource adapter)似乎更為合适。因為在java程式中,我們都是通過client來于RM進行互動的,例如:我們通過mysql-connector-java-x.x.x.jar驅動包,擷取Conn、執行sql,與mysql服務端進行通信;通過ActiveMQ、RabbitMQ等的用戶端,來發送消息等。

    正常情況下,一個資料庫驅動供應商隻需要實作JDBC規範即可,一個消息中間件供應商隻需要實作JMS規範即可。 而引入了分布式事務的概念後,DB、MQ等在DTP模型中的作用都是RM,二者是等價的,需要由TM統一進行協調。

    為此,JTA規範定義了一個XAResource接口,其定義RM必須要提供給TM調用的一些方法。之後,不管這個RM是DB,還是MQ,TM并不關心,因為其操作的是XAResource接口。而其他規範(如JDBC、JMS)的實作者,同時也對此接口進行實作。如MysqlXAConnection,就實作了XAResource接口。

  • 通信資料總管(Communication Resource Manager):

    這個是DTP模型中就已經存在的概念,對于需要跨應用的分布式事務,事務管理器彼此之間需要通信,這是就是通過CRM這個元件來完成的。JTA規範中,規定CRM需要實作JTS規範定義的接口。

  下圖更加直覺的示範了JTA規範中各個模型元件之間是如何互動的:

分布式事務解決方案——基于Atomikos的實作
上述内容主要來自《3.0 JTA規範》這一篇部落格中。

2、Spring JTA分布式事務的實作

  根據用于管理事務的底層實作,Spring中的事務政策可以分為兩個主要部分:

  • 單連接配接器政策(相當于本地事務管理器) - 底層技術使用單連接配接器。例如,JDBC使用連接配接級事務、Hibernate以及JDO使用會話級事務。可以應用使用AOP和攔截器的聲明式事務管理。
  • 多連接配接器政策(相當于全局事務管理器) - 底層技術具有使用多個連接配接器的能力。當有這方面需求時,JTA是最好的選擇。此政策需要啟用JTA的資料源執行個體。JBossTS、Atomikos、Bitronix都是開源的JTA實作。

  JTA的真正強大之處在于它能夠在單個事務中管理多個資源(如資料庫,消息服務)。

上面内容來自《Spring JTA分布式事務實作》。

3、Atomikos簡介

  Atomikos是一個非常流行的開源事務管理器,并且可以嵌入到你的Spring Boot應用中。

  在前面我們提到了application server,而Tomcat應用伺服器沒有實作JTA規範,是以當使用Tomcat作為應用伺服器的時候,需要使用第三方的事務管理器類來作為全局的事務管理器,而Atomikos架構就是這個作用,即将事務管理整合到應用中,而不依賴于application server。

關于Atomikos可以參考《atomikos JTA/XA全局事務》。

4、基于Atomikos實作分布式事務

  我們這裡主要使用Spring Boot + Mybatis + MySql + Atomikos實作一個分布式事務的示例,具體實作過程如下:

4.1、pom.xml檔案

  引入Atomikos依賴(mybatis、mysql依賴省略了),如下所示:

<!--atomikos 依賴-->
<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.4.1</version>
</dependency>
           
4.2、application.properties檔案

  application.properties檔案,定義資料源資訊,後續通過指定的配置類進行讀取配置參數。

server.port=8080

#配置first資料源
spring.datasource.first.username=root
spring.datasource.first.password=123456
spring.datasource.first.url=jdbc:mysql://192.168.1.8:3306/db_8?useSSL=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai

#配置second資料源
spring.datasource.second.username=root
spring.datasource.second.password=123456
spring.datasource.second.url=jdbc:mysql://192.168.1.9:3306/db_9?useSSL=true&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai


logging.level.com.qriver.distributed.transaction=debug
           
4.3、配置檔案讀取類FirstDbProperties、SecondDbProperties

  這兩個配置類的參數一樣,隻是掃描配置檔案參數的字首不一樣,是以我們先定義一個基類DbProperties,具體如下:

public class DbProperties {

    private String type;
    private String driverClassName;
    private String url;
    private String username;
    private String password;
//省略setter 和 getter
}
           

  FirstDbProperties、SecondDbProperties兩個類定義如下:

//讀取first資料源的配置
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.first")
public class FirstDbProperties extends DbProperties{

}
//讀取second資料源的配置
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.second")
public class SecondDbProperties extends DbProperties{

}
           
4.4、配置XA資料源和mybatis的SqlSessionFactoryBean

  每個資料源都需要配置,我們這裡分别配置first、second兩個資料源。這裡主要以first為例進行分析。

@Configuration
//basePackages 定義了掃描的包路徑。sqlSessionFactoryRef 主要引入SqlSessionFactoryBean 對象
@MapperScan(basePackages = "com.qriver.distributed.transaction.mapper.first", sqlSessionFactoryRef = "firstSqlSessionFactoryBean")
public class FirstDataSourceConfig {

    /**
     * 建立資料源
     * @param firstDbProperties
     * @return
     */
    @Bean("firstDataSource")
    public DataSource firstDataSource(FirstDbProperties firstDbProperties){
        //建立Mysql的xa資料源
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setURL(firstDbProperties.getUrl());
        xaDataSource.setUser(firstDbProperties.getUsername());
        xaDataSource.setPassword(firstDbProperties.getPassword());
        //建立atomikos資料源
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }

    @Bean("firstSqlSessionFactoryBean")
    public SqlSessionFactoryBean sqlSessionFactoryBean(@Qualifier("firstDataSource") DataSource dataSource) throws IOException {
        //建立SqlSessionFactoryBean對象
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        //設定mapper映射檔案路徑
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        sqlSessionFactoryBean.setMapperLocations(resourceResolver.getResources("classpath:/mappings/first/*Mapper.xml"));
        return sqlSessionFactoryBean;
    }

}
           
4.5、全局事務管理器配置 JtaTransactionConfig

  前面配置的多個資料源相當于XA模型中的RM,我們還需要配置一個全局事務管理器,即TM,進行協調處理全局事務,配置如下:

@Configuration
public class JtaTransactionConfig {

    @Bean("jtaTransaction")
    public JtaTransactionManager jtaTransactionManager(){
		
        UserTransaction userTransaction = new UserTransactionImp();
        UserTransactionManager userTransactionManager = new UserTransactionManager();

        return new JtaTransactionManager(userTransaction,userTransactionManager);

    }

}
           

  上述UserTransaction 接口,是jta規範中提供的, UserTransactionImp實作類由Atomikos提供,UserTransactionManager 也是由Atomikos提供。

4.6、實作業務邏輯

  前面已經完成了Atomikos的配置,後續我們按照前面的配置建立對應的資料庫操作邏輯即可。

  業務代碼的編寫和普通的spring boot + mybatis的方式沒有差別,這裡隻需要注意以下兩點即可:

  1. mapper配置檔案的路徑,因為我們在配置資料源的時候,在代碼中設定了讀取映射檔案的位置,是以業務代碼中的檔案路徑應該相比對。
  2. 再一個就是注解@MapperScan中定義了基礎掃描路徑,是以Mapper接口包名也應該比對。
4.7、測試

  完成了業務代碼的編寫,我們這裡編寫一個測試的Service類,在insertData()方法中,我們操作了兩個資料庫,注意,這裡需要添加@Transactional注解,引入使用的transactionManager 對象。

@Service
public class DemoTestService {

    @Autowired
    private FirstMapper firstMapper;

    @Autowired
    private SecondMapper secondMapper;

    @Transactional(transactionManager = "jtaTransaction")
    public void insertData(){
        //插入first
        FirstEntity firstEntity = new FirstEntity();
        firstEntity.setId(1);
        firstEntity.setName("test");
        firstMapper.insert(firstEntity);
        //插入second
        SecondEntity secondEntity = new SecondEntity();
        secondEntity.setId(1);
        secondEntity.setName("test2");
        secondMapper.insert(secondEntity);
    }

}
           

  然後,再編寫單元測試類,如下:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = DistributedTransactionAtomikosApplication.class)
public class DemoTest {

    @Autowired
    private DemoTestService demoTestService;

    @Test
    public void insertTest(){
        demoTestService.insertData();
    }

}
           

  我們可以通過執行單元測試方法進行測試,通過修改資料庫表的字段讓插入操作失敗,進而驗證Atomikos跨資料的分布式事務。這裡不再示範。

繼續閱讀