天天看點

【分布式技術專題】「架構實踐于案例分析」總結和盤點目前常用分布式事務特别及問題分析(Seata-終)

分布式事務中間件對⽐與選擇

  • tx-lcn
  • EasyTransaction
  • ByteTCC
  • Seata

Seata實作分布式事務

我們主要以Seata的分布式事務架構進行介紹分析,相關的并且針對于其三種模式進行分别說明介紹。

搭建Seata Server

前往​​https://github.com/seata/seata/releases​​​ 下載下傳Seata安裝包,本書使⽤Seata 1.0.0。将⽬錄切換⾄Seata根⽬錄,根據作業系統,執⾏對應指令,即可啟動Seata Server。
Linux/Unix/Mac
sh ./bin/seata-server.sh      
Windows
bin\seata-server.bat      
啟動時,也可指定參數
$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1 -m file      

⽀持的參數如下表所示

【分布式技術專題】「架構實踐于案例分析」總結和盤點目前常用分布式事務特别及問題分析(Seata-終)
Seata AT模式
  • ⼀階段:業務資料和復原⽇志記錄在同⼀個本地事務中送出,釋放本地鎖和連接配接資源。
  • ⼆階段:送出異步化,⾮常快速地完成。復原通過⼀階段的復原⽇志進⾏反向補償。

官⽅⽂檔

​​http://seata.io/zh-cn/docs/dev/mode/at-mode.html​​

代碼示範

Maven依賴

<dependency>
   <groupId>com.alibaba.cloud</groupId>
   <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>      

配置

seata:
   tx-service-group: content-center-seata-service-group
   service:
    vgroup-mapping:
        content-center-seata-service-group: seata-cluster
        grouplist:
           seata-cluster: 127.0.0.1:8091
           disable-global-transaction: false      

配置說明

  • tx-service-group:事務分組,預設是 ${​​spring.application.name​​}-seata-service-group ,唯⼀即可。
  • vgroup-mapping:事務分組映射,表示 tx-service-group 對應到哪個Seata Server叢集。
  • key是tx-service-group的值,value是叢集名稱,唯⼀即可
  • grouplist:叢集中所包含的Seata Server的位址清單,key是vgroup-mapping中value的值,
  • value是Seata Server的位址清單
  • disable-global-transaction:是否開啟全局事務開關,預設false。
在Seata1.0.0中,該配置⽆法正常讀取,這是⼀個Bug,詳⻅ ​​https://github.com/seata/seata/issues/2114​​​ ,好在,該配置的預設值就是false,是以不影響使⽤。

建立Seata事務記錄表

-- auto-generated definition
create table undo_log (
id bigint auto_increment comment 'increment id' primary key,
branch_id bigint not null comment 'branch transaction id',
xid varchar(100) not null comment 'global transaction id',
context varchar(128) not null comment 'undo_log context,such as serialization',
rollback_info longblob not null comment 'rollback info',
log_status int not null comment '0:normal status,1:defense status',
log_created datetime not null comment 'create datetime',
log_modified datetime not null comment 'modify datetime',
constraint ux_undo_log
unique (xid, branch_id)
) comment 'AT transaction mode undo table' charset = utf8;      

Controller代碼:

private final ShareSeataService shareSeataService;
@PutMapping("/audit/seata1/{id}")
public Share auditByIdSeata1(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {
     return this.shareSeataService.auditById(id, auditDTO);
}      

Service代碼:

  • 稽核中心服務代碼(含調用使用者中心代碼接口)
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareSeataService {
    private final ShareMapper shareMapper;
    private final UserCenterFeignClient userCenterFeignClient;

   @GlobalTransactional(rollbackFor = Exception.class)
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) { 
            userCenterFeignClient.addBonus(id, 50);
           // 故意抛異常,如果⽤戶中⼼側也能復原,說明實作了分布式事務
          // throw new IllegalArgumentException("發⽣異常...");
        }
        this.auditByIdInDB(id, auditDTO);
        return this.shareMapper.selectByPrimaryKey(id);
   }      
  • 稽核中心服務代碼(執行操作更新資料庫機制)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason())
       .build();
        this.shareMapper.updateByPrimaryKeySelective(share);
    }      
@GlobalTransactional 注解⽤來建立分布式事務。

被調⽤⽅代碼

Maven依賴

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>      

配置

seata:
    tx-service-group: user-center-seata-service-group
      service:
      vgroup-mapping:
         user-center-seata-service-group: seata-cluster
         grouplist:
            seata-cluster: 127.0.0.1:8091
            disable-global-transaction: false      
可以看出來,差距主要展現在tx-service-group的值。

Controller代碼:

@GetMapping("/add-bonus/{id}/{bonus}")
public User addBonus(@PathVariable Integer id, @PathVariable Integer bonus) {
    this.userService.addBonus(id, bonus);
    return this.userService.findById(id);
}      

Service代碼:

@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class UserService {
    private final UserMapper userMapper;
    private final BonusEventLogMapper bonusEventLogMapper;
    public User findById(Integer id) {
    // select * from user where id = #{id}
        return this.userMapper.selectByPrimaryKey(id);
    }

public void addBonus(Integer userId, Integer bonus) {
    // 1. 為⽤戶加積分
    User user = this.userMapper.selectByPrimaryKey(userId);
    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);
    // 2. 記錄⽇志到bonus_event_log表⾥⾯
    this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加積分..").build());
    log.info("積分添加完畢...");
 }
}      
Seata TCC模式
  • ⼀階段 prepare ⾏為
  • ⼆階段 commit 或 rollback ⾏為

需要實作的3個⽅法:

  • ⼀階段:
  • ⽤于業務預處理的⽅法,即 Try 階段、的⽅法,⽐如當機⽤戶的部分餘額等等;
  • ⼆階段:
  • ⽤于送出業務的⽅法,即 Commit ⽅法,⽐如扣除⽤戶之前當機的部分餘額;
  • ⽤于復原業務的⽅法,即 Rollback ⽅法,⽐如返還之前當機的⽤戶餘額;

官⽅⽂檔

​​http://seata.io/zh-cn/docs/dev/mode/tcc-mode.html​​

代碼示範

行為操作接口

@LocalTCC
public interface TccActionOne {
   @TwoPhaseBusinessAction(name = "TccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext actionContext, int a);
    boolean commit(BusinessActionContext actionContext);
    boolean rollback(BusinessActionContext actionContext);
}      

接口實作類

  • 實作類1
@Component
public class TccActionOneImpl implements TccActionOne {
    @Override
    public boolean prepare(BusinessActionContext actionContext, int a) {
        // 這⾥是本地玩的,也可以調⽤其他微服務的接⼝
        String xid = actionContext.getXid();
        System.out.println("TccActionOne prepare, xid:" + xid);
        return true;
    }
    @Override
    public boolean commit(BusinessActionContext actionContext) {
        // 這⾥是本地玩的,也可以調⽤其他微服務的接⼝
        String xid = actionContext.getXid();
        System.out.println("TccActionOne commit, xid:" + xid);
        ResultHolder.setActionOneResult(xid, "T");
        return true;
    }
    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        // 這⾥是本地玩的,也可以調⽤其他微服務的接⼝
        String xid = actionContext.getXid();
        System.out.println("TccActionOne rollback, xid:" + xid);
        ResultHolder.setActionOneResult(xid, "R");
        return true;
    }
}      
@LocalTCC
public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepare(BusinessActionContext actionContext, int a);
    boolean commit(BusinessActionContext actionContext);
    boolean rollback(BusinessActionContext actionContext);
}      
  • 實作類2
@Component
public class TccActionTwoImpl implements TccActionTwo {
       @Override
       public boolean prepare(BusinessActionContext actionContext, String b) {
              // 這⾥是本地玩的,也可以調⽤其他微服務的接⼝
              String xid = actionContext.getXid();
              System.out.println("TccActionTwo prepare, xid:" + xid);
              return true;
       }
       @Override
       public boolean commit(BusinessActionContext actionContext) {
              // 這⾥是本地玩的,也可以調⽤其他微服務的接⼝
              String xid = actionContext.getXid();
              System.out.println("TccActionTwo commit, xid:" + xid);
              ResultHolder.setActionTwoResult(xid, "T");
              return true;
       }
       @Override
       public boolean rollback(BusinessActionContext actionContext) {
              // 這⾥是本地玩的,也可以調⽤其他微服務的接⼝
              String xid = actionContext.getXid();
              System.out.println("TccActionTwo rollback, xid:" + xid);
              ResultHolder.setActionTwoResult(xid, "R");
              return true;
       }
}      
  • 聚合實作服務業務實作類執行
@Service
public class ShareSeataService{

    @Autowired
    TccActionOne tccActionOne;

    @Autowired
    TccActionTwo tccActionTwo;

    @GlobalTransactional
    public void tccTransactionCommit(Map<String, String> paramMap) {
        //第一個TCC 事務參與者
        boolean result = tccActionOne.prepare(null, "one");
        if (!result) {
            paramMap.put("xid",RootContext.getXID());
            throw new RuntimeException("TccActionOne failed.");
        }
        List list = new ArrayList();
        list.add("c1");
        list.add("c2");
        result = tccActionTwo.prepare(null, "two");
        if (!result) {
            paramMap.put("xid",RootContext.getXID());
            throw new RuntimeException("TccActionTwo failed.");
        }
       paramMap.put("xid",RootContext.getXID());
        return ;
    }
}

// 復原的代碼相似,就不寫了      
  • 執行調用點
@Slf4j
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareAdminController {
    private final ShareSeataService shareSeataService;
    @GetMapping("tcc-commit")
    public String tccTransactionCommit() {
        Map<String, String> map = new HashMap<>();
        this.shareSeataService.tccTransactionCommit(map);
        String xid = map.get("xid");
        // 結果T
        return ResultHolder.getActionOneResult(xid);
    }
    @GetMapping("/tcc-rollback")
    public String tccTransactionRollback() {
       Map<String, String> map = new HashMap<>();
       try {
          this.shareSeataService.tccTransactionRollback(map);
       } catch (Throwable t) {
          log.warn("事務復原..", t);
       }
       String xid = map.get("xid");
       // 結果R
       return ResultHolder.getActionOneResult(xid);
   }
}      

定義狀态機⽂件:

{
    "Name": "reduceInventoryAndBalance",
    "Comment": "reduce inventory then reduce balance in a transaction",
    "StartState": "ReduceInventory",
    "Version": "0.0.1",
    "States": {
        "ReduceInventory": {
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceInventory",
            "Next": "ChoiceState",
            "Input": [
                "$.[businessKey]",
                "$.[count]"
            ],
            "Output": {
                "reduceInventoryResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            }
        },
        "ChoiceState": {
            "Type": "Choice",
            "Choices": [
                {
                    "Expression": "[reduceInventoryResult] == true",
                    "Next": "ReduceBalance"
                }
            ],
            "Default": "Fail"
        },
        "ReduceBalance": {
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "reduce",
            "CompensateState": "CompensateReduceBalance",
            "Input": [
                "$.[businessKey]",
                "$.[amount]",
                {
                    "throwException": "$.[mockReduceBalanceFail]"分布式事務27
                }
            ],
            "Output": {
                "compensateReduceBalanceResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ],
            "Next": "Succeed"
        },
        "CompensateReduceInventory": {
            "Type": "ServiceTask",
            "ServiceName": "inventoryAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        },
        "CompensateReduceBalance": {
            "Type": "ServiceTask",
            "ServiceName": "balanceAction",
            "ServiceMethod": "compensateReduce",
            "Input": [
                "$.[businessKey]"
            ]
        },
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"
        },
        "Succeed": {
            "Type": "Succeed"
        },
        "Fail": {
            "Type": "Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        }
    }
}      

測試代碼:

public class LocalSagaTransactionStarter {
        public static void main(String[] args) {
            AbstractApplicationContext applicationContext = new ClassPathXmlApplication
            Context(new String[] {"spring/seata-saga.xml"});
            StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationCon
            text.getBean("stateMachineEngine");
            transactionCommittedDemo(stateMachineEngine);
            transactionCompensatedDemo(stateMachineEngine);
            new ApplicationKeeper(applicationContext).keep();
        }
        private static void transactionCommittedDemo(StateMachineEngine stateMachineEng
ine) {
            Map<String, Object> startParams = new HashMap<>(3);
            String businessKey = String.valueOf(System.currentTimeMillis());
            startParams.put("businessKey", businessKey);
            startParams.put("count", 10);
            startParams.put("amount", new BigDecimal("100"));
//sync test
            StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce
                    InventoryAndBalance", null, businessKey, startParams);
                    Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
                            n execute failed. XID: " + inst.getId());
            System.out.println("saga transaction commit succeed. XID: " + inst.getId())
            ;
            inst = stateMachineEngine.getStateMachineConfig().getStateLogStore().getSta
            teMachineInstanceByBusinessKey(businessKey, null);
            Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
                    n execute failed. XID: " + inst.getId());
//async test
            businessKey = String.valueOf(System.currentTimeMillis());
            inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBala
                    nce", null, businessKey, startParams, CALL_BACK);
                    waittingForFinish(inst);
            Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transactio
                    n execute failed. XID: " + inst.getId());
            分布式事務
            29
            System.out.println("saga transaction commit succeed. XID: " + inst.getId())
            ;
        }
        private static void transactionCompensatedDemo(StateMachineEngine stateMachineE
ngine) {
            Map<String, Object> startParams = new HashMap<>(4);
            String businessKey = String.valueOf(System.currentTimeMillis());
            startParams.put("businessKey", businessKey);
            startParams.put("count", 10);
            startParams.put("amount", new BigDecimal("100"));
            startParams.put("mockReduceBalanceFail", "true");
//sync test
            StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduce
                    InventoryAndBalance", null, businessKey, startParams);
//async test
                    businessKey = String.valueOf(System.currentTimeMillis());
            inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBala
                    nce", null, businessKey, startParams, CALL_BACK);
                    waittingForFinish(inst);
            Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "sag
                    a transaction compensate failed. XID: " + inst.getId());
            System.out.println("saga transaction compensate succeed. XID: " + inst.getI
                    d());
        }
        private static volatile Object lock = new Object();
        private static AsyncCallback CALL_BACK = new AsyncCallback() {
            @Override
            public void onFinished(ProcessContext context, StateMachineInstance stateMach
ineInstance) {
                synchronized (lock){
                    lock.notifyAll();
                }
            }
            @Override
            public void onError(ProcessContext context, StateMachineInstance stateMachine
Instance, Exception exp) {
                synchronized (lock){
                    lock.notifyAll();
                }
            }
        };
        private static void waittingForFinish(StateMachineInstance inst){

            synchronized (lock){
                if(ExecutionStatus.RU.equals(inst.getStatus())){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}      

Seata saga模式

Saga 事務:最終一緻性
方案簡介
處理流程

Saga事務基本協定

  1. 每個 Saga 事務由一系列幂等的有序子事務(sub-transaction) Ti 組成。
  2. 每個 Ti 都有對應的幂等補償動作 Ci,補償動作用于撤銷 Ti 造成的結果。
  3. 可以看到,和 TCC 相比,Saga 沒有“預留”動作,它的 Ti 就是直接送出到庫。
代碼實作
// 接⼝略
public class BalanceActionImpl implements BalanceAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(BalanceActionImpl.class);
    @Override
    public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
       if(params != null && "true".equals(params.get("throwException"))){
         throw new RuntimeException("reduce balance failed");
       }
       LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey);
       return true;
    }
      @Override
      public boolean compensateReduce(String businessKey, Map<String, Object> params) {
            if(params != null && "true".equals(params.get("throwException"))){
                  throw new RuntimeException("compensate reduce balance failed");
            }
            LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);
            return true;
      }
}
// 接⼝略
public class InventoryActionImpl implements InventoryAction {
     private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);
     @Override
     public boolean reduce(String businessKey, int count) {
          LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
          return true;
     }
     @Override
     public boolean compensateReduce(String businessKey) {
          LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);
          return true;
     }
}      

Seata XA模式

官⽅⽂檔
代碼示範
  • 添加Seata的依賴 & 配置
  • 你平時的JDBC代碼怎麼寫,依然怎麼寫。

4種模式對⽐與選擇

AT

優勢:

  • 使⽤簡單:對業務侵⼊性⼩

缺點:

  • 性能中等
  • 有全局鎖

适⽤場景:

  • 适⽤于對性能沒有特别⾼的要求的場景
  • 适⽤于不希望對業務進⾏改造的場景
TCC

優勢

  • 性能會⽐ AT 模式⾼很多

缺點

  • 适⽤場景:

    适⽤于核⼼系統等對性能有很⾼要求的場景

Saga

優勢:

  • ⼀階段送出本地資料庫事務,⽆鎖,⾼性能;
  • 參與者可以采⽤事務驅動異步執⾏,⾼吞吐;
  • 補償服務即正向服務的“反向”,易于了解,易于實作;

缺點:

  • Saga 模式由于⼀階段已經送出本地資料庫事務,且沒有進⾏“預留”動作,是以不能保證隔離性。後續會講到對于缺乏隔離性的應對措施。

适⽤場景:

  • 業務流程⻓/多
  • 參與者包含其他公司或遺留系統服務,⽆法提供 TCC 模式要求的三個接⼝
  • 典型業務系統:如⾦融⽹絡(與外部⾦融機構對接)、互聯⽹微貸、管道整合、分布式架構服務
  • 內建等業務系統
  • 銀⾏業⾦融機構使⽤⼴泛
XA

優勢:

  • ⽆侵⼊

缺點:

  • 性能較差
  • 需要資料庫⽀持XA

适⽤場景:

  • 強⼀緻性的解決⽅案,适⽤于對⼀緻性要求⾮常⾼的場景(使⽤較少)