天天看點

Spring JTA分布式事務實作

1.概述

Java Transaction API,通常稱為JTA,是用于管理 Java中的事務的API 。它允許我們以資源無關的方式啟動,送出和復原事務。

根據用于管理事務的底層實作,Spring中的事務政策可以分為兩個主要部分:

  • 單連接配接器政策(相當于本地事務管理器) - 底層技術使用單連接配接器。例如,JDBC使用連接配接級事務、Hibernate以及JDO使用會話級事務。可以應用使用AOP和攔截器的聲明式事務管理。
  • 多連接配接器政策(相當于全局事務管理器) - 底層技術具有使用多個連接配接器的能力。當有這方面需求時,JTA是最好的選擇。此政策需要啟用JTA的資料源執行個體。JBossTS、Atomikos、Bitronix都是開源的JTA實作。

JTA的真正強大之處在于它能夠在單個事務中管理多個資源(如資料庫,消息服務)。

在本文中,我們将從概念層面了解JTA,并了解業務代碼通常是如何與JTA互動。

2.通用接口和分布式事務

JTA提供了對業務代碼的事務控制(開始,送出和復原)的抽象。

如果沒有這種抽象,我們必須處理每種資源類型的各個API。

例如,我們需要處理JDBC資源。同樣,JMS資源可能具有類似但不相容的模型。

通過JTA,我們可以以一緻和協調的方式管理不同類型的多種資源。

作為API,JTA定義了由事務管理器實作的接口和語義 。實作由Atomikos和Bitronix等庫提供。

3.示例項目

本例子模拟了銀行應用的一個非常簡單的轉賬業務。我們有兩個服務:銀行賬戶服務BankAccountService 和操作行為審計服務AuditService,它們使用了兩個不同的資料庫。

資料庫采用的是JAVA内置資料庫HSQLDB,這些獨立的資料庫需要在事務開始,送出或復原時進行協調。

JTA事務管理器采用Bitronix。

我們的示例項目使用Spring Boot來簡化配置:

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>2.1.4.RELEASE</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>
<dependencies>
	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-bitronix</artifactId>
    </dependency>
    <dependency>
        <groupId>org.hsqldb</groupId>
        <artifactId>hsqldb</artifactId>
    </dependency>
</dependencies>
           

在服務啟動時,建立2個資料源:accountDb,auditDb:

@Bean("dataSourceAccount")
public DataSource dataSource() throws Exception {
	return createHsqlXADatasource("jdbc:hsqldb:mem:accountDb");
}

@Bean("dataSourceAudit")
public DataSource dataSourceAudit() throws Exception {
	return createHsqlXADatasource("jdbc:hsqldb:mem:auditDb");
}
           

在每個測試方法之前,我們使用腳本分别在每個庫下建立表:ACCOUNT和AUDIT_LOG,并初始化一些資料:

ID BALANCE
a0000001 1000
a0000002 2000

4.聲明性事務界定

在JTA中處理事務的第一種方法是使用@Transactional注解。

讓我們用@Transactional注解服務方法 executeTranser()。 這表明事務管理器開始事務:

@Transactional
public void executeTransfer(String fromAccontId, String toAccountId,
		BigDecimal amount) {
	bankAccountService.transfer(fromAccontId, toAccountId, amount);
	auditService.log(fromAccontId, toAccountId, amount);
	BigDecimal balance = bankAccountService.balanceOf(fromAccontId);
	if (balance.compareTo(BigDecimal.ZERO) < 0) {
		throw new RuntimeException("餘額不足!");
	}
}
           

這裡 executeTranser()方法調用了2個不同的服務:AccountService和AuditService。這2個服務使用2個不同的資料庫。

當 executeTransfer()傳回時,該事務管理器識别出它是事務的結束,将作用于兩個資料庫。

在方法結束時,如果轉賬人的資金不足, executeTransfer()會檢查帳戶餘額并抛出 RuntimeException異常。

4.1沒有異常全部送出

場景1:當從a0000001賬戶給a0000002賬戶轉賬500元時,由于執行金額小于餘額,一切正常。行為審計表裡增加一條記錄。

@Test
public void givenAnnotationTx_whenNoException_thenAllCommitted()
		throws Exception {
	tellerService.executeTransfer("a0000001", "a0000002",
			BigDecimal.valueOf(500));

	assertThat(accountService.balanceOf("a0000001"))
			.isEqualByComparingTo(BigDecimal.valueOf(500));
	assertThat(accountService.balanceOf("a0000002"))
			.isEqualByComparingTo(BigDecimal.valueOf(2500));

	TransferLog lastTransferLog = auditService.lastTransferLog();
	assertThat(lastTransferLog).isNotNull();
	assertThat(lastTransferLog.getFromAccountId()).isEqualTo("a0000001");
	assertThat(lastTransferLog.getToAccountId()).isEqualTo("a0000002");
	assertThat(lastTransferLog.getAmount())
			.isEqualByComparingTo(BigDecimal.valueOf(500));
}
           

4.2出現異常進行復原

場景2:當從a0000002賬戶給a0000001賬戶轉賬10000元時,由于執行金額大于餘額,抛出異常。兩個資料庫進行復原,賬戶餘額不變,行為審計表沒有資料。

@Test
public void givenAnnotationTx_whenException_thenAllRolledBack()
		throws Exception {
	assertThatThrownBy(() -> {
		tellerService.executeTransfer("a0000002", "a0000001",
				BigDecimal.valueOf(100000));
	}).hasMessage("餘額不足!");

	assertThat(accountService.balanceOf("a0000001"))
			.isEqualByComparingTo(BigDecimal.valueOf(1000));
	assertThat(accountService.balanceOf("a0000002"))
			.isEqualByComparingTo(BigDecimal.valueOf(2000));
	assertThat(auditService.lastTransferLog()).isNull();
}
           

5.程式設計性事務界定

另一種控制JTA事務的方法是通過調用 javax.transaction.UserTransaction以程式設計方式實作。

public void executeTransferProgrammaticTx(String fromAccontId,
			String toAccountId, BigDecimal amount) throws Exception {
	userTransaction.begin();
	bankAccountService.transfer(fromAccontId, toAccountId, amount);
	auditService.log(fromAccontId, toAccountId, amount);
	BigDecimal balance = bankAccountService.balanceOf(fromAccontId);
	if (balance.compareTo(BigDecimal.ZERO) < 0) {
		userTransaction.rollback();
		throw new RuntimeException("餘額不足!");
	} else {
		userTransaction.commit();
	}
}
           

在我們的示例中,begin()方法啟動了一個新事務。如果餘額驗證失敗,調用rollback(),它将復原兩個資料庫。否則,調用commit() 會将更改送出給兩個資料庫。

需要注意的是 commit()和 rollback()都 結束目前事務。

6.總結

在本文中,我們讨論了JTA試圖解決的問題。通過示例工程說明了使用注釋和程式設計方式來控制事務,涉及需要在單個事務中協調2個事務資源。

示例完整代碼可以在GitHub上找到。