天天看点

Spring Cloud Alibaba之seata 分布式事务

这篇博客涉及到nacos和seata-server的搭建和基本使用,可以看如下两篇

​​Docker 部署 Seata Server(使用nacos 做为注册中心和配置中心)​​Nacos的简单介绍以及服务注册与发现功能的基本使用

Seata简单介绍

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

Spring Cloud Alibaba之seata 分布式事务

Seata术语

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器: 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
  • RM (Resource Manager) - 资源管理器: 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

TC 为单独部署的server服务端,TM和RM 是嵌入到应用中的client客户端

处理流程

  1. TM请求TC开启一个全局事务。TC会生成一个XID作为该全局事务的编号。
  2. XID,会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。
  3. RM请求TC将本地事务注册为全局事务的分支事务,通过全局事务的XID进行关联。
  4. TM请求TC告诉XID对应的全局事务是进行提交还是回滚。
  5. TC驱动RM们将XID对应的自己的本地事务进行提交还是回滚。
Spring Cloud Alibaba之seata 分布式事务

开启全局事务

以@GlobalTransactional为入口,GlobalTransactionalInterceptor为切入点,

TM会向TC发起一个请求(服务端使用的netty)开启一个全局事务,生成全局事务的XID,通过服务调用链路传播

开启分支事务

执行业务代码,准备开启分支事务。

分支事务开启的原理:

1.由于seata对底层的DataSource,Connection等使用DataSourceProxy,ConnectionProxy代理

2.当进行数据库操作的时候,ConnectionProxy会判断是否包含全局事务

2.1 包含全局食物

2.1.1 RM向TC发起请求注册分支事务

2.1.2 插入回滚日志(undo_log表,业务库必须新建这个表)

2.1.3 事务提交

2.1.4 向TC上报事务状态

2.2 不包含全局事务

2.2.1 事务提交

全局事务提交

当业务逻辑执行没问题的话,就需要执行全局事务的提交。

1.TM向TC发起全局事务提交请求

2.TC收到之后,会向各个分支事务发起事务提交请求

3.分支事务接收到请求,只需要删除全局事务的undo_log记录就可以了

全局事务回滚

当业务逻辑执行发生异常,就需要执行全局事务的回滚。

1.TM向TC发起全局事务回滚请求

2.TC收到之后,会向各个分支事务发起事务回滚请求

3.分支事务接收到请求,只需要根据XID对应的undo_log表记录进行回滚即可(记录执行前后的记录)

重要的类

  • TC : DefaultCoordinator
  • TM : GlobalTransaction、GlobalTransactionalInterceptor、TransactionalTemplate
  • RM : DataSourceProxy、ConnectProxy

本地事务和分布式事务

当只有一个应用进程的时候,修改一个数据库,就不会造成分布式事务问题。

本地事务使用Spring 事务 – @Transactional就可以解决本地的事务问题

当分布式微服务兴起,一个服务会引起其他多个微服务的耦合操作。其他微服务会去不同的数据库做CRUD,这样跨进程和跨数据库所导致的事务问题就是分布式事务,如果处理的不好,就会影响全局的数据一致性问题。

比如:有三个服务A,B,C。我们调用A服务的时候,A服务会调用B服务和C服务,而这三个服务又有各自的数据库,A,B,C三个服务会分别对自己的数据库做CRUD操作。要是三个都执行成功,就没什么问题,但是B服务可能因为宕机或者其他异常而不能执行,但是这个时候A,C服务都正常执行了,这样全局的数据就不一致了。

总结来说就是一次业务操作需要垮多个数据源或需要垮多个系统进行远程调用,就会产生分布式事务问题

​​版本说明​​

组件版本关系

每个 Spring Cloud Alibaba 版本及其自身所适配的各组件对应版本(经过验证,自行搭配各组件版本不保证可用)如下表所示(最新版本用*标记):

Spring Cloud Alibaba之seata 分布式事务

毕业版本依赖关系(推荐使用)

下表为按时间顺序发布的 Spring Cloud Alibaba 以及对应的适配 Spring Cloud 和 Spring Boot 版本关系(由于 Spring Cloud 版本命名有调整,所以对应的 Spring Cloud Alibaba 版本号也做了对应变化)

Spring Cloud Alibaba之seata 分布式事务

示例

大致说明

总共有四个微服务模块,将四个微服务模块都注册到nacos server 上面

seata-buy : 用户购买商品,所以要和seata-order,seata-stock和seata-user三个微服务交互。

seata-order: 用户购买商品的时候,将订单保存到orderform数据库

seata-stock:用户购买商品的时候,stock数据库里指定ID的商品数量减去用户购买的数量

seata-user: 用户购买商品的时候,user数据库里指定用户ID的用户减去购买所花的钱财

这样就是一个分布式服务的场景,不过我的数据表都是保存在一个数据库里面的,数据库没有做分布式

建表

# 创建seata 数据库
CREATE DATABASE seata;
# 进入使用数据库 seata
USE seata;

# 创建商品stock表
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `number` int(11) DEFAULT 0,
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 创建 订单orderform表
DROP TABLE IF EXISTS `orderform`;
CREATE TABLE `orderform` (
     `id` int(11) NOT NULL AUTO_INCREMENT,
     `user_id` int(11) DEFAULT NULL,
     `product_id` int(11) DEFAULT NULL,
     `number` int(11) DEFAULT 0,
     `money` int(11) DEFAULT 0,
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 创建用户user表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `money` int(11) DEFAULT 0,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 手动插入数据
INSERT INTO user (id, money) VALUES (1,3);

# 手动插入数据
insert into stock (id,number) values (1,3);      

父项目pom.xml文件规定依赖版本

我使用的数据库是mysql 8,其他组件版本下面都有,下面是一个父module,用于管理各组件版本,便于管理下面的子module(各微服务模块的组件版本统一比较好)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xt</groupId>
    <artifactId>springcloud</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>seata-buy</module>
        <module>seata-order</module>
        <module>seata-stock</module>
        <module>seata-user</module>
    </modules>


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
    </parent>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
        <spring-cloud-version>Hoxton.SR12</spring-cloud-version>
        <mysql-connector-java-version>8.0.26</mysql-connector-java-version>
        <lombok-version>1.18.22</lombok-version>
        <mybatis-plus-boot-starter-version>3.4.2</mybatis-plus-boot-starter-version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>${mybatis-plus-boot-starter-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql-connector-java-version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
        
    </dependencyManagement>
</project>      

seata-buy子项目

交易请求的接口,与下面三个微服务项目交互

结构图

Spring Cloud Alibaba之seata 分布式事务

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud</artifactId>
        <groupId>com.xt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>seata-buy</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
        
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
    </build>

</project>      

application.yml

server:
  port: 9000

spring:
  application:
    name: seata-buy
  cloud:
    nacos:
      # 将这个服务注册到nacos 上面去
      discovery:
        server-addr: nacos IP:8848
    alibaba:
      seata:
        tx-service-group: changsha_tx_group  # 配置事务分组
        service:
          vgroup-mapping:
            changsha_tx_group: default

seata:
  registry:
    #配置seata 的注册中心,告诉seata client 怎么去访问seata server(TC,里面运行着这个事务协调器)
    type: nacos
    nacos:
      server-addr: nacos IP:8848  # seata-server 所在的nacos服务地址
      application: seata-server     # 服务名
      username: nacos
      password: nacos
      group: SEATA_GROUP   # seata-server 所在的分组
  config:
    type: nacos
    nacos:
      server-addr: nacos IP:8848
      group: SEATA_GROUP      

BuyController

package com.xt.springcloud.controller;

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;

@RestController
public class BuyController {

    @Autowired
    private final RestTemplate restTemplate;

    public BuyController(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }

    @PostMapping("/buy")
    @GlobalTransactional
    public String buy(@RequestParam("userId") Integer userId,
                      @RequestParam("productId") Integer productId,
                      @RequestParam("number") Integer count) {

        // 请求参数
        MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
        queryParams.add("userId", userId.toString());
        queryParams.add("productId", productId.toString());
        queryParams.add("number", count.toString());
        queryParams.add("money", count.toString());

        // 构造请求
        UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://seata-order/orderform/create").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 构造请求
        builder = UriComponentsBuilder.fromHttpUrl("http://seata-stock/stock/deduct").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);

        // 构造请求 restTemplate 加上了@LoadBalanced注解以后,必须使用应用名来访问指定服务,而不是IP地址
        builder = UriComponentsBuilder.fromHttpUrl("http://seata-user/user/debit").queryParams(queryParams);
        restTemplate.postForObject(builder.toUriString(), null, Void.class);
        return "success";
    }

}      

SeataBuyApplication

package com.xt.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableDiscoveryClient
public class SeataBuyApplication {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(SeataBuyApplication.class);
    }

}      

seata-order子项目

交易进行时,生成用户购买指定商品的订单,然后入库

结构图

Spring Cloud Alibaba之seata 分布式事务

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud</artifactId>
        <groupId>com.xt</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>seata-order</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>

        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.2</version>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>
    </dependencies>

</project>      

application.yml

server:
  port: 9001

spring:
  application:
    name: seata-order

  cloud:
    nacos:
      # 将这个服务注册到nacos 上面去
      discovery:
        server-addr: nacos IP:8848
    alibaba:
      seata:
        tx-service-group: changsha_tx_group  # 配置事务分组,异地容灾使用
        service:
          vgroup-mapping:
            changsha_tx_group: default

  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username:
    password: 
    url: jdbc:mysql://mysql IP:3306/seata?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
    hikari:
      max-lifetime: 30000
#  jpa:
#    show-sql: true


# 配置日志输出 使用默认控制台打印
mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl


seata:
  registry:
    #配置seata 的注册中心,告诉seata client 怎么去访问seata server(TC,里面运行着这个事务协调器)
    type: nacos
    nacos:
      server-addr: nacos IP:8848  # seata-server 所在的nacos服务地址
      application: seata-server     # 服务名
      username: nacos
      password: nacos
      group: SEATA_GROUP   # seata-server 所在的分组
  config:
    type: nacos
    nacos:
      server-addr: nacos IP:8848
      group: SEATA_GROUP      

OrderFormController

package com.xt.springcloud.controller;

import com.xt.springcloud.service.IOrderFormService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/orderform")
public class OrderFormController {

    @Resource
    private IOrderFormService orderService;

    @PostMapping("/create")

    public void create(@RequestParam("userId") Integer userId,
                       @RequestParam("productId") Integer productId,
                       @RequestParam("number") Integer count,
                       @RequestParam("money") Integer money) {
        orderService.create(userId, productId, count, money);
    }
}      

OrderForm

package com.xt.springcloud.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@TableName( "orderform") 
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderForm {
    // 订单id

    @TableId(type = IdType.AUTO)
    public Integer id;

    // 用户id
    public Integer userId;

    // 商品id
    public Integer productId;

    // 商品购买数量
    public Integer number;

    // 订单金额
    public Integer money;
}      

OrderFormMapper

package com.xt.springcloud.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

import com.xt.springcloud.entity.OrderForm;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderFormMapper extends BaseMapper<OrderForm> {
}      

IOrderFormService

package com.xt.springcloud.service;


public interface IOrderFormService {

    void create(int userId, int productId, int number, int money);
}      

OrderFormServiceImpl

package com.xt.springcloud.service.impl;

import com.xt.springcloud.entity.OrderForm;

import com.xt.springcloud.mapper.OrderFormMapper;
import com.xt.springcloud.service.IOrderFormService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class OrderFormServiceImpl implements IOrderFormService {

    @Resource
    private OrderFormMapper orderFormMapper;

    @Override
    public void create(int userId, int productId, int number, int money) {
        // 生成订单
        OrderForm order = new OrderForm();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setNumber(number);
        order.setMoney(money);
        System.out.println(order.toString());
        orderFormMapper.insert(order);

    }
}      

SeataOrderFormApplication

package com.xt.springcloud;


import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.xt.springcloud.mapper")
public class SeataOrderFormApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataOrderFormApplication.class);
    }
}      

seata-stock子项目

交易进行时,扣除指定商品的库存

Spring Cloud Alibaba之seata 分布式事务

pom.xml

和上一个子项目的依赖都一样,我就不贴了

application.yml

也和上一个差不多,改一下服务端口号和应用名称就行

server:
  port: 9002

spring:
  application:
    name: seata-stock      

StockController

package com.xt.springcloud.controller;

import com.xt.springcloud.service.IStockService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/stock")
public class StockController {

    @Resource
    private IStockService storageService;

    @PostMapping("/deduct")
    public void deduct(@RequestParam("productId") Integer productId,
                       @RequestParam("number") Integer count) {
        storageService.deduct(productId, count);
    }
}      

Stock

package com.xt.springcloud.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

@TableName("stock")
@Data
public class Stock {

    // 商品id
    public Integer id;

    // 库存
    public Integer number;
}      

StockMapper

package com.xt.springcloud.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xt.springcloud.entity.Stock;
import org.springframework.stereotype.Repository;

@Repository
public interface StockMapper extends BaseMapper<Stock> {
}      

IStockService

package com.xt.springcloud.service;


public interface IStockService {

    void deduct(int productId, int number);
}      

StockServiceImpl

package com.xt.springcloud.service.impl;

import com.xt.springcloud.entity.Stock;
import com.xt.springcloud.mapper.StockMapper;
import com.xt.springcloud.service.IStockService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;


@Service
public class StockServiceImpl implements IStockService {

    @Resource
    private StockMapper stockMapper;

    @Override
    public void deduct(int productId, int number) {
        Optional<Stock> byId = Optional.ofNullable(stockMapper.selectById(productId));

        if(byId.isPresent()) {
            Stock storage = byId.get();
            if(storage.getNumber() >= number) {
                // 减库存
                storage.setNumber(storage.getNumber() - number);
                stockMapper.insert(storage);
            }
            else {
                throw new RuntimeException("该商品库存不足!");
            }
        }
        else {
            throw new RuntimeException("该商品不存在!");
        }
    }
}      

SeataStockApplication

package com.xt.springcloud;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.xt.springcloud.mapper")
public class SeataStockApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataStockApplication.class);
    }
}      

seata-user子项目

交易进行时,扣除指定用户账户里面的钱

Spring Cloud Alibaba之seata 分布式事务

pom.xml文件和上一子项目依赖都一样

application.yml 和上一子项目也差不多,就改一下服务端口号和应用名

server:
  port: 9003

spring:
  application:
    name: seata-user      

UserController

package com.xt.springcloud.controller;

import com.xt.springcloud.service.IUserService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private IUserService userService;

    @PostMapping("/debit")
    public void debit(@RequestParam("userId") Integer userId,
                      @RequestParam("money") Integer money) {
        userService.debit(userId, money);
    }
}      

User

package com.xt.springcloud.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import javax.persistence.*;

@TableName("user")
@Data
public class User {

    // 用户id
    public Integer id;
    // 用户余额
    public Integer money;
}      

UserMapper

package com.xt.springcloud.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.xt.springcloud.entity.User;
import org.springframework.stereotype.Repository;


@Repository
public interface UserMapper extends BaseMapper<User> {
}      

IUserService

package com.xt.springcloud.service;


public interface IUserService {

    void debit(int userId, int money);
}      

UserServiceImpl

package com.xt.springcloud.service.impl;

import com.xt.springcloud.entity.User;
import com.xt.springcloud.mapper.UserMapper;
import com.xt.springcloud.service.IUserService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Optional;

@Service
public class UserServiceImpl implements IUserService {

    @Resource
    private UserMapper userMapper;

    @Override
    public void debit(int userId, int money) {
        Optional<User> byId = Optional.ofNullable(userMapper.selectById(userId));
        if(byId.isPresent()) {
            User user = byId.get();
            if(user.getMoney() >= money) {
                // 减余额
                user.setMoney(user.getMoney() - money);
                userMapper.insert(user);
            }
            else {
                throw new RuntimeException("该用户余额不足!");
            }
        }
        else {
            throw new RuntimeException("没有该用户!");
        }
    }
}      

SeataUserApplication

package com.xt.springcloud;


import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.xt.springcloud.mapper")
public class SeataUserApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataUserApplication.class);
    }

}      

测试(没引入seata之前)

没使用seata分布式事务之前,先手动插入两条数据

# 手动插入数据
INSERT INTO user (id, money) VALUES (1,3);

# 手动插入数据
insert into stock (id,number) values (1,3);      

然后使用postman发起请求测试/buy,让他请求其余三个微服务

http://127.0.0.1:9000/buy?userId=1&productId=1&number=4      

seata-stock微服务抛出异常

Spring Cloud Alibaba之seata 分布式事务

但是seata-order微服务却正常运行

Spring Cloud Alibaba之seata 分布式事务

这样就导致了全局数据不一致的问题。明明这个交易没有成功,但订单却已经入库。而货物的数量和用户的钱都没有发生变化用户1的钱没有发生变化,还是3

Spring Cloud Alibaba之seata 分布式事务

商品1的库存没有发生变化,还是3

Spring Cloud Alibaba之seata 分布式事务

但是订单数据库却多了一笔交易记录

Spring Cloud Alibaba之seata 分布式事务

这种情况开启本地事务也是没用的,他不能协调多个进程之间的事务关系,只能保证自己的操作是事务的。

Seata 引入(AT模式)

集成seata

每个微服务下面都添加这条依赖

<dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  </dependency>      

我们这里使用seata 的AT模式

微服务对应的数据库中要新建一个undo_log,是当事务没有运行成功的时候做回滚用的

通过已经运行的sql进行记录日志,成功了删掉日志,失败了通过undo_log日志来回滚数据,再删掉日志

(如果是多个数据库,记得每个微服务对应的数据库都要建立一个undo_log表)

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';      

增加配置信息

seata:
  registry:
    #配置seata 的注册中心,告诉seata client 怎么去访问seata server(TC,里面运行着这个事务协调器)
    type: nacos
    nacos:
      server-addr: IP:8848  # seata-server 所在的nacos服务地址
      application: seata-server     # 服务名
      username: nacos
      password: nacos
      group: SEATA_GROUP   # seata-server 所在的分组
  config:
    type: nacos
    nacos:
      server-addr: IP:8848
      group: SEATA_GROUP

spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: changsha_tx_group  # 配置事务分组,异地容灾使用
        service:
          vgroup-mapping:
            changsha_tx_group: default      

在buy的这个方法上加上@GlobalTransactional注解,因为他与其他微服务的方法有交互依赖

以@GlobalTransactional为入口,GlobalTransactionalInterceptor为切入点,

TM会向TC发起一个请求(服务端使用的netty)开启一个全局事务,生成全局事务的XID,通过服务调用链路传播

@PostMapping("/buy")
 @GlobalTransactional
 public String buy(@RequestParam("userId") Integer userId,
                   @RequestParam("productId") Integer productId,
                   @RequestParam("number") Integer count) {

     // 请求参数
     MultiValueMap<String, String> queryParams = new LinkedMultiValueMap<>();
     queryParams.add("userId", userId.toString());
     queryParams.add("productId", productId.toString());
     queryParams.add("number", count.toString());
     queryParams.add("money", count.toString());

     // 构造请求
     UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://seata-order/orderform/create").queryParams(queryParams);
     restTemplate.postForObject(builder.toUriString(), null, Void.class);

     // 构造请求
     builder = UriComponentsBuilder.fromHttpUrl("http://seata-stock/stock/deduct").queryParams(queryParams);
     restTemplate.postForObject(builder.toUriString(), null, Void.class);

     // 构造请求 restTemplate 加上了@LoadBalanced注解以后,必须使用应用名来访问指定服务,而不是IP地址
     builder = UriComponentsBuilder.fromHttpUrl("http://seata-user/user/debit").queryParams(queryParams);
     restTemplate.postForObject(builder.toUriString(), null, Void.class);


     return "success";
 }      

再次测试

http://127.0.0.1:9000/buy?userId=1&productId=1&number=4      

数据库情况正常,没有发生变化

spring cloud 项目客户端的日志

seata-buy微服务

Spring Cloud Alibaba之seata 分布式事务

seata-stock微服务因为商品库存不足抛出异常

Spring Cloud Alibaba之seata 分布式事务

seata-order微服务本来执行了sql 语句插入订单数据,后来还是回滚了,因为seata-stock微服务未执行成功,抛出了异常。

Spring Cloud Alibaba之seata 分布式事务

seata-server 日志

每个微服务都会向seata-server的TC 注册一个RM和TM

遇到异常,先回滚分支事务成功,然后log回滚全局事务成功

References:

  • ​​http://seata.io/zh-cn/​​
  • ​​https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E​​
  • ​​https://www.jianshu.com/p/fe8c48f38382​​
  • ​​https://github.com/seata/seata/wiki/%E6%A6%82%E8%A7%88​​

继续阅读