天天看點

seata使用

分布式事務産生的背景

分布式架構演進之 - 資料庫的水準拆分

業務資料庫起初是單庫單表,但随着業務資料規模的快速發展,資料量越來越大,單庫單表逐漸成為瓶頸。是以我們對資料庫進行了水準拆分,将原單庫單表拆分成資料庫分片。

如下圖所示,分庫分表之後,原來在一個資料庫上就能完成的寫操作,可能就會跨多個資料庫,這就産生了跨資料庫事務問題。

seata使用

分布式架構演進之 - 業務服務化拆分

在業務發展初期,“一塊大餅”的單業務系統架構,能滿足基本的業務需求。但是随着業務的快速發展,系統的通路量和業務複雜程度都在快速增長,單系統架構逐漸成為業務發展瓶頸,解決業務系統的高耦合、可伸縮問題的需求越來越強烈。

按照面向服務架構(SOA)的設計原則,将單業務系統拆分成多個業務系統,降低了各系統之間的耦合度,使不同的業務系統專注于自身業務,更有利于業務的發展和系統容量的伸縮。

seata使用

業務系統按照服務拆分之後,一個完整的業務往往需要調用多個服務,如何保證多個服務間的資料一緻性成為一個難題。

分布式事務 Seata 介紹

Seata(Simple Extensible Autonomous Transaction Architecture,簡單可擴充自治事務架構)是 2019 年 1 月份阿裡巴巴共同開源的分布式事務解決方案。Seata 開源半年左右,目前已經有超過 1.1 萬 star,社群非常活躍。我們熱忱歡迎大家參與到 Seata 社群建設中,一同将 Seata 打造成開源分布式事務标杆産品。

Seata:https://github.com/seata/seata

seata使用

分布式事務 Seata 産品子產品

如下圖所示,Seata 中有三大子產品,分别是 TM、RM 和 TC。其中 TM 和 RM 是作為 Seata 的用戶端與業務系統內建在一起,TC 作為 Seata 的服務端獨立部署。

seata使用

在 Seata 中,分布式事務的執行流程:

  • TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);
  • 按業務場景,編排資料庫、服務等事務内資源(RM 向 TC 彙報資源準備狀态 );
  • TM 結束分布式事務,事務一階段結束(TM 通知 TC 送出/復原分布式事務);
  • TC 彙總事務資訊,決定分布式事務是送出還是復原;
  • TC 通知所有 RM 送出/復原 資源,事務二階段結束;

分布式事務 Seata 解決方案

Seata 會有 4 種分布式事務解決方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。

在本地搭建一個TC服務(事務協調者).

下載下傳seata的安裝包

官網(http://seata.io/zh-cn/)

​seata-server-1.1.0.zip​

​,解壓即可使用.

配置

打開解壓目錄下的conf/registry.conf檔案如下

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 可以把seata-server了解為一個服務,它需要把自己注冊到某個注冊中心上去,友善使用seata的服務來找到自己
    #在這裡就是指定注冊中心的類型,由于我們項目用的是eureka,是以這裡我選擇eureka,即這一堆配置就下面一個eureka生效了
    #這裡預設的是file,即檔案,選了檔案就可以不用搭注冊中心,直接從檔案裡讀取服務清單
    #複制之後一定要改一改
  type = "eureka"  


  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka { #"隻有我生效啦"
    serviceUrl = "http://localhost:10086/eureka"  #eureka位址
    application = "seata_tc_server"    #在eureka裡顯示的名字
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}


config {
    #在這裡選擇配置中心,這裡我們選擇file
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"


  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
      #由于選擇了file,是以這裡生效了
    name = "file.conf"
  }
}      

是以接下來看一下​

​file.conf​

​檔案

## transaction log store, only used in seata-server
store {
  ## store mode: file、db
    #選擇配置中心的存儲模式,由于選擇file存到檔案裡(性能高)會變為二進制流不好觀察,是以選擇資料庫
     #複制之後一定要改一改
  mode = "db"


  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }


  ## database store property
  db {
      #選擇了資料庫必定要做出一些配置,資料庫裡一定要有這3張表
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://192.168.206.99:3306/seata"
    user = "root"
    password = "root"
    minConn = 1
    maxConn = 10
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}      

建表SQL如下:

CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;


-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME,
    `gmt_modified`      DATETIME,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;


-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;      

啟動

如果是linux環境(要有JRE),執行​

​seata-server.sh​

如果是windows環境,執行​

​seata-server.bat​

改造微服務

隻要是需要用到seata(分布式事務)的服務,都要做類似的配置.

引入依賴

我這裡是springboot項目,是以我先在父pom中聲明了.如下

<properties>    
        <alibaba.seata.version>2.1.0.RELEASE</alibaba.seata.version>
        <seata.version>1.1.0</seata.version>
    </properties>


    <dependencyManagement>
        <dependencies>
            <!--seata-->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-seata</artifactId>
                <version>${alibaba.seata.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>seata-all</artifactId>
                        <groupId>io.seata</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <artifactId>seata-all</artifactId>
                <groupId>io.seata</groupId>
                <version>${seata.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>      

接下來隻要在需要seata的微服務裡添加依賴就好了.

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
</dependency>      

添加配置

spring:
  cloud:
    alibaba:
      seata:
        tx-service-group: test_tx_group # 定義事務組的名稱      

在resources目錄下添加2個檔案​

​file.conf​

​​和​

​registry.conf​

​registry.conf​

​和前面的一樣,直接複制過來就好.

file.conf裡的内容不同了,新的内容如下:

transport {  # tcp udt unix-domain-socket  type = "TCP"  #NIO NATIVE  server = "NIO"  #enable heartbeat  heartbeat = true  # the client batch send request enable  enableClientBatchSendRequest = true  #thread factory for netty  threadFactory {    bossThreadPrefix = "NettyBoss"    workerThreadPrefix = "NettyServerNIOWorker"    serverExecutorThread-prefix = "NettyServerBizHandler"    shareBossWorker = false    clientSelectorThreadPrefix = "NettyClientSelector"    clientSelectorThreadSize = 1    clientWorkerThreadPrefix = "NettyClientWorkerThread"    # netty boss thread size,will not be used for UDT    bossThreadSize = 1    #auto default pin or 8    workerThreadSize = "default"  }  shutdown {    # when destroy server, wait seconds    wait = 3  }  serialization = "seata"  compressor = "none"}service {#這裡注意,等号前後都是配置,前面是yml裡配置的事務組,後面是register.conf裡定義的seata-server  vgroupMapping.test_tx_group = "seata_tc_server"  #only support when registry.type=file, please don't set multiple addresses  seata_tc_server.grouplist = "127.0.0.1:8091"  #degrade, current not support  enableDegrade = false  #disable seata  disableGlobalTransaction = false}client {  rm {    asyncCommitBufferLimit = 10000    lock {      retryInterval = 10      retryTimes = 30      retryPolicyBranchRollbackOnConflict = true    }    reportRetryCount = 5    tableMetaCheckEnable = false    reportSuccessEnable = false  }  tm {    commitRetryCount = 5    rollbackRetryCount = 5  }  undo {    dataValidation = true    logSerialization = "jackson"    logTable = "undo_log"  }  log {    exceptionRate = 100  }}      

配置解讀:

  • ​transport​

    ​:與TC互動的一些配置
  • ​heartbeat​

    ​:client和server通信心跳檢測開關
  • ​enableClientBatchSendRequest​

    ​:用戶端事務消息請求是否批量合并發送
  • ​service​

    ​:TC的位址配置,用于擷取TC的位址
  • ​test_tx_group​

    ​:是事務組名稱,要與application.yml中配置一緻,
  • ​seata_tc_server​

    ​:是TC服務端叢集的名稱,将來通過注冊中心擷取TC位址
  • ​enableDegrade​

    ​:服務降級開關,預設關閉。如果開啟,當業務重試多次失敗後會放棄全局事務
  • ​disableGlobalTransaction​

    ​:全局事務開關,預設false。false為開啟,true為關閉
  • ​vgroupMapping.test_tx_group = "seata_tc_server"​

    ​:
  • ​default.grouplist​

    ​:這個當注冊中心為file的時候,才用到
  • ​client​

    ​:用戶端配置
  • ​exceptionRate​

    ​:出現復原異常時的日志記錄頻率,預設100,百分之一機率。復原失敗基本是髒資料,無需輸出堆棧占用硬碟空間
  • ​dataValidation​

    ​:是否開啟二階段復原鏡像校驗,預設true
  • ​logSerialization​

    ​:undo序列化方式,預設Jackson
  • ​logTable​

    ​:自定義undo表名,預設是​

    ​undo_log​

  • ​commitRetryCount​

    ​:一階段全局送出結果上報TC重試次數,預設1
  • ​rollbackRetryCount​

    ​:一階段全局復原結果上報TC重試次數,預設1
  • ​asynCommitBufferLimit​

    ​:二階段送出預設是異步執行,這裡指定異步隊列的大小
  • ​lock​

    ​:全局鎖配置
  • ​reportRetryCount​

    ​:一階段結果上報TC失敗後重試次數,預設5次
  • ​retryInterval​

    ​:校驗或占用全局鎖重試間隔,預設10,機關毫秒
  • ​retryTimes​

    ​:校驗或占用全局鎖重試次數,預設30次
  • ​retryPolicyBranchRollbackOnConflict​

    ​:分支事務與其它全局復原事務沖突時鎖政策,預設true,優先釋放本地鎖讓復原成功
  • ​rm​

    ​:資料總管配
  • ​tm​

    ​:事務管理器配置
  • ​undo​

    ​:undo_log的配置
  • ​log​

    ​:日志配置

代理DataSource

由于在一階段是通過攔截sql分析語義來生成復原政策,原來的資料源已經不夠用了,得換個牛逼的.在服務裡建立一個配置類.

  • 如果是使用的是mybatis
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import javax.sql.DataSource;


@Configuration
public class DataSourceProxyConfig {


    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
        // 因為使用的是mybatis,這裡定義SqlSessionFactoryBean
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        // 配置資料源代理
        sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
        return sqlSessionFactoryBean.getObject();
    }
}      

如果使用的是mybatis-plus

import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import javax.sql.DataSource;


@Configuration
public class DataSourceProxyConfig {


    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception {
        // 訂單服務中引入了mybatis-plus,是以要使用特殊的SqlSessionFactoryBean
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        // 代理資料源
        sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource));
        // 生成SqlSessionFactory
        return sqlSessionFactoryBean.getObject();
    }
}      

加上注解

給事務發起者的方法上加上​

​@GlobalTransactional​

​​即可,其它的參與者隻要加​

​@Transactional​

​就好了.