天天看点

ShardingSphere与Seata分布式事务ShardingSphere与Seata分布式事务

ShardingSphere与Seata分布式事务

本篇文章源码基于4.0.1版本

ShardingSphere除了支持本地事务,还支持XA事务和BASE 事务

Seata流程

Base事务是最终一致性事务,Seata就是基于Base理论设计的,它的角色有事务管理器TransactionManager(TM)、资源管理器ResourceManager(RM)和事务协调者TransactionCoordinator(TC)。他们各司其职,TM负责发起事务和结束事务,TC负责维护分布式事务,RM负责运行本地事务

具体步骤:

  1. TM向TC申请开启全局事务,生成全局的唯一ID
  2. RM向TC注册分支事务,执行分支事务并提交
  3. TM根据TC中所有分支事务的执行情况,发起全局提交或回滚
  4. TC对这个全局ID对应的所有事务进行提交或回滚

在使用Seata的时候,需要在resources文件夹下增加 seata.conf 配置文件

文件内容:

client {
    application.id = xpp-base
    transaction.service.group = xpp-base-group
}
           

这个配置文件中定义的是事务管理器客户端和资源管理器客户端

分片上下文运行时实例初始化的时候会调用ShardingTransactionManager接口的init()方法,实现类SeataATShardingTransactionManager实现这个方法

初始化事务管理器和资源管理器

SeataATShardingTransactionManager的init()方法:

public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
        initSeataRPCClient();
        for (ResourceDataSource each : resourceDataSources) {
            dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));
        }
    }
           
  1. 调用自身的initSeataRPCClient()方法,初始化Seata使用的事务管理器和资源管理器客户端
  2. 创建数据源代理对象,放入map集合中

我们具体看一下initSeataRPCClient()方法:

private final FileConfiguration configuration = new FileConfiguration("seata.conf");

    private void initSeataRPCClient() {
        String applicationId = configuration.getConfig("client.application.id");
        Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file");
        String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");
        TMClient.init(applicationId, transactionServiceGroup);
        RMClient.init(applicationId, transactionServiceGroup);
    }
           

从这一段代码我们就可以看出来我们为什么要定义seata.conf文件了,根据配置的应用id和事务服务组来初始化事务管理器和资源管理器

事务操作

开启事务:

SeataATShardingTransactionManager的begin()方法:

public void begin() {
        SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
        SeataTransactionHolder.get().begin();
        SeataTransactionBroadcaster.collectGlobalTxId();
    }
           
  1. 通过GlobalTransactionContext创建GlobalTransaction,并放入SeataTransactionHolder类中,使用ThreadLocal存储
  2. 获取SeataTransactionHolder中的GlobalTransaction对象,调用它的begin()方法开启事务
  3. 调用Seata事务广播器SeataTransactionBroadcaster的collectGlobalTxId()方法,判断是否在全局事务中,如果在的话把全局事务ID放入缓存中。

总结

❤️ 感谢大家

  1. 欢迎关注我❤️,点赞👍🏻,评论🤤,转发🙏
  2. 有不当之处欢迎批评指正。