天天看點

ShardingSphere與Seata分布式事務ShardingSphere與Seata分布式事務

ShardingSphere與Seata分布式事務

本篇文章源碼基于4.0.1版本

ShardingSphere除了支援本地事務,還支援XA事務和BASE 事務

Seata流程

Base事務是最終一緻性事務,Seata就是基于Base理論設計的,它的角色有事務管理器TransactionManager(TM)、資料總管ResourceManager(RM)和事務協調者TransactionCoordinator(TC)。他們各司其職,TM負責發起事務和結束事務,TC負責維護分布式事務,RM負責運作本地事務

具體步驟:

  1. TM向TC申請開啟全局事務,生成全局的唯一ID
  2. RM向TC注冊分支事務,執行分支事務并送出
  3. TM根據TC中所有分支事務的執行情況,發起全局送出或復原
  4. TC對這個全局ID對應的所有事務進行送出或復原

在使用Seata的時候,需要在resources檔案夾下增加 seata.conf 配置檔案

檔案内容:

client {
    application.id = xpp-base
    transaction.service.group = xpp-base-group
}
           

這個配置檔案中定義的是事務管理器用戶端和資料總管用戶端

分片上下文運作時執行個體初始化的時候會調用ShardingTransactionManager接口的init()方法,實作類SeataATShardingTransactionManager實作這個方法

初始化事務管理器和資料總管

SeataATShardingTransactionManager的init()方法:

public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
        initSeataRPCClient();
        for (ResourceDataSource each : resourceDataSources) {
            dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));
        }
    }
           
  1. 調用自身的initSeataRPCClient()方法,初始化Seata使用的事務管理器和資料總管用戶端
  2. 建立資料源代理對象,放入map集合中

我們具體看一下initSeataRPCClient()方法:

private final FileConfiguration configuration = new FileConfiguration("seata.conf");

    private void initSeataRPCClient() {
        String applicationId = configuration.getConfig("client.application.id");
        Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file");
        String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");
        TMClient.init(applicationId, transactionServiceGroup);
        RMClient.init(applicationId, transactionServiceGroup);
    }
           

從這一段代碼我們就可以看出來我們為什麼要定義seata.conf檔案了,根據配置的應用id和事務服務組來初始化事務管理器和資料總管

事務操作

開啟事務:

SeataATShardingTransactionManager的begin()方法:

public void begin() {
        SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
        SeataTransactionHolder.get().begin();
        SeataTransactionBroadcaster.collectGlobalTxId();
    }
           
  1. 通過GlobalTransactionContext建立GlobalTransaction,并放入SeataTransactionHolder類中,使用ThreadLocal存儲
  2. 擷取SeataTransactionHolder中的GlobalTransaction對象,調用它的begin()方法開啟事務
  3. 調用Seata事務廣播器SeataTransactionBroadcaster的collectGlobalTxId()方法,判斷是否在全局事務中,如果在的話把全局事務ID放入緩存中。

總結

❤️ 感謝大家

  1. 歡迎關注我❤️,點贊👍🏻,評論🤤,轉發🙏
  2. 有不當之處歡迎批評指正。