天天看點

阿裡開源分布式事務架構seata實踐(原fescar) springboot +durid+mybitas+自有rpc架構

本文章僅作為seata接入文檔,seata原理和源碼請自行轉至github https://github.com/seata/seata 官方文檔位址 https://github.com/seata/seata/wiki/Home_Chinese

1 由于系統演進,大佬覺得 需要做微服務,腦子一拍開始對原來的系統進行微服務改造,

在改造過程中,分布式事務不可避免,tcc mq等等概念研究一遍後,結合時間成本,發現阿裡gts 比較适合,無奈需要接入外網遂放棄,後來偶然發現seata 開源版gts 嘗試接入項目

先放一張流程圖

阿裡開源分布式事務架構seata實踐(原fescar) springboot +durid+mybitas+自有rpc架構

接入流程

1 首先去

官網git

下載下傳一份源碼,我下載下傳的是0.5.2版本。

2 在本地解壓加載到idea下載下傳jar包後直接啟動server項目中的啟動類Server.java ,在調試過程中發現netty存在有時記憶體不夠問題,遂增加啟動參數-XX:MaxDirectMemorySize=1024m

3 server 基于netty開發目前隻支援單節點啟動,記憶體大小沒有進行壓力測試,seata配置檔案為registry.conf 附上關鍵配置

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul
  type = "file"
  file {
    name = "file.conf"
  }
}
           

type 指定配置方式 目前支援file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

預設配置為file 在接入過程中也使用file 配置類型

file.conf 和registry.conf 目前都在resource目錄下

附上file.conf 關鍵配置

service {
  #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "default"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
}
           

接入過程 vgroup_mapping.my_test_tx_group = "default" 修改配置為自定義配置vgroup_mapping.my_group= "default"

具體原則不清楚

4 架構圖中的tc搭建完成,下一步搭建RM 也就是微服務的原子系統 引入seata 的jar包

<!--架構問題,指定durid版本-->
<properties>
        <druid.version>1.1.10</druid.version>
        <seata.version>0.5.0</seata.version>
 </properties>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring</artifactId>
    <version>${seata.version}</version>
</dependency>
           

5 ,按照官方文檔,需要将資料源替換為seata資料源,本項目是springboot+durid+mybitas 直接上代碼 關鍵代碼 DataSourceProxy proxy = new DataSourceProxy(datasource);

@Configuration
public class DruidConfig {
    @Value("${spring.datasource.url}")
    private String dbUrl;

    @Value("${spring.datasource.username}")
    private String username;

    @Value("${spring.datasource.password}")
    private String password;

    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;

    @Value("${spring.datasource.initialSize}")
    private int initialSize;

    @Value("${spring.datasource.minIdle}")
    private int minIdle;

    @Value("${spring.datasource.maxActive}")
    private int maxActive;

    @Value("${spring.datasource.maxWait}")
    private int maxWait;

    @Value("${spring.datasource.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.datasource.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.datasource.validationQuery}")
    private String validationQuery;

    @Value("${spring.datasource.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${spring.datasource.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${spring.datasource.testOnReturn}")
    private boolean testOnReturn;

    @Value("${spring.datasource.poolPreparedStatements}")
    private boolean poolPreparedStatements;

    @Value("${spring.datasource.filters}")
    private String filters;

    @Value("${mybatis.mapper-locations}")
    private String mapperLocation;
    @Bean
    @Primary
    public DataSource druidDataSource() {
        DruidDataSource datasource = new DruidDataSource();
        datasource.setUrl(this.dbUrl);
        datasource.setUsername(username);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        datasource.setInitialSize(initialSize);
        datasource.setMinIdle(minIdle);
        datasource.setMaxActive(maxActive);
        datasource.setMaxWait(maxWait);
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setValidationQuery(validationQuery);
        datasource.setTestWhileIdle(testWhileIdle);
        datasource.setTestOnBorrow(testOnBorrow);
        datasource.setTestOnReturn(testOnReturn);
        datasource.setPoolPreparedStatements(poolPreparedStatements);
        DataSourceProxy proxy = new DataSourceProxy(datasource);
        return proxy;
    }
    @Bean(name="sqlSessionFactory")
    public SqlSessionFactoryBean sqlSessionFactory(DataSource dataSource) throws Exception {

        SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] mapperXmlResource = resolver.getResources(mapperLocation);
        sqlSessionFactory.setDataSource(dataSource);
        sqlSessionFactory.setMapperLocations(mapperXmlResource);
        return sqlSessionFactory;
    }
}           

6 新增seata 掃描器配置 直接上代碼

@Configuration
public class SeataConfiguration {
    @Value("${spring.application.name}")
    private String applicationId;

    /**
     * 注冊一個StatViewServlet
     *
     * @return global transaction scanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
                "my_group");
        return globalTransactionScanner;
    }
}
           

7

新增攔截器過濾器或者切面等,在業務執行前,攔截每個請求,并擷取XID并綁定,本人是在service前增加切面,并處理資料

seata目前支援dubbo和springcloud 預設XID放在headers中,由于我們的項目使用的自有的rpc架構,是以需要自己手動擷取XID,為了友善

我将XID寫在了body中,自己接入的時候,需要按照需要自行設定

上代碼

String xid = RootContext.getXID();
        String restXid = StringUtil.getStringValue(esbInput.getParams().get("Seata-Xid"));
        boolean bind = false;
        if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(restXid)) {
            RootContext.bind(restXid);
            bind = true;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[" + restXid + "] to RootContext");
            }
        }

try{
                //執行方法體
                object = joinPoint.proceed(args);
            }catch (GeneralException e){//對外接口統一異常捕獲解析
               
          }finally {
                if (bind) {
                    String unbindXid = RootContext.unbind();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                    }
                    if (!restXid.equalsIgnoreCase(unbindXid)) {
                        LOGGER.warn("xid in change during http rest from " + restXid + " to " + unbindXid);
                        if (unbindXid != null) {
                            RootContext.bind(unbindXid);
                            LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                        }
                    }
                }
            }           

8 增加配置檔案 file.conf 和registry.conf 并按照自己的實際情況進行配置,同時按照官方文檔,在原子服務資料庫新增日志回退表

undo_log 建表語句為

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=94 DEFAULT CHARSET=utf8;           

9 建立TM服務

引入jar包

`

js

<seata.version>0.5.0</seata.version>           
<groupId>io.seata</groupId>
<artifactId>seata-spring</artifactId>
<version>${seata.version}</version>           

`

10

新增scanner 配置 和rm 一緻

@Configuration
public class SeataConfiguration {
    @Value("${spring.application.name}")
    private String applicationId;


    /**
     * 注冊一個StatViewServlet
     *
     * @return global transaction scanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
                "nguc_tx_group");
        return globalTransactionScanner;
    }
}
           

11 開啟事務和事務的送出復原

private void transactionalRollBack(GlobalTransaction globalTransaction,String xid){
        LOGGER.error("分布式事務中斷,事務開始復原");
        try {
            globalTransaction.rollback();
        } catch (TransactionException txe) {
            LOGGER.error("分布式事務復原失敗,全局事務XID : " + xid);
        }
    }
 public XX doTransaction(){
        GlobalTransaction globalTransaction = GlobalTransactionContext.getCurrentOrCreate();

        //begin GlobalTransactional
        try {
            globalTransaction.begin(20000, "test");
        } catch (TransactionException e) {
            LOGGER.error("全局事務開啟失敗")
            return outObject;
        }
        String xid = RootContext.getXID();
        //組合服務标示
        try {
            call(input)
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);

            transactionalRollBack(globalTransaction, xid);

        }
        try {
            globalTransaction.rollback();
            LOGGER.error("全局事務送出成功,全局事務XID : " + xid);
        } catch (TransactionException txe) {
            LOGGER.error("全局事務送出失敗,全局事務XID : " + xid);
        }
        return xx;
    }
           

12 通過TM調用rm服務,并測試復原,可以在commit前添加斷點檢視undo_log中的資料