天天看点

分布式事务解决方案之TCC一、什么是TCC事务二、TCC解决方案三、Hmily实现TCC事务 四、总结

一、什么是TCC事务

TCC是Try、Confirm、Cancel三个词语的缩写,

TCC要求每个分支事务实现三个操作:预处理Try、确认Confirm、撤销Cancel。

Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作。

TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,

若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。

分布式事务解决方案之TCC一、什么是TCC事务二、TCC解决方案三、Hmily实现TCC事务 四、总结
 分支事务失败的情况:
分布式事务解决方案之TCC一、什么是TCC事务二、TCC解决方案三、Hmily实现TCC事务 四、总结

TCC分为三个阶段:

1. Try 阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm 一起才能真正构成一个完整的业务逻辑。

2. Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用TCC则认为 Confirm阶段是不会出错的。

即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。

3. Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。

通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。

4. TM事务管理器

TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑系统结构和软件复用。

TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,

由于 Confirm 和 Cancel 失败需进行重试,因此需要实现为幂等,幂等性是指同一个操作无论请求多少次,其结果都相同。

二、TCC解决方案

目前市面上的TCC框架众多比如下面这几种:

(以下数据采集日为2021年08月04日)

框架名称 Github地址 star数量
tcc-transaction https://github.com/changmingxie/tcc-transaction 5200
Hmily https://github.com/dromara/hmily 3700
ByteTCC https://github.com/liuyangming/ByteTCC 2700
EasyTransaction https://github.com/QNJR-GROUP/EasyTransaction 2300

上一节所讲的Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。我们的目标是理解TCC的原理以及事务协调运作的过程,

因此更请倾向于轻量级易于理解的框架,因此最终确定了Hmily。

Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等RPC框架进行分布式事务。它目前支持以下特性:

1.支持嵌套事务(Nested transaction support)。

2.采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。

3.支持SpringBoot-starter 项目启动,使用简单。

4.RPC框架支持 : dubbo,motan,springcloud。

5.本地事务存储支持 : redis,mongodb,zookeeper,file,mysql。

6.事务日志序列化支持 :java,hessian,kryo,protostuff。

7.采用Aspect AOP 切面思想与Spring无缝集成,天然支持集群。

8.RPC事务恢复,超时异常恢复等。

Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的Try、Confirm、Cancel方法;

传递事务上下文;并记录事务日志,酌情进行补偿,重试等。

Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存储。

Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。

Confirm/Cancel业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架发现即可,不需要被调用它的其他业务服务所感知。

官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html

TCC需要注意三种异常处理分别是空回滚、幂等、悬挂

空回滚:

在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。

出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,

分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。

解决思路是关键就是要识别出这个空回滚。

思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。

前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,

第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

幂等:

通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm 和 Cancel 接口保证幂等,

这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。

解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。

悬挂:

悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。

出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵,

通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者真正执行,而一个 Try 方法预留的业务资源,

只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。

解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。

举例,场景为 A 转账 30 元给 B,A和B账户在不同的服务。

方案1:

账户A

try:
        检查余额是否够30元
        扣减30元
confirm:
        空
cancel:
        增加30元
           
账户B
try:
        增加30元
confirm:
        空
cancel:
        减少30元
           

方案1说明:

1)账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,

因此,我们在扣钱 TCC 资源的 Try 接口里先检查 A 账户余额是否足够,如果足够则扣除 30 元。

Confirm 接口表示正式提交,由于业务资源已经在 Try 接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。

Cancel接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30 元还给账户。

2)账号B,在第一阶段 Try 接口里实现给账户B加钱,Cancel 接口的执行表示整个事务回滚,账户B回滚则需要把Try 接口里加的 30 元再减去。

方案1的问题分析:

1)如果账户A的try没有执行在cancel则就多加了30元。

2)由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。

3)账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。

4)如果账户B的try没有执行在cancel则就多减了30元。

问题解决:

1)账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。

2)try,confirm,cancel方法实现幂等。

3)账号B在try方法中不允许更新账户金额,在confirm中更新账户金额。

4)账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。

优化方案:

账户A

try:
        try幂等校验
        try悬挂处理
        检查余额是否够30元
        扣减30元
confirm:        
        空
cancel:
        cancel幂等校验
        cancel空回滚处理
        增加可用余额30元
           
账户B
try:
        空
confirm:
        confirm幂等校验
        正式增加30元
cancel:
        空
           

三、Hmily实现TCC事务

3.1 业务说明

本实例通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。

两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。

交易过程是,张三给李四转账指定金额。

上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

分布式事务解决方案之TCC一、什么是TCC事务二、TCC解决方案三、Hmily实现TCC事务 四、总结

3.2 程序组成部分

数据库:MySQL-5.7.25

JDK:64位 jdk1.8.0_201

微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE

Hmily:hmily-springcloud.2.0.4-RELEASE

微服务及数据库的关系 :

tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1

tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2

服务注册中心:eureka-server

3.3 创建数据库

创建hmily数据库,用于存储hmily框架记录的数据。
create database `hmily` character set 'utf8' collate 'utf8_general_ci';
           
创建bank1库,并导入以下表结构和数据(包含张三账户)
create database `bank1` character set 'utf8' collate 'utf8_general_ci';

drop table if exists `account_info`;
create table `account_info` (
`id` bigint(20) not null auto_increment,
`account_name` varchar(100) character set utf8 collate utf8_bin null default null comment '户
主姓名',
`account_no` varchar(100) character set utf8 collate utf8_bin null default null comment '银行
卡号',
`account_password` varchar(100) character set utf8 collate utf8_bin null default null comment
'帐户密码',
`account_balance` double null default null comment '帐户余额',
primary key (`id`) using btree
) engine = innodb auto_increment = 5 character set = utf8 collate = utf8_bin row_format =
dynamic;
insert into `account_info` values (2, '张三的账户', '1', '', 10000);
           
 创建bank2库,并导入以下表结构和数据(包含李四账户)
create database `bank2` character set 'utf8' collate 'utf8_general_ci';

create table `account_info` (
`id` bigint(20) not null auto_increment,
`account_name` varchar(100) character set utf8 collate utf8_bin null default null comment '户
主姓名',
`account_no` varchar(100) character set utf8 collate utf8_bin null default null comment '银行
卡号',
`account_password` varchar(100) character set utf8 collate utf8_bin null default null comment
'帐户密码',
`account_balance` double null default null comment '帐户余额',
primary key (`id`) using btree
) engine = innodb auto_increment = 5 character set = utf8 collate = utf8_bin row_format =
dynamic;
insert into `account_info` values (3, '李四的账户', '2', null, 0);
           
每个数据库都创建try、confirm、cancel三张日志表:
create table `local_try_log` (
`tx_no` varchar(64) not null comment '事务id',
`create_time` datetime default null,
primary key (`tx_no`)
) engine=innodb default charset=utf8;
 
create table `local_confirm_log` (
`tx_no` varchar(64) not null comment '事务id',
`create_time` datetime default null
) engine=innodb default charset=utf8;
 
create table `local_cancel_log` (
`tx_no` varchar(64) not null comment '事务id',
`create_time` datetime default null,
primary key (`tx_no`)
) engine=innodb default charset=utf8;
           

3.4 eureka-server

eureka-server是服务注册中心,测试工程将自己注册至discover-server。

3.5 构建案例工程tcc-demo-bank

3.5.1 构建tcc-demo-bank

两个测试工程如下:

tcc-demo-bank1 银行1,操作张三账户,连接数据库bank1

tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2

3.5.2 引入maven依赖

依赖下载不下来

<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>hmily‐springcloud</artifactId>
    <version>2.0.4‐RELEASE</version>
</dependency>
           

3.5.3 配置hmily

server:
  port: 5555

# 注册到eureka服务端的微服务名称
spring:
  application:
    name: tcc-demo-bank1
  datasource:
    url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: root
    type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    initialSize: 5
    minIdle: 5
    maxActive: 20
    maxWait: 60000
    timeBetweenEvictionRunsMillis: 60000
    minEvictableIdleTimeMillis: 300000
    validationQuery: SELECT user()
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: true
    connection-properties: druid.stat.mergeSql:true;druid.stat.slowSqlMillis:5000


eureka:
  instance:
    # 将ip注册到eureka server上,ip替代hostname
    prefer-ip-address: true
    # 显示微服务的服务实例id
    instance-id: tcc-demo-bank1-${server.port}
  client:
    register-with-eureka: true
    fetchRegistry: true
    service-url:
      # 注册到eureka服务端的地址
      defaultZone: http://127.0.0.1:2222/eureka

org:
  dromara:
    hmily :
      serializer : kryo
      recoverDelayTime : 128
      retryMax : 30
      scheduledDelay : 128
      scheduledThreadMax : 10
      repositorySupport : db
      started: true
      hmilyDbConfig :
        driverClassName : com.mysql.cj.jdbc.Driver
        url : jdbc:mysql://localhost:3306/hmily?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
        username : root
        password : root

ribbon:
  # 设置连接超时时间 default 2000
  ConnectTimeout: 6000
  # 设置读取超时时间  default 5000
  ReadTimeout: 6000
  # 对所有操作请求都进行重试  default false
  OkToRetryOnAllOperations: true
  # 切换实例的重试次数  default 1
  MaxAutoRetriesNextServer: 2
  # 对当前实例的重试次数 default 0
  MaxAutoRetries: 1
           

3.5.4 配置类

新增配置类接收application.yml中的Hmily配置信息,并创建HmilyTransactionBootstrap Bean:
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass=true)
public class HmilyConfiguration {

    @Autowired
    private Environment env;

    @Bean
    public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){
        HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);
        hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));
        hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime")));
        hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax")));
        hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay")));
        hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax")));
        hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport"));
        hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started")));
        HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();
        hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName"));
        hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url"));
        hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username"));
        hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));
        hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
        return hmilyTransactionBootstrap;
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource ds1() {
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }
}
           
启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:
@ComponentScan({"com.best", "org.dromara.hmily"})
@EnableEurekaClient
@EnableFeignClients
@SpringBootApplication(exclude = MongoAutoConfiguration.class) // 取消数据源自动创建的配置
public class EurekaTccDemoBank1Application {
    public static void main(String[] args) {
        SpringApplication.run(EurekaTccDemoBank1Application.class, args);
    }
}
           

3.6 tcc-demo-bank1

tcc-demo-bank1实现try和cancel方法,如下:
try:
    try幂等校验
    try悬挂处理
    检查余额是够扣减金额
    扣减金额
confirm:
    空
cancel:
    cancel幂等校验
    cancel空回滚处理
    增加可用余额
           

3.6.1 DAO

@Mapper
@Component
public interface AccountInfoDao {

    @Update("update account_info set account_balance = account_balance + #{amount} where account_balance >= #{amount} and account_no = #{accountNo}")
    int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo} ")
    int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    /**
     * 增加某分支事务try执行记录
     *
     * @param localTradeNo 本地事务编号
     */
    @Insert("insert into local_try_log values(#{txNo}, now());")
    int addTry(@Param("txNo") String localTradeNo);

    /**
     * 增加某分支事务Confirm执行记录
     *
     * @param localTradeNo 本地事务编号
     */
    @Insert("insert into local_confirm_log values(#{txNo}, now());")
    int addConfirm(@Param("txNo") String localTradeNo);

    /**
     * 增加某分支事务Cancel执行记录
     *
     * @param localTradeNo 本地事务编号
     */
    @Insert("insert into local_cancel_log values(#{txNo}, now());")
    int addCancel(@Param("txNo") String localTradeNo);

    /**
     * 查询分支事务try是否已执行
     *
     * @param localTradeNo 本地事务编号
     */
    @Select("select count(1) from local_try_log where tx_no = #{txNo} ")
    int isExistTry(@Param("txNo") String localTradeNo);

    /**
     * 查询分支事务confirm是否已执行
     *
     * @param localTradeNo 本地事务编号
     */
    @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
    int isExistConfirm(@Param("txNo") String localTradeNo);

    /**
     * 查询分支事务cancel是否已执行
     *
     * @param localTradeNo 本地事务编号
     */
    @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
    int isExistCancel(@Param("txNo") String localTradeNo);
}
           

3.6.2 try和cancel方法

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Resource
    private AccountInfoDao accountInfoDao;

    @Resource
    private Bank2Client bank2Client;

    // 张三转账,就是tcc的try方法
    @Override
    @Transactional
    // 只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字
    @Hmily(confirmMethod = "confirm", cancelMethod = "cancel")
    public void updateAccountBalance(String accountNo, Double amount) {

        // 获取全局事务id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();

        log.info("bank1 try begin 开始执行...xid:{}", transId);

        int existTry = accountInfoDao.isExistTry(transId);

        // 幂等判断 判断local_try_log表中是否有try日志记录,如果有则不再执行
        if (existTry > 0) {
            log.info("bank1 try 已经执行,无需重复执行,xid:{}", transId);
            return;
        }

        // try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行
        if (accountInfoDao.isExistConfirm(transId) > 0 || accountInfoDao.isExistCancel(transId) > 0) {
            log.info("bank1 try悬挂处理  cancel或confirm已经执行,不允许执行try,xid:{}", transId);
            return;
        }

        // 扣减金额
        if (accountInfoDao.subtractAccountBalance(accountNo, amount * -1) <= 0) {
            //扣减失败
            throw new RuntimeException("bank1 try 扣减金额失败,xid:{}" + transId);
        }

        // 增加本地事务try成功记录,用于幂等判断
        accountInfoDao.addTry(transId);

        // 远程调用bank2
        if (!bank2Client.transfer(amount)) {
            throw new RuntimeException("bank1 远程调用李四微服务失败,xid:{}" + transId);
        }
        if (amount == 2) {
            throw new RuntimeException("人为制造异常,xid:{}" + transId);
        }
        log.info("bank1 try end 结束执行...xid:{}", transId);
    }

    // confirm方法
    @Override
    @Transactional
    public void confirm(String accountNo, Double amount) {
        // 获取全局事务id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("bank1 confirm begin 开始执行...xid:{},accountNo:{},amount:{}", transId, accountNo, amount);
    }

    // cancel方法
    @Override
    @Transactional
    public void cancel(String accountNo, Double amount) {

        //获取全局事务id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();

        log.info("bank1 cancel begin 开始执行...xid:{}", transId);

        //  cancel幂等校验,已经执行过了,什么也不用做
        if (accountInfoDao.isExistCancel(transId) > 0) {
            log.info("bank1 cancel 已经执行,无需重复执行,xid:{}", transId);
            return;
        }

        // cancel空回滚处理,如果try没有执行,cancel不允许执行
        if (accountInfoDao.isExistTry(transId) <= 0) {
            log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}", transId);
            return;
        }

        // 再将金额加回账户
        accountInfoDao.addAccountBalance(accountNo, amount);

        // 添加cancel日志,用于幂等性控制标识
        accountInfoDao.addCancel(transId);
        log.info("bank1 cancel end 结束执行...xid:{}", transId);
    }

}
           

3.6.4 feignClient

@FeignClient(value = "tcc-demo-bank2", fallback = Bank2ClientFallback.class)
public interface Bank2Client {

    // 远程调用李四的微服务
    @GetMapping("/bank2/transfer")
    @Hmily
    Boolean transfer(@RequestParam("amount") Double amount);
}
           
@Component
public class Bank2ClientFallback implements Bank2Client {
    @Override
    public Boolean transfer(Double amount) {
        return false;
    }
}
           

3.6.5 Controller

@RestController
@RequestMapping("/bank1")
public class Bank1Controller {

    @Resource
    private AccountInfoService accountInfoService;

    @GetMapping("/transfer")
    public Boolean transfer(@RequestParam("amount") Double amount){
        accountInfoService.updateAccountBalance("1", amount);
        return true;
    }
}
           

3.7 tcc-demo-bank2

dtx-tcc-demo-bank2实现如下功能:
try:
    空
confirm:
    confirm幂等校验
    正式增加金额
cancel:
    空
           

3.7.1 DAO

@Mapper
@Component
public interface AccountInfoDao {

    @Update("update account_info set account_balance = account_balance ‐ #{amount} where account_balance > #{amount} and account_no = #{accountNo} ")
    int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo} ")
    int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    /**
     * 增加某分支事务try执行记录
     *
     * @param localTradeNo 本地事务编号
     */
    @Insert("insert into local_try_log values(#{txNo}, now());")
    int addTry(@Param("txNo") String localTradeNo);

    /**
     * 增加某分支事务Confirm执行记录
     *
     * @param localTradeNo 本地事务编号
     */
    @Insert("insert into local_confirm_log values(#{txNo}, now());")
    int addConfirm(@Param("txNo") String localTradeNo);

    /**
     * 增加某分支事务Cancel执行记录
     *
     * @param localTradeNo 本地事务编号
     */
    @Insert("insert into local_cancel_log values(#{txNo}, now());")
    int addCancel(@Param("txNo") String localTradeNo);

    /**
     * 查询分支事务try是否已执行
     *
     * @param localTradeNo 本地事务编号
     */
    @Select("select count(1) from local_try_log where tx_no = #{txNo} ")
    int isExistTry(@Param("txNo") String localTradeNo);

    /**
     * 查询分支事务confirm是否已执行
     *
     * @param localTradeNo 本地事务编号
     */
    @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
    int isExistConfirm(@Param("txNo") String localTradeNo);

    /**
     * 查询分支事务cancel是否已执行
     *
     * @param localTradeNo 本地事务编号
     */
    @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
    int isExistCancel(@Param("txNo") String localTradeNo);
}
           

3.7.2 实现confirm方法

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Resource
    private AccountInfoDao accountInfoDao;

    @Override
    @Transactional
    @Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
    public void updateAccountBalance(String accountNo, Double amount) {
        // 获取全局事务id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("******** Bank2 Service Begin try ..." + transId);
    }

    @Override
    @Transactional
    public void confirmMethod(String accountNo, Double amount) {
        // 获取全局事务id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();

        // 幂等性校验,已经执行过了,什么也不用做
        log.info("******** Bank2 Service commit... " + transId);
        if (accountInfoDao.isExistConfirm(transId) > 0) {
            log.info("******** Bank2 confirm 已经执行,无需重复执行...xid:{}", transId);
            return;
        }

        // 正式增加金额
        accountInfoDao.addAccountBalance(accountNo, amount);

        // 增加一条confirm日志,用于幂等校验
        accountInfoDao.addConfirm(transId);
    }

    @Override
    @Transactional
    public void cancelMethod(String accountNo, Double amount) {
        // 获取全局事务id
        String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
        log.info("******** Bank2 Service Begin cancel... " + transId);
    }
}
           

3.7.3 Controller

@RestController
@RequestMapping("/bank2")
public class Bank2Controller {

    @Resource
    private AccountInfoService accountInfoService;

    // 接收张三的转账
    @GetMapping("/transfer")
    public Boolean transfer(@RequestParam("amount") Double amount) {

        // 李四增加金额
        accountInfoService.updateAccountBalance("2", amount);

        return true;
    }
}
           

3.8 测试场景

张三向李四转账成功。

李四事务失败,张三事务回滚成功。

张三事务失败,李四分支事务回滚成功。

分支事务超时测试。

http://localhost:5555/bank1/transfer?amount=1

http://localhost:5555/bank1/transfer?amount=2

分布式事务解决方案之TCC一、什么是TCC事务二、TCC解决方案三、Hmily实现TCC事务 四、总结

 四、总结

如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处理,需要通过业务逻辑来实现。

这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。

而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。

此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

视频教程、参考博客、gitt源码​​​​​​​

继续阅读