天天看点

分布式事务极速解决企业级分布式事务之spingCloud+tx-lcn

极速解决企业级分布式事务之spingCloud+tx-lcn

https://blog.csdn.net/qq_33449307/article/details/102550878中我们实现了[springBoot+jpa+jta+atomikos实现分布式事务,模拟多数据源],这种方式基于2PC的,了解2PC,3PC会知道这些方式存在一些问题:

1.引入了中间件中心化协调者 操作复杂化 一旦中心协调者挂了 就会出现很多问题

2.互联网行业对数据的绝对一致性没有传统企业那么高(2PC 3PC 都是强一致性的)

3.性能问题

在互联网企业中,面向用户更需要的是短暂的响应时间,即 高可用,强一致性可以降低

常用的解决方案的实现:

1.避免分布式事务 (很难做到)

2.tcc事务(补偿机制 最终一致性)

3.MQ消息队列(最终一致性)

4.最大努力通知 利用回调函数去解决(比如:支付宝的支付)www.xxx.com/pay return “success”;//必须返回一个success给支付宝相关接口,否则会一直通知你,直到你告诉它成功了 ,它才不会通知你

回调的方式有 同步 异步 (查询掉单:orderId)

我们将要实现的springcloud集成tx-lcn 解决企业级分布式事务

tx-LCN

官网:http://www.txlcn.org/zh-cn/docs/preface.html 官网文档可以仔细阅读以下,方便了解tx-lcn

官网上给的demo分别用dubbo和springCloud两种方式集成的.常用dubbo的同学可以去研究一下官网demo.废话不多说 开始我们的应用.

[注意这里我们使用的是tx-lcn的5.0.2版本 5.0.x版本的跟4.x版本的 有很大区别,5.0.x版本的支持三种事务方式(默认三种,可自定义扩展)]

1. LCN模式是通过代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。

该模式对代码的嵌入性为低。

该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。

该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。

该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。

2. TCC事务机制相对于传统事务机制(X/Open XA Two-Phase-Commit),其特征在于它不依赖资源管理器(RM)对XA的支持,而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务。主要由三步操作,Try: 尝试执行业务、 Confirm:确认执行业务、 Cancel: 取消执行业务。

该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。

该模式对有无本地事务控制都可以支持使用面广。

数据一致性控制几乎完全由开发者控制,对业务开发难度要求高

3. TXC模式命名来源于淘宝,实现原理是在执行SQL之前,先查询SQL的影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制

该模式同样对代码的嵌入性低。

该模式仅限于对支持SQL方式的模块支持。

该模式由于每次执行SQL之前需要先查询影响数据,因此相比LCN模式消耗资源与时间要多。

该模式不会占用数据库的连接资源。

具体可参考官网文档

1.搭建springCloud项目

正常搭建springCloud工程:

实现的工能是 下单操作----->>>>增加订单 减少库存

搭建工程:lcn-demo 子工程为 lcn-eureka 注册中心 / lcn-stock 库存服务 / lcn-orders 订单服务

在lcn-orders里利用feign调用lcn-stock 完成 增加订单并减少库存的操作

部分代码(只看service ,controller 与dao 自定义 本案例使用jpa操作数据库):

实体类,数据表可自行定义,注意数据表引擎 不要用MyISAM 该类型不支持事务 要用 InnoDB类型的

相关实体类:

OrderEntity:

@Entity
@Table(name = "orders",schema = "lcndemo")
@Data
public class OrderEntity {
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;
    private String orderNo;//订单号
}
           

StockEntity:

@Entity
@Table(name = "stock",schema = "lcndemo")
@Data
public class StockEntity {
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;
    private Long amount;
}
           

lcn-stock 的测试service:

@Service
public class StockService {
    @Autowired
    StockRepository stockRepository;

    @Transactional(rollbackFor = Exception.class)//本地事务
    public String updateStock(Integer amount) {
        Optional<StockEntity> stockEntityOptional = stockRepository.findById(1); //实体类可自行定义 
        //判断查询结果是否存在
        if (!stockEntityOptional.isPresent()) throw new RuntimeException("物品库存不存在");
        StockEntity stockEntity = stockEntityOptional.get();
        stockEntity.setAmount(stockEntity.getAmount() - amount);
        stockRepository.save(stockEntity);
		 if(3==amount)//人为制造异常
             throw new RuntimeException("调用失败");
        return "success";
    }
           

lcn-orders 的测试service:

@Service
public class OrderService {
    @Autowired
    OrderRepository orderRepository;
    @Autowired
    StockClient stockClient; //feign调用的lcn-stock

    @Transactional(rollbackFor = Exception.class)
  
    public String addOrder(Integer amount) {
        //增加 订单
        OrderEntity orderEntity = new OrderEntity(); //这里是实体类 自   
        orderEntity.setOrderNo(new Date().getTime() + "");
        orderRepository.save(orderEntity);
        //人为模拟异常
        if(2==amount) throw new RuntimeException("不可执行的操作");//这里通常用的是 自定义异常

        //减少库存
       stockClient.updateStock(amount);
  
    return "success";
    }
}
           

这里可以人为制造异常.

当amount=2时

if(2==amount) throw new RuntimeException("不可执行的操作");

本地事务会回滚 ,不会有问题

当amount=3时

if(3==amount) throw new RuntimeException("调用失败");

此时已经已经保存过了order

orderRepository.save(orderEntity);

lcn-stock里的事务回滚 但是 保存的order不会回滚,此时就会 order增加,但库存没有减少,事务回滚异常.

解决办法:引入tx-lcn

依赖:redis(这里你要有redis(测试可以是单机,线上用集群))

搭建TxManager服务lcn-tm (官网下载的源码里的搭建TxManager服务中有很多类和配置,过于繁琐,所以自己搭建更好一点):

在我们的父pom.xml中引入tx-lcn的依赖:

<dependency>
                <groupId>com.codingapi.txlcn</groupId>
                <artifactId>txlcn-tc</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.codingapi.txlcn</groupId>
                <artifactId>txlcn-tm</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.codingapi.txlcn</groupId>
                <artifactId>txlcn-txmsg-netty</artifactId>
                <version>5.0.2.RELEASE</version>
            </dependency>
           

搭建的lcn-tm的pom.xml依赖:

<dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.codingapi.txlcn</groupId>
            <artifactId>txlcn-tm</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
           

启动类 增加注解**@EnableTransactionManagerServer**:

@SpringBootApplication
@EnableDiscoveryClient  //注册到Eureka
@EnableTransactionManagerServer  //指定是tm服务
public class TransactionManagerApplication {
    public static void main(String[] args) {
        SpringApplication.run(TransactionManagerApplication.class, args);
    }
}
           

配置文件:

在resource目录中新建文件 application.properties (一定是要用.properties 文件) 再新建一个文件application-tm.yml (也可以不建,全写到 application.properties 中,我这里觉得.properties太麻烦 所以不必须卸载 application.properties 中的 我把它写在yml文件中)

解释一下为什么一定要用application.properties文件:

properties比yml优先级别高,这里如果我们不要 application.properties 而是用yml文件,启动时就会默认加载com.codingapi.txlcn:txlcn-tm下的application.properties导致自定义的配置文件无效

application.properties:都是一些常规设置 数据库相关配置 jpa相关配置

server.port=8083
spring.application.name=TransactionManager
spring.profiles.active=tm 		//激活application-tm.yml文件 启动时会读取该文件
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://39.105.157.149:3306/lcndemo
spring.datasource.username=root
spring.datasource.password=123456
spring.jpa.hibernate.ddl-auto=none
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
spring.jpa.properties.hibernate.format_sql=true
eureka.client.registry-fetch-interval-seconds=5
eureka.client.service-url.defaultZone=http://localhost:8080/eureka/
eureka.instance.preferIpAddress=true
eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
eureka.instance.lease-renewal-interval-in-seconds=5
eureka.instance.lease-expiration-duration-in-seconds=10
           

application-tm.yml:tm的相关配置

#tx-lcn 相关配置
tx-lcn:
  manager:
    host: 127.0.0.1 #TM监听ip  默认为:127.0.0.1
    port: 8070 #TM监听socket端口 默认为${server.port}-100
    heart-time: 300000 #心跳检测时间(ms) 默认为300000
    dtx-time: 8000  #分布式事务执行总时间ms,默认为36000
    concurrent-level: 128 #事务处理并发等级,默认为机器逻辑核心数5倍
    admin-key: 123456 #tm后台默认登录密码 可以不设置  默认为codingapi
    dtx-lock-time: ${tx-lcn.manager.dtx-time} #分布式事务锁超时时间 默认为-1,当-1时会用tx-lcn.manager.dtx-time的时间
  message:
    netty:
      attr-delay-time: ${tx-lcn.manager.dtx-time} #参数延迟删除时间单位ms  默认为dtx-time值
  logger:
    enabled: true #开启事务
    driver-class-name: ${spring.datasource.driver-class-name}
    jdbc-url: ${spring.datasource.url}
    username: ${spring.datasource.username}
    password: ${spring.datasource.password}
#redis信息  线上请使用redis Cluster
spring:
  redis:
    host: 39.105.157.149 #redis地址ip
    port: 6379 #端口
    password: xxxx #密码
           

至此tm搭建完毕;线上的时候可以搭建tm集群

修改lcn-order 和lcn-stock:

在每个微服务的项目引入依赖:
           
<dependency>
            <groupId>com.codingapi.txlcn</groupId>
            <artifactId>txlcn-tc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.codingapi.txlcn</groupId>
            <artifactId>txlcn-txmsg-netty</artifactId>
        </dependency>
           

每个微服务的启动类上增加注解:@EnableDistributedTransaction 表示是tx-lcn的客户端

修改配置文件application.yml 在每个微服务的该文件中增加配置 spring.profiles.active: tc

在每个微服务的resource目录中新建文件application-tc.yml文件,文件内容:

application-tc.yml:

#配置tx-lnc相关
## tx-manager 配置
tx-lcn:
  ribbon:
    loadbalancer:
      dtx:
        enabled: true #是否启动LCN负载均衡策略(优化选项,开启与否,功能不受影响)
  client:
    # tx-manager 的配置地址,可以指定TM集群中的任何一个或多个地址
    # tx-manager 下集群策略,每个TC都会从始至终<断线重连>与TM集群保持集群大小个连接。
    # TM方,每有TM进入集群,会找到所有TC并通知其与新TM建立连接。
    # TC方,启动时按配置与集群建立连接,成功后,会再与集群协商,查询集群大小并保持与所有TM的连接
    manager-address: 127.0.0.1:8070如果tm是集群 这里可以配置集群  格式用都好隔开  跟eureka客户端配置eureka集群时一样
    chain-level: 3 # 调用链长度等级,默认值为3(优化选项。系统中每个请求大致调用链平均长度,估算值。)
   # tm-rpc-timeout: 2000 #该参数为tc与tm通讯时的最大超时时间,单位ms。该参数不需要配置会在连接初始化时由tm返回。
   # dtx-time: 8000 # 该参数为分布式事务的最大时间,单位ms。该参数不允许TC方配置,会在连接初始化时由tm返回。
    #machine-id: 1 #该参数为雪花算法的机器编号,所有TC不能相同。该参数不允许配置,会在连接初始化时由tm返回。
    dtx-aspect-order: 0 #该参数为事务方法注解切面的orderNumber,默认值为0.
    resource-order: 0 #该参数为事务连接资源方法切面的orderNumber,默认值为0.
  #aspect:
   # log:
    #  file-path: logs/.txlcn/lcn-stock # 该参数是分布式事务框架存储的业务切面信息。采用的是h2数据库。绝对路径。该参数默认的值为{user.dir}/.txlcn/{application.name}-{application.port}
  logger: 
    enabled: true #开启事务  开启之后 会自动在对应的库中创建一张表:t_logger 用来记录事务日志
    driver-class-name: ${spring.datasource.driver-class-name} 
    jdbc-url: ${spring.datasource.url}
    username: ${spring.datasource.username}
    password: ${spring.datasource.password}
           

修改对应的service:

lcn-stock 的测试service:

开头说的三种模式分别对应的注解是:

@TccTransaction //tcc模式

@LcnTransaction //lcn模式

@TxcTransaction //txc模式

这里我们使用tcc模式做测试:tcc模式 是补偿机制,保证数据最终一致性,所以需要我们自己写补偿机制 ,既是提交方法confirmRpc(我在原本的逻辑中用jpa已经把事务提交了) 和回滚方法cancelRpc (根据自己的需要进行回滚)

注解中的:propagation = DTXPropagation.SUPPORTS 表示是事务参与方(非发起者) 如果不是事务发起端 一定要配置(默认的表示事务发起端),否则会报错

@Service
public class StockService {
    @Autowired
    StockRepository stockRepository;

    @Transactional(rollbackFor = Exception.class)//本地事务
    @TccTransaction(propagation = DTXPropagation.SUPPORTS,cancelMethod = "cancelRpc",confirmMethod = "confirmRpc")//事务参与方
   // @LcnTransaction   //lcn模式
   // @TxcTransaction   //txc模式
    public String updateStock(Integer amount) {
        Optional<StockEntity> stockEntityOptional = stockRepository.findById(1);
        //判断查询结果是否存在
        if (!stockEntityOptional.isPresent()) throw new RuntimeException("物品库存不存在");
        StockEntity stockEntity = stockEntityOptional.get();
        stockEntity.setAmount(stockEntity.getAmount() - amount);
        stockRepository.save(stockEntity);
       /* if(3==amount)//认为制造异常
             throw new RuntimeException("调用失败");*/
        return "success";
    }
    public void confirmRpc(Integer amount ) {
        System.out.println("tcc-confirm:"+amount);
    }
    public void cancelRpc(Integer amount) {
        System.out.println("回滚事务");
        Optional<StockEntity> stockEntityOptional = stockRepository.findById(1);
        //判断查询结果是否存在
        if (!stockEntityOptional.isPresent()) throw new RuntimeException("物品库存不存在");
        StockEntity stockEntity = stockEntityOptional.get();
        stockEntity.setAmount(stockEntity.getAmount() +amount);
        stockRepository.save(stockEntity);

    }
}
           

lcn-orders 的测试service:

这里我们使用lcn模式

可以用@LcnTransaction 也可以用@TxTransaction (该注解默认用的是lnc模式)

@Service
public class OrderService {
    @Autowired
    OrderRepository orderRepository;
    @Autowired
    StockClient stockClient;

    @Transactional(rollbackFor = Exception.class)
   //@TccTransaction//(propagation = DTXPropagation.SUPPORTS)
    @LcnTransaction 
   // @TxTransaction
    public String addOrder(Integer amount) {
        //增加 订单
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setOrderNo(new Date().getTime() + "");
        orderRepository.save(orderEntity);
        //人为模拟异常
        if(2==amount) throw new RuntimeException("不可执行的操作");//这里通常用的是 自定义异常

        //减少库存
       String result = stockClient.updateStock(amount);//
    if(3==amount)//认为制造异常
             throw new RuntimeException("调用失败");
    return "success";
    }
}
           

修改之后我们启动 eureka tm stock order 四个服务:

我们可以通过访问tm所在服务的端口进入展示tm详情页: http://127.0.0.1:8083 (8083是我的tm所在服务的端口) 根据自己设置的后台登录密码进行登录后 可以看到tm的详细信息 管理了多少个tc等;

接下来可以访问接口进行调用 下单 并减少库存 然后认为模拟异常 通过查询数据表可以知道 事务回滚了 只有下单成功的时候 ,库存才会按指定减少.

t_logger中会记录事务数据,可以进行查找;

测试可以是单机 线上推荐用集群

也可以通过zull 或者Nginx进行负载均衡的设置

[该篇只是应用,会继续进行分析tx-lcn的原理]

继续阅读