天天看点

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

Seata

紧接上文,​​六种常用事务解决方案,你方唱罢,我登场(没有最好只有更好) ​​

咱么介绍了,6中常见的分布式解决方案,不管他们怎么你争我抢,最终都被Seata降维打击。

接下来我们就来说说Seata是怎么做到这么强大的。

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 对应的内部版本在阿里经济体内部一直扮演着分布式一致性中间件的角色,帮助经济体平稳的度过历年的双11,对各BU业务进行了有力的支撑。经过多年沉淀与积累,商业化产品先后在阿里云、金融云进行售卖。2019.1 为了打造更加完善的技术生态和普惠技术成果,Seata 正式宣布对外开源,开放以来,广受欢迎,不到一年已经成为最受欢迎的分布式事务解决方案。

官方中文网:​​seata.io/zh-cn​​

github项目地址:​​github.com/seata/seata​​

4.1 Seata术语

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

Seata 致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

4.1 Seata AT模式

Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。其中AT模式最受欢迎,使用也非常简单,但它内在的原理不简单。

AT模式的相关资料请参考官方文档说明:​​seata.io/zh-cn/docs/…​​

下图是AT模式的执行流程:

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

4.1.1 AT模式及工作流程

见官方文档:​​seata.io/zh-cn/docs/…​​

4.1.2 Seata-Server安装

我们在选择用Seata版本的时候,可以先参考下官方给出的版本匹配(Seata版本也可以按自己的要求选择):

​​github.com/alibaba/spr…​​

Spring Cloud Alibaba Version Sentinel Version Nacos Version RocketMQ Version Dubbo Version Seata Version
2.2.5.RELEASE 1.8.0 1.4.1 4.4.0 2.7.8 1.3.0
2.2.3.RELEASE or 2.1.3.RELEASE or 2.0.3.RELEASE 1.8.0 1.3.3 4.4.0 2.7.8 1.3.0
2.2.1.RELEASE or 2.1.2.RELEASE or 2.0.2.RELEASE 1.7.1 1.2.1 4.4.0 2.7.6 1.2.0
2.2.0.RELEASE 1.7.1 1.1.4 4.4.0 2.7.4.1 1.0.0
2.1.1.RELEASE or 2.0.1.RELEASE or 1.5.1.RELEASE 1.7.0 1.1.4 4.4.0 2.7.3 0.9.0
2.1.0.RELEASE or 2.0.0.RELEASE or 1.5.0.RELEASE 1.6.3 1.1.1 4.4.0 2.7.3 0.7.1

我们当前​

​SpringCloud Alibaba​

​​的版本是​

​2.2.5.RELEASE​

​,对应Seata版本是1.3.0,所以我们首先安装Seata-Server1.3.0

我们直接基于docker启动得到:

docker run --name seata-server -p 8091:8091 -d -e SEATA_IP=192.168.200.129 -e SEATA_PORT=8091  --restart=on-failure seataio/seata-server:1.3.0      

4.1.3 集成springcloud-alibaba

我们接下来开始在项目中集成使用Seata的AT模式实现分布式事务控制,关于如何集成,官方也给出了很多例子,可以通过

​​github.com/seata/seata…​​

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

所以各种集成模式需要大家都自行的去翻看对应的​

​samples​

​。

集成可以按照如下步骤实现:

1:引入依赖包spring-cloud-starter-alibaba-seata
2:配置Seata
3:创建代理数据源
4:@GlobalTransactional全局事务控制      

案例需求:

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

如上图,如果用户打车成功,需要修改司机状态、下单、记录支付日志,而每个操作都是调用了不同的服务,比如此时​

​hailtaxi-driver​

​​服务执行成功了,但是​

​hailtaxi-order​

​有可能执行失败了,这时候如何实现跨服务事务回滚呢?这就要用到分布式事务。

鉴于我们一般事务都是在​

​service​

​​层进行的管理,所以,改造一下​

​hailtaxi-order​

​​中的​

​OrderInfoController#add​

方法,将业务实现放到对应的​

​Service​

​中

/***
     * 下单
     */
/*@PostMapping
    public OrderInfo add(){
        //修改司机信息  司机ID=1
        Driver driver = driverFeign.status("3",2);
        //创建订单
        OrderInfo orderInfo = new OrderInfo("No"+((int)(Math.random()*10000)), (int)(Math.random()*100), new Date(), "深圳北站", "罗湖港", driver);
        orderInfoService.add(orderInfo);
        return orderInfo;
    }*/

@PostMapping
public OrderInfo add() {
    return orderInfoService.addOrder();
}      

在​

​Service​

​实现中:

@Service
public class OrderInfoServiceImpl  implements OrderInfoService {
     @Autowired
    private DriverFeign driverFeign;

    /**
     * 1、修改司机信息  司机ID=1
     * 2、创建订单
     * @return
     */
    @Override
    public OrderInfo addOrder() {
        //修改司机信息  司机ID=1
        Driver driver = driverFeign.status("1",2);
        //创建订单
        OrderInfo orderInfo = new OrderInfo("No"+((int)(Math.random()*10000)), (int)(Math.random()*100), new Date(), "深圳北站", "罗湖港", driver);
        int count = orderInfoMapper.add(orderInfo);
        System.out.println("====count="+count);
        return orderInfo;
    }
}      

案例实现:

0) 创建​

​undo_log​

​表

在每个数据库中都需要创建该表:

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;      

1)依赖引入

我们首先在​

​hailtaxi-driver​

​​和​

​hailtaxi-order​

​中引入依赖:

<!--seata-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>      

2)配置Seata

依赖引入后,我们需要在项目中配置​

​SeataClient​

​ 端信息,关于SeataClient端配置信息,官方也给出了很多版本的模板,可以参考官方项目:

​​github.com/seata/seata…​​

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战
Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

我们可以选择spring,把​

​application.yml​

​文件直接拷贝到工程中,文件如下:

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

完整文件内容见:​​github.com/seata/seata…​​

修改后我们在​

​hailtaxi-driver​

​​和​

​hailtaxi-order​

​项目中配置如下:

seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_seata_group
  enable-auto-data-source-proxy: true
  use-jdk-proxy: false
  excludes-for-auto-proxying: firstClassNameForExclude,secondClassNameForExclude
  client:
    rm:
      async-commit-buffer-limit: 1000
      report-retry-count: 5
      table-meta-check-enable: false
      report-success-enable: false
      saga-branch-register-enable: false
      lock:
        retry-interval: 10
        retry-times: 30
        retry-policy-branch-rollback-on-conflict: true
    tm:
      degrade-check: false
      degrade-check-period: 2000
      degrade-check-allow-times: 10
      commit-retry-count: 5
      rollback-retry-count: 5
    undo:
      data-validation: true
      log-serialization: jackson
      log-table: undo_log
      only-care-update-columns: true
    log:
      exceptionRate: 100
  service:
    vgroup-mapping:
      my_seata_group: default
    grouplist:
      default: 192.168.200.129:8091
    enable-degrade: false
    disable-global-transaction: false
  transport:
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
      worker-thread-size: default
      boss-thread-size: 1
    type: TCP
    server: NIO
    heartbeat: true
    serialization: seata
    compressor: none
    enable-client-batch-send-request: true      

关于配置文件内容参数比较多,我们需要掌握核心部分:

seata_transaction: default:事务分组,前面的seata_transaction可以自定义,通过事务分组很方便找到集群节点信息。
tx-service-group: seata_transaction:指定应用的事务分组,和上面定义的分组前部分保持一致。
default: 192.168.200.129:8091:服务地址,seata-server服务地址。      

注意:

现在配置信息都是托管到nacos中的,所以可以直接将配置存储到nacos中

​hailtaxi-order​

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

​hailtaxi-driver​

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

3)代理数据源

通过代理数据源可以保障事务日志数据和业务数据能同步,关于代理数据源早期需要手动创建,但是随着Seata版本升级,不同版本实现方案不一样了,下面是官方的介绍:

1.1.0: seata-all取消属性配置,改由注解@EnableAutoDataSourceProxy开启,并可选择jdk proxy或者cglib proxy
1.0.0: client.support.spring.datasource.autoproxy=true
0.9.0: support.spring.datasource.autoproxy=true      

我们当前的版本是1.3.0,所以我们创建代理数据源只需要在启动类上添加​

​@EnableAutoDataSourceProxy​

​注解即可,

在​

​hailtaxi-order​

​​及​

​hailtaxi-driver​

​的启动类上分别添加该注解:

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients(basePackages = {"com.itheima.driver.feign"})
@EnableAutoDataSourceProxy
public class OrderApplication {
}      

4)全局事务控制

打车成功创建订单是由客户发起,在​

​hailtaxi-order​

​​中执行,并且feign调用​

​hailtaxi-driver​

​​,所以​

​hailtaxi-order​

​​是全局事务入口,我们在​

​OrderInfoServiceImpl.addOrder()​

​​方法上添加​

​@GlobalTransactional​

​,那么此时该方法就是全局事务的入口,

@Override
@GlobalTransactional
public OrderInfo addOrder() {
    //修改司机信息  司机ID=1
    Driver driver = driverFeign.status("1",2);
    //创建订单
    OrderInfo orderInfo = new OrderInfo("No"+((int)(Math.random()*10000)), (int)(Math.random()*100), new Date(), "深圳北站", "罗湖港", driver);
    int count = orderInfoMapper.add(orderInfo);
    System.out.println("====count="+count);
    return orderInfo;
}      

5)分布式事务测试

1、测试正常情况,启动测试

将​

​id=1​

​的司机状态手动改为1,然后进行测试

2、异常测试,在​

​hailtaxi-order​

​的service方法中添加一个异常,

@Override
@GlobalTransactional
public OrderInfo addOrder() {
    //修改司机信息  司机ID=1
    Driver driver = driverFeign.status("1",2);
    //创建订单
    OrderInfo orderInfo = new OrderInfo("No"+((int)(Math.random()*10000)), (int)(Math.random()*100), new Date(), "深圳北站", "罗湖港", driver);
    int count = orderInfoMapper.add(orderInfo);
    System.out.println("====count="+count);
    //模拟异常
    int i = 1 / 0;
    return orderInfo;
}      

测试前,将​

​id=1​

​的司机状态手动改为1,将订单表清空,再次测试,看状态是否被更新,订单有没有添加,以此验证分布式事务是否控制成功!

4.2 Seata TCC模式

一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为
Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.

AT 模式(​​参考链接 TBD​​)基于 支持本地 ACID 事务 的 关系型数据库:

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

TCC实现原理:

有一个 TCC 拦截器,它会封装 Confirm 和 Cancel 方法作为资源(用于后面 TC 来 commit 或 rollback 操作)

封装完,它会本地缓存到 RM (缓存的是方法的描述信息),可以简单认为是放到一个 Map 里面

当 TC 想调用的时候,就可以从 Map 里找到这个方法,用反射调用就可以了

另外,RM 不光是注册分支事务(分支事务是注册到 TC 里的 GlobalSession 中的)

它还会把刚才封装的资源里的重要属性(事务ID、归属的事务组等)以资源的形式注册到 TC 中的 RpcContext

这样,TC 就知道当前全局事务都有哪些分支事务了(这都是分支事务初始化阶段做的事情)

举个例子:RpcContext里面有资源 123,但是 GlobalSession 里只有分支事务 12

于是 TC 就知道分支事务 3 的资源已经注册进来了,但是分支事务 3 还没注册进来

这时若 TM 告诉 TC 提交或回滚,那 GlobalSession 就会通过 RpcContext 找到 1 和 2 的分支事务的位置(比如该调用哪个方法)

当 RM 收到提交或回滚后,就会通过自己的本地缓存找到对应方法,最后通过反射或其他机制去调用真正的 Confirm 或 Cancel

5 Seata注册中心

参看:​​github.com/seata/seata…​​ 可以看到​

​seata​

​支持多种注册中心!

5.1 服务端注册中心配置

服务端注册中心(位于seata-server的registry.conf配置文件中的registry.type参数),为了实现seata-server集群高可用不会使用file类型,一般会采用第三方注册中心,例如zookeeper、redis、eureka、nacos等。

我们这里使用​

​nacos​

​,seata-server的registry.conf配置如下:

由于我们是基于​

​docker​

​​启动的​

​seata​

​​,故可以直接进入到容器内部修改配置文件​

​/resources/registry.conf​

registry {
  # file ...nacos ...eureka...redis...zk...consul...etcd3...sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "192.168.200.129:8848"
    group = "SEATA_GROUP"
    namespace = "1ebba5f6-49da-40cc-950b-f75c8f7d07b3"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}      

此时我们再重新启动容器,访问:​​http://192.168.200.129:8848/nacos​​ 看​

​seata​

​是否已注册到nacos中

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

5.2 客户端注册中心配置

项目中,我们需要使用注册中心,添加如下配置即可(在nacos配置中心的​

​hailtaxi-order.yaml​

​​和​

​hailtaxi-driver-dev.yaml​

​都修改)

参看:​​github.com/seata/seata…​​

registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 192.168.200.129:8848
      group : "SEATA_GROUP"
      namespace: 1ebba5f6-49da-40cc-950b-f75c8f7d07b3
      username: "nacos"
      password: "nacos"      

此时就可以注释掉配置中的​

​default.grouplist="192.168.200.129:8091"​

Seata处理分布式事务(聚合 AT、TCC、SAGA 、 XA事务模式)实战

完整配置如下:

seata:

enabled: true

application-id: ${spring.application.name}

tx-service-group: my_seata_group

enable-auto-data-source-proxy: true

use-jdk-proxy: false

excludes-for-auto-proxying: firstClassNameForExclude,secondClassNameForExclude

client:

rm:

async-commit-buffer-limit: 1000

report-retry-count: 5

table-meta-check-enable: false

report-success-enable: false

saga-branch-register-enable: false

lock:

retry-interval: 10

retry-times: 30

retry-policy-branch-rollback-on-conflict: true

tm:

degrade-check: false

degrade-check-period: 2000

degrade-check-allow-times: 10

commit-retry-count: 5

rollback-retry-count: 5

undo:

data-validation: true

log-serialization: jackson

log-table: undo_log

only-care-update-columns: true

log:

exceptionRate: 100

service:

vgroup-mapping:

my_seata_group: default

#grouplist:

#default: 192.168.200.129:8091

enable-degrade: false

disable-global-transaction: false

transport:

shutdown:

wait: 3

thread-factory:

boss-thread-prefix: NettyBoss

worker-thread-prefix: NettyServerNIOWorker

server-executor-thread-prefix: NettyServerBizHandler

share-boss-worker: false

client-selector-thread-prefix: NettyClientSelector

client-selector-thread-size: 1

client-worker-thread-prefix: NettyClientWorkerThread

worker-thread-size: default

boss-thread-size: 1

type: TCP

server: NIO

heartbeat: true

serialization: seata

compressor: none

enable-client-batch-send-request: true

registry:

type: nacos

nacos:

application: seata-server

server-addr: 192.168.200.129:8848

group : "SEATA_GROUP"

namespace: 1ebba5f6-49da-40cc-950b-f75c8f7d07b3

username: "nacos"

password: "nacos"

测试:

启动服务再次测试,查看分布式事务是否仍然能控制住!!!

6 Seata高可用

​seata-server​

​ 目前使用的是一个单节点,能否抗住高并发是一个值得思考的问题。生产环境项目几乎都需要确保能扛高并发、具备高可用的能力,因此生产环境项目一般都会做集群。

上面配置也只是将注册中心换成了​

​nacos​

​,而且是单机版的,如果要想实现高可用,就得实现集群,集群就需要做一些动作来保证集群节点间的数据同步(会话共享)等操作

我们需要准备2个​

​seata-server​

​​节点,并且​

​seata-server​

​的事务日志存储模式,共支持3种方式,

1):file【集群不可用】

2):redis

3):db

我们这里选择redis存储会话信息实现共享。

1、启动第二个​

​seata-server​

​节点

docker run --name seata-server-n2 -p 8092:8092 -d -e SEATA_IP=192.168.200.129 -e SEATA_PORT=8092  --restart=on-failure seataio/seata-server:1.3.0      

2、进入容器修改配置文件 ​

​registry.conf​

​,添加注册中心的配置

registry {
  # file ...nacos ...eureka...redis...zk...consul...etcd3...sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "192.168.200.129:8848"
    group = "SEATA_GROUP"
    namespace = "1ebba5f6-49da-40cc-950b-f75c8f7d07b3"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}      

3、修改​

​seata-server​

​​ 事务日志的存储模式,​

​resources/file.conf​

​ 改动如下:

我们采用基于redis来存储集群每个节点的事务日志,通过docker允许一个redis

docker run --name redis6.2 --restart=on-failure -p 6379:6379 -d redis:6.2      

然后修改seata-server的file.conf,修改如下:

## transaction log store, only used in seata-server
store {
  ## store mode: file...db...redis
  mode = "redis"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "mysql"
    password = "mysql"
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    host = "192.128.200.129"
    port = "6379"
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    queryLimit = 100
  }

}      
如果基于DB来存储​

​seata-server​

​​的事务日志数据,则需要创建数据库​

​seata​

​,表信息如下:

​​github.com/seata/seata…​​