天天看點

分布式事務解決方案:XA規範

分布式事務解決方案:XA規範

XA規範

二階段送出協定是一個協定,而XA規範是X/Open 組織針對二階段送出協定的實作做的規範。目前幾乎所有的主流資料庫都對XA規範提供了支援。

這樣做的好處是友善多個資源(如資料庫,應用伺服器,消息隊列等)在同一個事務中通路。你可以類比JDBC

我們這篇文章就以MySQL XA為例示範一下XA怎麼玩?

MySQL XA常用的指令如下

指令 解釋
XA START xid 開啟一個事務,并将事務置于ACTIVE狀态,此後執行的SQL語句都将置于該事務中
XA END xid 将事務置于IDLE狀态,表示事務内的SQL操作完成
XA PREPARE xid 實作事務送出的準備工作,事務狀态置于PREPARED狀态。事務如果無法完成送出前的準備操作,該語句會執行失敗
XA COMMIT xid 事務最終送出,完成持久化
XA ROLLBACK xid 事務復原終止
XA RECOVER 檢視MySQL中存在的PREPARED狀态的xa事務

我們在db_account_1和db_account_2都建一個account_info表并初始化2條記錄

CREATE TABLE `account_info`
(
    `id`      INT(11)      NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
    `user_id` VARCHAR(255) NOT NULL COMMENT '使用者id',
    `balance` INT(11)      NOT NULL DEFAULT 0 COMMENT '使用者餘額',
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;

INSERT INTO account_info (id, user_id, balance)
VALUES (1, '1001', 10000);
INSERT INTO account_info (id, user_id, balance)
VALUES (2, '1002', 10000);
           

我們以使用者1001向1002轉賬200元為例

mysql> XA START "transfer_money";
Query OK, 0 rows affected (0.01 sec)
 
mysql> update account_info set balance = balance - 200 where user_id = '1001';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0
mysql> XA END "transfer_money";
Query OK, 0 rows affected (0.01 sec)
 
mysql> XA PREPARE "transfer_money";
Query OK, 0 rows affected (0.01 sec)
 
mysql> XA COMMIT "transfer_money";
Query OK, 0 rows affected (0.01 sec)
           

在XA START執行後所有資源将會被鎖定,直到執行了XA PREPARE或者XA COMMIT才會釋放。

如果在這個時間段内另外一個事務執行如下語句則會一直被阻塞

這就是XA規範這種解決方案很少被使用的原因,因為中間過程會鎖定資源,很難支援高并發

我們也可以将一個 IDLE 狀态的 XA 事務可以直接送出或者復原

mysql> XA COMMIT "transfer_money";
1399 - XAER_RMFAIL: The command cannot be executed when global transaction is in the  IDLE state
mysql> XA COMMIT "transfer_money" ONE PHASE;
Query OK, 0 rows affected (0.01 sec)
 
mysql> XA START "transfer_money";
Query OK, 0 rows affected (0.01 sec)
 
mysql> update account_info set balance = balance - 200 where user_id = '1001';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0
mysql> XA END "transfer_money";
Query OK, 0 rows affected (0.01 sec)
 
mysql> XA COMMIT "transfer_money" ONE PHASE;
Query OK, 0 rows affected (0.01 sec)
           

XA事務變化圖

分布式事務解決方案:XA規範

JTA

JTA(Java Transaction API),是J2EE的程式設計接口規範,它是XA規範的Java實作相關的接口有如下2個

javax.transaction.TransactionManager(事務管理器的接口):定義了有關事務的開始、送出、撤回等操作。

javax.transaction.xa.XAResource(滿足XA規範的資源定義接口):一種資源如果要支援JTA事務,就需要讓它的資源實作該XAResource接口,并實作該接口定義的兩階段送出相關的接口

在Java中有很多架構都對XA規範進行了實作,我就示範一下最常用的實作atomikos和seata

atomikos隻能用在單個應用對多個庫進行操作的場景。而seata所有的分布式事務場景都能用

是什麼造成這種差異呢?看Demo

atomikos

先加依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.1.14.RELEASE</version>
</dependency>
           

配置2個資料源

spring:
  jta:
    atomikos:
      datasource:
        primary:
          borrow-connection-timeout: 10000.0
          max-lifetime: 20000.0
          max-pool-size: 25.0
          min-pool-size: 3.0
          unique-resource-name: test1
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          xa-properties:
            password: test
            url: jdbc:mysql://myhost:3306/db_account_1
            user: test
        secondary:
          borrow-connection-timeout: 10000.0
          max-lifetime: 20000.0
          max-pool-size: 25.0
          min-pool-size: 3.0
          unique-resource-name: test2
          xa-data-source-class-name: com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
          xa-properties:
            password: test
            url: jdbc:mysql://myhost:3306/db_account_2
            user: test
    enabled: true
           
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.primary")
    public DataSource primaryDataSource() {
        return new AtomikosDataSourceBean();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.secondary")
    public DataSource secondaryDataSource() {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        return ds;
    }

    @Bean
    public JdbcTemplate primaryJdbcTemplate(
            @Qualifier("primaryDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public JdbcTemplate secondaryJdbcTemplate(
            @Qualifier("secondaryDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}
           
@Service
public class AccountService {

    @Resource
    @Qualifier("primaryJdbcTemplate")
    private JdbcTemplate primaryJdbcTemplate;

    @Resource
    @Qualifier("secondaryJdbcTemplate")
    private JdbcTemplate secondaryJdbcTemplate;

    @Transactional(rollbackFor = Exception.class)
    public void tx1() {
        Integer money = 100;
        String sql = "update account_info set balance = balance + ? where user_id = ?";
        primaryJdbcTemplate.update(sql, new Object[]{-money, 1001});
        secondaryJdbcTemplate.update(sql, new Object[]{money, 1002});
    }

    @Transactional(rollbackFor = Exception.class)
    public void tx2() {
        Integer money = 100;
        String sql = "update account_info set balance = balance + ? where user_id = ?";
        primaryJdbcTemplate.update(sql, new Object[]{-money, 1001});
        secondaryJdbcTemplate.update(sql, new Object[]{money, 1002});
        throw new RuntimeException();
    }
}
           
@RunWith(SpringRunner.class)
@SpringBootTest
public class AtomikosAtApplicationTests {

    @Resource
    private AccountService accountService;

	// 正常執行
    @Test
    public void test1() {
        accountService.tx1();
    }
    
    // 異常復原
    @Test
    public void test2() {
        accountService.tx2();
    }
}
           

seata

我們需要開發2個應用,就不貼pom依賴了,從github看依賴吧

https://github.com/erlieStar/spring-cloud-distributed-transaction

seata-xa-tm

我們隻需要配置一下application.yaml即可。你可能看到很多文章還需要配置file.conf和registry.conf,用了spring starter後直接在application.yaml配置即可

application.yaml

server:
  port: 30002

spring:
  application:
    name: seata-xa-tm
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url : jdbc:mysql://myhost:3306/db_account_1?useUnicode=true&characterEncoding=utf8
    username: test
    password: test

seata:
  data-source-proxy-mode: XA
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
    grouplist:
      default: myhost:18091
    disable-global-transaction: false
  config:
    type: file
    file:
      name: file.conf
  registry:
    type: file
    file:
      name: file.conf
           

用@EnableAutoDataSourceProxy注解開啟資料源代理,隻需指定為XA模式,因為預設是AT模式

@EnableFeignClients
@SpringBootApplication
@EnableAutoDataSourceProxy(dataSourceProxyMode = "XA")
public class SeataXATm {

    public static void main(String[] args) {
        SpringApplication.run(SeataXATm.class, args);
    }

}
           

開發轉賬接口

@RestController
@RequestMapping("account")
public class AccountController {

    @Resource
    private JdbcTemplate jdbcTemplate;
    @Resource
    private RmAccountClient rmAccountClient;

    @GlobalTransactional
    @RequestMapping("transfer")
    public String transfer(@RequestParam("fromUserId") String fromUserId,
                           @RequestParam("toUserId") String toUserId,
                           @RequestParam("money") Integer money,
                           @RequestParam(value = "flag", required = false) Boolean flag) {
        String sql = "update account_info set balance = balance + ? where user_id = ?";
        jdbcTemplate.update(sql, new Object[]{-money, fromUserId});
        String result = rmAccountClient.transfer(fromUserId, toUserId, money);
        if ("fail".equals(result)) {
            throw new RuntimeException("轉賬失敗");
        }
        if (flag != null && flag) {
            throw new RuntimeException("測試同時復原");
        }
        return "success";
    }
}
           

調用另外一個賬戶服務,為了友善我就不用注冊中心了,直接指定了服務的位址

@FeignClient(value = "seata-xa-rm", url = "http://127.0.0.1:30001")
public interface RmAccountClient {

    @RequestMapping("account/transfer")
    String transfer(@RequestParam("fromUserId") String fromUserId,
                    @RequestParam("toUserId") String toUserId,
                    @RequestParam("money") Integer money);

}
           

seata-xa-rm

這是我們開發的另一個賬戶服務

application.yaml

server:
  port: 30001

spring:
  application:
    name: seata-xa-rm
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url : jdbc:mysql://myhost:3306/db_account_2?useUnicode=true&characterEncoding=utf8
    username: test
    password: test

seata:
  data-source-proxy-mode: XA
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
    grouplist:
      default: myhost:18091
    disable-global-transaction: false
  config:
    type: file
    file:
      name: file.conf
  registry:
    type: file
    file:
      name: file.conf
           

啟動類

@SpringBootApplication
@EnableAutoDataSourceProxy(dataSourceProxyMode = "XA")
public class SeataXARm {

    public static void main(String[] args) {
        SpringApplication.run(SeataXARm.class, args);
    }
}
           

轉賬接口

@RestController
@RequestMapping("account")
public class AccountController {

    @Resource
    private JdbcTemplate jdbcTemplate;

    @RequestMapping("transfer")
    public String transfer(@RequestParam("fromUserId") String fromUserId,
                           @RequestParam("toUserId") String toUserId,
                           @RequestParam("money") Integer money) {
        String sql = "update account_info set balance = balance + ? where user_id = ?";
        int result = jdbcTemplate.update(sql, new Object[]{money, toUserId});
        if (result == 0) {
            return "fail";
        }
        return "success";
    }
}
           

測試

啟動這2個服務

測試正常轉賬

測試seata-xa-tm項目失敗復原

curl http:127.0.0.1:30002/account?fromUserId=1001&toUserId=1002&money=100&flag=truee
           

用flag=true來讓seata-xa-tm項目失敗復原

測試seata-xa-rm項目失敗復原

curl http:127.0.0.1:30002/account?fromUserId=1001&toUserId=1003&money=100&flag=truee
           

toUserId=1003,使用者不存在,seata-xa-rm傳回fail復原

參考部落格

彙總

[1]https://segmentfault.com/a/1190000040321750

[2]https://zhuanlan.zhihu.com/p/183753774

xa事務

[3]https://www.jianshu.com/p/a59c79186b6d

[4]https://www.jianshu.com/p/7003d58ea182

jta

[5]https://www.jianshu.com/p/86b4ab4f2d18