天天看點

Spring Cloud Alibaba之seata 分布式事務

這篇部落格涉及到nacos和seata-server的搭建和基本使用,可以看如下兩篇

​​Docker 部署 Seata Server(使用nacos 做為注冊中心和配置中心)​​Nacos的簡單介紹以及服務注冊與發現功能的基本使用

Seata簡單介紹

Seata 是一款開源的分布式事務解決方案,緻力于在微服務架構下提供高性能和簡單易用的分布式事務服務。

Spring Cloud Alibaba之seata 分布式事務

Seata術語

  • TC (Transaction Coordinator) - 事務協調者:維護全局和分支事務的狀态,驅動全局事務送出或復原。
  • TM (Transaction Manager) - 事務管理器: 定義全局事務的範圍:開始全局事務、送出或復原全局事務。
  • RM (Resource Manager) - 資料總管: 管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀态,并驅動分支事務送出或復原。

TC 為單獨部署的server服務端,TM和RM 是嵌入到應用中的client用戶端

處理流程

  1. TM請求TC開啟一個全局事務。TC會生成一個XID作為該全局事務的編号。
  2. XID,會在微服務的調用鍊路中傳播,保證将多個微服務的子事務關聯在一起。
  3. RM請求TC将本地事務注冊為全局事務的分支事務,通過全局事務的XID進行關聯。
  4. TM請求TC告訴XID對應的全局事務是進行送出還是復原。
  5. TC驅動RM們将XID對應的自己的本地事務進行送出還是復原。
Spring Cloud Alibaba之seata 分布式事務

開啟全局事務

以@GlobalTransactional為入口,GlobalTransactionalInterceptor為切入點,

TM會向TC發起一個請求(服務端使用的netty)開啟一個全局事務,生成全局事務的XID,通過服務調用鍊路傳播

開啟分支事務

執行業務代碼,準備開啟分支事務。

分支事務開啟的原理:

1.由于seata對底層的DataSource,Connection等使用DataSourceProxy,ConnectionProxy代理

2.當進行資料庫操作的時候,ConnectionProxy會判斷是否包含全局事務

2.1 包含全局食物

2.1.1 RM向TC發起請求注冊分支事務

2.1.2 插入復原日志(undo_log表,業務庫必須建立這個表)

2.1.3 事務送出

2.1.4 向TC上報事務狀态

2.2 不包含全局事務

2.2.1 事務送出

全局事務送出

當業務邏輯執行沒問題的話,就需要執行全局事務的送出。

1.TM向TC發起全局事務送出請求

2.TC收到之後,會向各個分支事務發起事務送出請求

3.分支事務接收到請求,隻需要删除全局事務的undo_log記錄就可以了

全局事務復原

當業務邏輯執行發生異常,就需要執行全局事務的復原。

1.TM向TC發起全局事務復原請求

2.TC收到之後,會向各個分支事務發起事務復原請求

3.分支事務接收到請求,隻需要根據XID對應的undo_log表記錄進行復原即可(記錄執行前後的記錄)

重要的類

  • TC : DefaultCoordinator
  • TM : GlobalTransaction、GlobalTransactionalInterceptor、TransactionalTemplate
  • RM : DataSourceProxy、ConnectProxy

本地事務和分布式事務

當隻有一個應用程序的時候,修改一個資料庫,就不會造成分布式事務問題。

本地事務使用Spring 事務 – @Transactional就可以解決本地的事務問題

當分布式微服務興起,一個服務會引起其他多個微服務的耦合操作。其他微服務會去不同的資料庫做CRUD,這樣跨程序和跨資料庫所導緻的事務問題就是分布式事務,如果處理的不好,就會影響全局的資料一緻性問題。

比如:有三個服務A,B,C。我們調用A服務的時候,A服務會調用B服務和C服務,而這三個服務又有各自的資料庫,A,B,C三個服務會分别對自己的資料庫做CRUD操作。要是三個都執行成功,就沒什麼問題,但是B服務可能因為當機或者其他異常而不能執行,但是這個時候A,C服務都正常執行了,這樣全局的資料就不一緻了。

總結來說就是一次業務操作需要垮多個資料源或需要垮多個系統進行遠端調用,就會産生分布式事務問題

​​版本說明​​

元件版本關系

每個 Spring Cloud Alibaba 版本及其自身所适配的各元件對應版本(經過驗證,自行搭配各元件版本不保證可用)如下表所示(最新版本用*标記):

Spring Cloud Alibaba之seata 分布式事務

畢業版本依賴關系(推薦使用)

下表為按時間順序釋出的 Spring Cloud Alibaba 以及對應的适配 Spring Cloud 和 Spring Boot 版本關系(由于 Spring Cloud 版本命名有調整,是以對應的 Spring Cloud Alibaba 版本号也做了對應變化)

Spring Cloud Alibaba之seata 分布式事務

示例

大緻說明

總共有四個微服務子產品,将四個微服務子產品都注冊到nacos server 上面

seata-buy : 使用者購買商品,是以要和seata-order,seata-stock和seata-user三個微服務互動。

seata-order: 使用者購買商品的時候,将訂單儲存到orderform資料庫

seata-stock:使用者購買商品的時候,stock資料庫裡指定ID的商品數量減去使用者購買的數量

seata-user: 使用者購買商品的時候,user資料庫裡指定使用者ID的使用者減去購買所花的錢财

這樣就是一個分布式服務的場景,不過我的資料表都是儲存在一個資料庫裡面的,資料庫沒有做分布式

建表

# 建立seata 資料庫
CREATE DATABASE seata;
# 進入使用資料庫 seata
USE seata;

# 建立商品stock表
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `number` int(11) DEFAULT 0,
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 建立 訂單orderform表
DROP TABLE IF EXISTS `orderform`;
CREATE TABLE `orderform` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `user_id` int(11) DEFAULT NULL,
     `product_id` int(11) DEFAULT NULL,
     `number` int(11) DEFAULT 0,
     `money` int(11) DEFAULT 0,
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 建立使用者user表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `money` int(11) DEFAULT 0,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 手動插入資料
INSERT INTO user (id, money) VALUES (1,3);

# 手動插入資料
insert into stock (id,number) values (1,3);      

父項目pom.xml檔案規定依賴版本

我使用的資料庫是mysql 8,其他元件版本下面都有,下面是一個父module,用于管理各元件版本,便于管理下面的子module(各微服務子產品的元件版本統一比較好)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xt</groupId>
    <artifactId>springcloud</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>seata-buy</module>
        <module>seata-order</module>
        <module>seata-stock</module>
        <module>seata-user</module>
    </modules>


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
    </parent>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
        <spring-cloud-version>Hoxton.SR12</spring-cloud-version>
        <mysql-connector-java-version>8.0.26</mysql-connector-java-version>
        <lombok-version>1.18.22</lombok-version>
        <mybatis-plus-boot-starter-version>3.4.2</mybatis-plus-boot-starter-version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>${mybatis-plus-boot-starter-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql-connector-java-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
        
    </dependencyManagement>
</project>      

seata-buy子項目

交易請求的接口,與下面三個微服務項目互動

結構圖

Spring Cloud Alibaba之seata 分布式事務

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud</artifactId>
        <groupId>com.xt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>seata-buy</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
    </build>

</project>      

application.yml

server:
  port: 9000

spring:
  application:
    name: seata-buy
  cloud:
    nacos:
      # 将這個服務注冊到nacos 上面去
      discovery:
        server-addr: nacos IP:8848
    alibaba:
      seata:
        tx-service-group: changsha_tx_group  # 配置事務分組
        service:
          vgroup-mapping:
            changsha_tx_group: default

seata:
  registry:
    #配置seata 的注冊中心,告訴seata client 怎麼去通路seata server(TC,裡面運作着這個事務協調器)
    type: nacos
    nacos:
      server-addr: nacos IP:8848  # seata-server 所在的nacos服務位址
      application: seata-server     # 服務名
      username: nacos
      password: nacos
      group: SEATA_GROUP   # seata-server 所在的分組
  config:
    type: nacos
    nacos:
      server-addr: nacos IP:8848
      group: SEATA_GROUP      

BuyController

package com.xt.springcloud.controller;

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

@RestController
public class BuyController {

    @Autowired
    private final RestTemplate restTemplate;

    public BuyController(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    @PostMapping("/buy")
    @GlobalTransactional
    public String buy(@RequestParam("userId") Integer userId,
                      @RequestParam("productId") Integer productId,
                      @RequestParam("number") Integer count) {

        // 請求參數
        MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
        queryParams.add("userId", userId.toString());
        queryParams.add("productId", productId.toString());
        queryParams.add("number", count.toString());
        queryParams.add("money", count.toString());

        // 構造請求
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://seata-order/orderform/create").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 構造請求
        builder = UriComponentsBuilder.fromHttpUrl("http://seata-stock/stock/deduct").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 構造請求 restTemplate 加上了@LoadBalanced注解以後,必須使用應用名來通路指定服務,而不是IP位址
        builder = UriComponentsBuilder.fromHttpUrl("http://seata-user/user/debit").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);
        return "success";
    }

}      

SeataBuyApplication

package com.xt.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableDiscoveryClient
public class SeataBuyApplication {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(SeataBuyApplication.class);
    }

}      

seata-order子項目

交易進行時,生成使用者購買指定商品的訂單,然後入庫

結構圖

Spring Cloud Alibaba之seata 分布式事務

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud</artifactId>
        <groupId>com.xt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>seata-order</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>

        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.2</version>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
    </dependencies>

</project>      

application.yml

server:
  port: 9001

spring:
  application:
    name: seata-order

  cloud:
    nacos:
      # 将這個服務注冊到nacos 上面去
      discovery:
        server-addr: nacos IP:8848
    alibaba:
      seata:
        tx-service-group: changsha_tx_group  # 配置事務分組,異地容災使用
        service:
          vgroup-mapping:
            changsha_tx_group: default

  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username:
    password: 
    url: jdbc:mysql://mysql IP:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
    hikari:
      max-lifetime: 30000
#  jpa:
#    show-sql: true


# 配置日志輸出 使用預設控制台列印
mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl


seata:
  registry:
    #配置seata 的注冊中心,告訴seata client 怎麼去通路seata server(TC,裡面運作着這個事務協調器)
    type: nacos
    nacos:
      server-addr: nacos IP:8848  # seata-server 所在的nacos服務位址
      application: seata-server     # 服務名
      username: nacos
      password: nacos
      group: SEATA_GROUP   # seata-server 所在的分組
  config:
    type: nacos
    nacos:
      server-addr: nacos IP:8848
      group: SEATA_GROUP      

OrderFormController

package com.xt.springcloud.controller;

import com.xt.springcloud.service.IOrderFormService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/orderform")
public class OrderFormController {

    @Resource
    private IOrderFormService orderService;

    @PostMapping("/create")

    public void create(@RequestParam("userId") Integer userId,
                       @RequestParam("productId") Integer productId,
                       @RequestParam("number") Integer count,
                       @RequestParam("money") Integer money) {
        orderService.create(userId, productId, count, money);
    }
}      

OrderForm

package com.xt.springcloud.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@TableName( "orderform") 
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderForm {
    // 訂單id

    @TableId(type = IdType.AUTO)
    public Integer id;

    // 使用者id
    public Integer userId;

    // 商品id
    public Integer productId;

    // 商品購買數量
    public Integer number;

    // 訂單金額
    public Integer money;
}      

OrderFormMapper

package com.xt.springcloud.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

import com.xt.springcloud.entity.OrderForm;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderFormMapper extends BaseMapper<OrderForm> {
}      

IOrderFormService

package com.xt.springcloud.service;


public interface IOrderFormService {

    void create(int userId, int productId, int number, int money);
}      

OrderFormServiceImpl

package com.xt.springcloud.service.impl;

import com.xt.springcloud.entity.OrderForm;

import com.xt.springcloud.mapper.OrderFormMapper;
import com.xt.springcloud.service.IOrderFormService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class OrderFormServiceImpl implements IOrderFormService {

    @Resource
    private OrderFormMapper orderFormMapper;

    @Override
    public void create(int userId, int productId, int number, int money) {
        // 生成訂單
        OrderForm order = new OrderForm();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setNumber(number);
        order.setMoney(money);
        System.out.println(order.toString());
        orderFormMapper.insert(order);

    }
}      

SeataOrderFormApplication

package com.xt.springcloud;


import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.xt.springcloud.mapper")
public class SeataOrderFormApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataOrderFormApplication.class);
    }
}      

seata-stock子項目

交易進行時,扣除指定商品的庫存

Spring Cloud Alibaba之seata 分布式事務

pom.xml

和上一個子項目的依賴都一樣,我就不貼了

application.yml

也和上一個差不多,改一下服務端口号和應用名稱就行

server:
  port: 9002

spring:
  application:
    name: seata-stock      

StockController

package com.xt.springcloud.controller;

import com.xt.springcloud.service.IStockService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/stock")
public class StockController {

    @Resource
    private IStockService storageService;

    @PostMapping("/deduct")
    public void deduct(@RequestParam("productId") Integer productId,
                       @RequestParam("number") Integer count) {
        storageService.deduct(productId, count);
    }
}      

Stock

package com.xt.springcloud.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@TableName("stock")
@Data
public class Stock {

    // 商品id
    public Integer id;

    // 庫存
    public Integer number;
}      

StockMapper

package com.xt.springcloud.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xt.springcloud.entity.Stock;
import org.springframework.stereotype.Repository;

@Repository
public interface StockMapper extends BaseMapper<Stock> {
}      

IStockService

package com.xt.springcloud.service;


public interface IStockService {

    void deduct(int productId, int number);
}      

StockServiceImpl

package com.xt.springcloud.service.impl;

import com.xt.springcloud.entity.Stock;
import com.xt.springcloud.mapper.StockMapper;
import com.xt.springcloud.service.IStockService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;


@Service
public class StockServiceImpl implements IStockService {

    @Resource
    private StockMapper stockMapper;

    @Override
    public void deduct(int productId, int number) {
        Optional<Stock> byId = Optional.ofNullable(stockMapper.selectById(productId));

        if(byId.isPresent()) {
            Stock storage = byId.get();
            if(storage.getNumber() >= number) {
                // 減庫存
                storage.setNumber(storage.getNumber() - number);
                stockMapper.insert(storage);
            }
            else {
                throw new RuntimeException("該商品庫存不足!");
            }
        }
        else {
            throw new RuntimeException("該商品不存在!");
        }
    }
}      

SeataStockApplication

package com.xt.springcloud;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.xt.springcloud.mapper")
public class SeataStockApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataStockApplication.class);
    }
}      

seata-user子項目

交易進行時,扣除指定使用者賬戶裡面的錢

Spring Cloud Alibaba之seata 分布式事務

pom.xml檔案和上一子項目依賴都一樣

application.yml 和上一子項目也差不多,就改一下服務端口号和應用名

server:
  port: 9003

spring:
  application:
    name: seata-user      

UserController

package com.xt.springcloud.controller;

import com.xt.springcloud.service.IUserService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private IUserService userService;

    @PostMapping("/debit")
    public void debit(@RequestParam("userId") Integer userId,
                      @RequestParam("money") Integer money) {
        userService.debit(userId, money);
    }
}      

User

package com.xt.springcloud.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import javax.persistence.*;

@TableName("user")
@Data
public class User {

    // 使用者id
    public Integer id;
    // 使用者餘額
    public Integer money;
}      

UserMapper

package com.xt.springcloud.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xt.springcloud.entity.User;
import org.springframework.stereotype.Repository;


@Repository
public interface UserMapper extends BaseMapper<User> {
}      

IUserService

package com.xt.springcloud.service;


public interface IUserService {

    void debit(int userId, int money);
}      

UserServiceImpl

package com.xt.springcloud.service.impl;

import com.xt.springcloud.entity.User;
import com.xt.springcloud.mapper.UserMapper;
import com.xt.springcloud.service.IUserService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;

@Service
public class UserServiceImpl implements IUserService {

    @Resource
    private UserMapper userMapper;

    @Override
    public void debit(int userId, int money) {
        Optional<User> byId = Optional.ofNullable(userMapper.selectById(userId));
        if(byId.isPresent()) {
            User user = byId.get();
            if(user.getMoney() >= money) {
                // 減餘額
                user.setMoney(user.getMoney() - money);
                userMapper.insert(user);
            }
            else {
                throw new RuntimeException("該使用者餘額不足!");
            }
        }
        else {
            throw new RuntimeException("沒有該使用者!");
        }
    }
}      

SeataUserApplication

package com.xt.springcloud;


import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.xt.springcloud.mapper")
public class SeataUserApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataUserApplication.class);
    }

}      

測試(沒引入seata之前)

沒使用seata分布式事務之前,先手動插入兩條資料

# 手動插入資料
INSERT INTO user (id, money) VALUES (1,3);

# 手動插入資料
insert into stock (id,number) values (1,3);      

然後使用postman發起請求測試/buy,讓他請求其餘三個微服務

http://127.0.0.1:9000/buy?userId=1&productId=1&number=4      

seata-stock微服務抛出異常

Spring Cloud Alibaba之seata 分布式事務

但是seata-order微服務卻正常運作

Spring Cloud Alibaba之seata 分布式事務

這樣就導緻了全局資料不一緻的問題。明明這個交易沒有成功,但訂單卻已經入庫。而貨物的數量和使用者的錢都沒有發生變化使用者1的錢沒有發生變化,還是3

Spring Cloud Alibaba之seata 分布式事務

商品1的庫存沒有發生變化,還是3

Spring Cloud Alibaba之seata 分布式事務

但是訂單資料庫卻多了一筆交易記錄

Spring Cloud Alibaba之seata 分布式事務

這種情況開啟本地事務也是沒用的,他不能協調多個程序之間的事務關系,隻能保證自己的操作是事務的。

Seata 引入(AT模式)

內建seata

每個微服務下面都添加這條依賴

<dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  </dependency>      

我們這裡使用seata 的AT模式

微服務對應的資料庫中要建立一個undo_log,是當事務沒有運作成功的時候做復原用的

通過已經運作的sql進行記錄日志,成功了删掉日志,失敗了通過undo_log日志來復原資料,再删掉日志

(如果是多個資料庫,記得每個微服務對應的資料庫都要建立一個undo_log表)

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';      

增加配置資訊

seata:
  registry:
    #配置seata 的注冊中心,告訴seata client 怎麼去通路seata server(TC,裡面運作着這個事務協調器)
    type: nacos
    nacos:
      server-addr: IP:8848  # seata-server 所在的nacos服務位址
      application: seata-server     # 服務名
      username: nacos
      password: nacos
      group: SEATA_GROUP   # seata-server 所在的分組
  config:
    type: nacos
    nacos:
      server-addr: IP:8848
      group: SEATA_GROUP

spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: changsha_tx_group  # 配置事務分組,異地容災使用
        service:
          vgroup-mapping:
            changsha_tx_group: default      

在buy的這個方法上加上@GlobalTransactional注解,因為他與其他微服務的方法有互動依賴

以@GlobalTransactional為入口,GlobalTransactionalInterceptor為切入點,

TM會向TC發起一個請求(服務端使用的netty)開啟一個全局事務,生成全局事務的XID,通過服務調用鍊路傳播

@PostMapping("/buy")
 @GlobalTransactional
 public String buy(@RequestParam("userId") Integer userId,
                   @RequestParam("productId") Integer productId,
                   @RequestParam("number") Integer count) {

     // 請求參數
     MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
     queryParams.add("userId", userId.toString());
     queryParams.add("productId", productId.toString());
     queryParams.add("number", count.toString());
     queryParams.add("money", count.toString());

     // 構造請求
     UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://seata-order/orderform/create").queryParams(queryParams);
     restTemplate.postForObject(builder.toUriString(), null, Void.class);

     // 構造請求
     builder = UriComponentsBuilder.fromHttpUrl("http://seata-stock/stock/deduct").queryParams(queryParams);
     restTemplate.postForObject(builder.toUriString(), null, Void.class);

     // 構造請求 restTemplate 加上了@LoadBalanced注解以後,必須使用應用名來通路指定服務,而不是IP位址
     builder = UriComponentsBuilder.fromHttpUrl("http://seata-user/user/debit").queryParams(queryParams);
     restTemplate.postForObject(builder.toUriString(), null, Void.class);


     return "success";
 }      

再次測試

http://127.0.0.1:9000/buy?userId=1&productId=1&number=4      

資料庫情況正常,沒有發生變化

spring cloud 項目用戶端的日志

seata-buy微服務

Spring Cloud Alibaba之seata 分布式事務

seata-stock微服務因為商品庫存不足抛出異常

Spring Cloud Alibaba之seata 分布式事務

seata-order微服務本來執行了sql 語句插入訂單資料,後來還是復原了,因為seata-stock微服務未執行成功,抛出了異常。

Spring Cloud Alibaba之seata 分布式事務

seata-server 日志

每個微服務都會向seata-server的TC 注冊一個RM和TM

遇到異常,先復原分支事務成功,然後log復原全局事務成功

References:

  • ​​http://seata.io/zh-cn/​​
  • ​​https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E​​
  • ​​https://www.jianshu.com/p/fe8c48f38382​​
  • ​​https://github.com/seata/seata/wiki/%E6%A6%82%E8%A7%88​​

繼續閱讀