极速解决企业级分布式事务之spingCloud+tx-lcn
https://blog.csdn.net/qq_33449307/article/details/102550878中我们实现了[springBoot+jpa+jta+atomikos实现分布式事务,模拟多数据源],这种方式基于2PC的,了解2PC,3PC会知道这些方式存在一些问题:
1.引入了中间件中心化协调者 操作复杂化 一旦中心协调者挂了 就会出现很多问题
2.互联网行业对数据的绝对一致性没有传统企业那么高(2PC 3PC 都是强一致性的)
3.性能问题
在互联网企业中,面向用户更需要的是短暂的响应时间,即 高可用,强一致性可以降低
常用的解决方案的实现:
1.避免分布式事务 (很难做到)
2.tcc事务(补偿机制 最终一致性)
3.MQ消息队列(最终一致性)
4.最大努力通知 利用回调函数去解决(比如:支付宝的支付)www.xxx.com/pay return “success”;//必须返回一个success给支付宝相关接口,否则会一直通知你,直到你告诉它成功了 ,它才不会通知你
回调的方式有 同步 异步 (查询掉单:orderId)
我们将要实现的springcloud集成tx-lcn 解决企业级分布式事务
tx-LCN
官网:http://www.txlcn.org/zh-cn/docs/preface.html 官网文档可以仔细阅读以下,方便了解tx-lcn
官网上给的demo分别用dubbo和springCloud两种方式集成的.常用dubbo的同学可以去研究一下官网demo.废话不多说 开始我们的应用.
[注意这里我们使用的是tx-lcn的5.0.2版本 5.0.x版本的跟4.x版本的 有很大区别,5.0.x版本的支持三种事务方式(默认三种,可自定义扩展)]
1. LCN模式是通过代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。
该模式对代码的嵌入性为低。
该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。
该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。
该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。
2. TCC事务机制相对于传统事务机制(X/Open XA Two-Phase-Commit),其特征在于它不依赖资源管理器(RM)对XA的支持,而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务。主要由三步操作,Try: 尝试执行业务、 Confirm:确认执行业务、 Cancel: 取消执行业务。
该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。
该模式对有无本地事务控制都可以支持使用面广。
数据一致性控制几乎完全由开发者控制,对业务开发难度要求高
3. TXC模式命名来源于淘宝,实现原理是在执行SQL之前,先查询SQL的影响数据,然后保存执行的SQL快走信息和创建锁。当需要回滚的时候就采用这些记录数据回滚数据库,目前锁实现依赖redis分布式锁控制
该模式同样对代码的嵌入性低。
该模式仅限于对支持SQL方式的模块支持。
该模式由于每次执行SQL之前需要先查询影响数据,因此相比LCN模式消耗资源与时间要多。
该模式不会占用数据库的连接资源。
具体可参考官网文档
1.搭建springCloud项目
正常搭建springCloud工程:
实现的工能是 下单操作----->>>>增加订单 减少库存
搭建工程:lcn-demo 子工程为 lcn-eureka 注册中心 / lcn-stock 库存服务 / lcn-orders 订单服务
在lcn-orders里利用feign调用lcn-stock 完成 增加订单并减少库存的操作
部分代码(只看service ,controller 与dao 自定义 本案例使用jpa操作数据库):
实体类,数据表可自行定义,注意数据表引擎 不要用MyISAM 该类型不支持事务 要用 InnoDB类型的
相关实体类:
OrderEntity:
@Entity
@Table(name = "orders",schema = "lcndemo")
@Data
public class OrderEntity {
@Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String orderNo;//订单号
}
StockEntity:
@Entity
@Table(name = "stock",schema = "lcndemo")
@Data
public class StockEntity {
@Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private Long amount;
}
lcn-stock 的测试service:
@Service
public class StockService {
@Autowired
StockRepository stockRepository;
@Transactional(rollbackFor = Exception.class)//本地事务
public String updateStock(Integer amount) {
Optional<StockEntity> stockEntityOptional = stockRepository.findById(1); //实体类可自行定义
//判断查询结果是否存在
if (!stockEntityOptional.isPresent()) throw new RuntimeException("物品库存不存在");
StockEntity stockEntity = stockEntityOptional.get();
stockEntity.setAmount(stockEntity.getAmount() - amount);
stockRepository.save(stockEntity);
if(3==amount)//人为制造异常
throw new RuntimeException("调用失败");
return "success";
}
lcn-orders 的测试service:
@Service
public class OrderService {
@Autowired
OrderRepository orderRepository;
@Autowired
StockClient stockClient; //feign调用的lcn-stock
@Transactional(rollbackFor = Exception.class)
public String addOrder(Integer amount) {
//增加 订单
OrderEntity orderEntity = new OrderEntity(); //这里是实体类 自
orderEntity.setOrderNo(new Date().getTime() + "");
orderRepository.save(orderEntity);
//人为模拟异常
if(2==amount) throw new RuntimeException("不可执行的操作");//这里通常用的是 自定义异常
//减少库存
stockClient.updateStock(amount);
return "success";
}
}
这里可以人为制造异常.
当amount=2时
if(2==amount) throw new RuntimeException("不可执行的操作");
本地事务会回滚 ,不会有问题
当amount=3时
if(3==amount) throw new RuntimeException("调用失败");
此时已经已经保存过了order
orderRepository.save(orderEntity);
lcn-stock里的事务回滚 但是 保存的order不会回滚,此时就会 order增加,但库存没有减少,事务回滚异常.
解决办法:引入tx-lcn
依赖:redis(这里你要有redis(测试可以是单机,线上用集群))
搭建TxManager服务lcn-tm (官网下载的源码里的搭建TxManager服务中有很多类和配置,过于繁琐,所以自己搭建更好一点):
在我们的父pom.xml中引入tx-lcn的依赖:
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tm</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
搭建的lcn-tm的pom.xml依赖:
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tm</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
启动类 增加注解**@EnableTransactionManagerServer**:
@SpringBootApplication
@EnableDiscoveryClient //注册到Eureka
@EnableTransactionManagerServer //指定是tm服务
public class TransactionManagerApplication {
public static void main(String[] args) {
SpringApplication.run(TransactionManagerApplication.class, args);
}
}
配置文件:
在resource目录中新建文件 application.properties (一定是要用.properties 文件) 再新建一个文件application-tm.yml (也可以不建,全写到 application.properties 中,我这里觉得.properties太麻烦 所以不必须卸载 application.properties 中的 我把它写在yml文件中)
解释一下为什么一定要用application.properties文件:
properties比yml优先级别高,这里如果我们不要 application.properties 而是用yml文件,启动时就会默认加载com.codingapi.txlcn:txlcn-tm下的application.properties导致自定义的配置文件无效
application.properties:都是一些常规设置 数据库相关配置 jpa相关配置
server.port=8083
spring.application.name=TransactionManager
spring.profiles.active=tm //激活application-tm.yml文件 启动时会读取该文件
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://39.105.157.149:3306/lcndemo
spring.datasource.username=root
spring.datasource.password=123456
spring.jpa.hibernate.ddl-auto=none
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
spring.jpa.properties.hibernate.format_sql=true
eureka.client.registry-fetch-interval-seconds=5
eureka.client.service-url.defaultZone=http://localhost:8080/eureka/
eureka.instance.preferIpAddress=true
eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
eureka.instance.lease-renewal-interval-in-seconds=5
eureka.instance.lease-expiration-duration-in-seconds=10
application-tm.yml:tm的相关配置
#tx-lcn 相关配置
tx-lcn:
manager:
host: 127.0.0.1 #TM监听ip 默认为:127.0.0.1
port: 8070 #TM监听socket端口 默认为${server.port}-100
heart-time: 300000 #心跳检测时间(ms) 默认为300000
dtx-time: 8000 #分布式事务执行总时间ms,默认为36000
concurrent-level: 128 #事务处理并发等级,默认为机器逻辑核心数5倍
admin-key: 123456 #tm后台默认登录密码 可以不设置 默认为codingapi
dtx-lock-time: ${tx-lcn.manager.dtx-time} #分布式事务锁超时时间 默认为-1,当-1时会用tx-lcn.manager.dtx-time的时间
message:
netty:
attr-delay-time: ${tx-lcn.manager.dtx-time} #参数延迟删除时间单位ms 默认为dtx-time值
logger:
enabled: true #开启事务
driver-class-name: ${spring.datasource.driver-class-name}
jdbc-url: ${spring.datasource.url}
username: ${spring.datasource.username}
password: ${spring.datasource.password}
#redis信息 线上请使用redis Cluster
spring:
redis:
host: 39.105.157.149 #redis地址ip
port: 6379 #端口
password: xxxx #密码
至此tm搭建完毕;线上的时候可以搭建tm集群
修改lcn-order 和lcn-stock:
在每个微服务的项目引入依赖:
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-tc</artifactId>
</dependency>
<dependency>
<groupId>com.codingapi.txlcn</groupId>
<artifactId>txlcn-txmsg-netty</artifactId>
</dependency>
每个微服务的启动类上增加注解:@EnableDistributedTransaction 表示是tx-lcn的客户端
修改配置文件application.yml 在每个微服务的该文件中增加配置 spring.profiles.active: tc
在每个微服务的resource目录中新建文件application-tc.yml文件,文件内容:
application-tc.yml:
#配置tx-lnc相关
## tx-manager 配置
tx-lcn:
ribbon:
loadbalancer:
dtx:
enabled: true #是否启动LCN负载均衡策略(优化选项,开启与否,功能不受影响)
client:
# tx-manager 的配置地址,可以指定TM集群中的任何一个或多个地址
# tx-manager 下集群策略,每个TC都会从始至终<断线重连>与TM集群保持集群大小个连接。
# TM方,每有TM进入集群,会找到所有TC并通知其与新TM建立连接。
# TC方,启动时按配置与集群建立连接,成功后,会再与集群协商,查询集群大小并保持与所有TM的连接
manager-address: 127.0.0.1:8070如果tm是集群 这里可以配置集群 格式用都好隔开 跟eureka客户端配置eureka集群时一样
chain-level: 3 # 调用链长度等级,默认值为3(优化选项。系统中每个请求大致调用链平均长度,估算值。)
# tm-rpc-timeout: 2000 #该参数为tc与tm通讯时的最大超时时间,单位ms。该参数不需要配置会在连接初始化时由tm返回。
# dtx-time: 8000 # 该参数为分布式事务的最大时间,单位ms。该参数不允许TC方配置,会在连接初始化时由tm返回。
#machine-id: 1 #该参数为雪花算法的机器编号,所有TC不能相同。该参数不允许配置,会在连接初始化时由tm返回。
dtx-aspect-order: 0 #该参数为事务方法注解切面的orderNumber,默认值为0.
resource-order: 0 #该参数为事务连接资源方法切面的orderNumber,默认值为0.
#aspect:
# log:
# file-path: logs/.txlcn/lcn-stock # 该参数是分布式事务框架存储的业务切面信息。采用的是h2数据库。绝对路径。该参数默认的值为{user.dir}/.txlcn/{application.name}-{application.port}
logger:
enabled: true #开启事务 开启之后 会自动在对应的库中创建一张表:t_logger 用来记录事务日志
driver-class-name: ${spring.datasource.driver-class-name}
jdbc-url: ${spring.datasource.url}
username: ${spring.datasource.username}
password: ${spring.datasource.password}
修改对应的service:
lcn-stock 的测试service:
开头说的三种模式分别对应的注解是:
@TccTransaction //tcc模式
@LcnTransaction //lcn模式
@TxcTransaction //txc模式
这里我们使用tcc模式做测试:tcc模式 是补偿机制,保证数据最终一致性,所以需要我们自己写补偿机制 ,既是提交方法confirmRpc(我在原本的逻辑中用jpa已经把事务提交了) 和回滚方法cancelRpc (根据自己的需要进行回滚)
注解中的:propagation = DTXPropagation.SUPPORTS 表示是事务参与方(非发起者) 如果不是事务发起端 一定要配置(默认的表示事务发起端),否则会报错
@Service
public class StockService {
@Autowired
StockRepository stockRepository;
@Transactional(rollbackFor = Exception.class)//本地事务
@TccTransaction(propagation = DTXPropagation.SUPPORTS,cancelMethod = "cancelRpc",confirmMethod = "confirmRpc")//事务参与方
// @LcnTransaction //lcn模式
// @TxcTransaction //txc模式
public String updateStock(Integer amount) {
Optional<StockEntity> stockEntityOptional = stockRepository.findById(1);
//判断查询结果是否存在
if (!stockEntityOptional.isPresent()) throw new RuntimeException("物品库存不存在");
StockEntity stockEntity = stockEntityOptional.get();
stockEntity.setAmount(stockEntity.getAmount() - amount);
stockRepository.save(stockEntity);
/* if(3==amount)//认为制造异常
throw new RuntimeException("调用失败");*/
return "success";
}
public void confirmRpc(Integer amount ) {
System.out.println("tcc-confirm:"+amount);
}
public void cancelRpc(Integer amount) {
System.out.println("回滚事务");
Optional<StockEntity> stockEntityOptional = stockRepository.findById(1);
//判断查询结果是否存在
if (!stockEntityOptional.isPresent()) throw new RuntimeException("物品库存不存在");
StockEntity stockEntity = stockEntityOptional.get();
stockEntity.setAmount(stockEntity.getAmount() +amount);
stockRepository.save(stockEntity);
}
}
lcn-orders 的测试service:
这里我们使用lcn模式
可以用@LcnTransaction 也可以用@TxTransaction (该注解默认用的是lnc模式)
@Service
public class OrderService {
@Autowired
OrderRepository orderRepository;
@Autowired
StockClient stockClient;
@Transactional(rollbackFor = Exception.class)
//@TccTransaction//(propagation = DTXPropagation.SUPPORTS)
@LcnTransaction
// @TxTransaction
public String addOrder(Integer amount) {
//增加 订单
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderNo(new Date().getTime() + "");
orderRepository.save(orderEntity);
//人为模拟异常
if(2==amount) throw new RuntimeException("不可执行的操作");//这里通常用的是 自定义异常
//减少库存
String result = stockClient.updateStock(amount);//
if(3==amount)//认为制造异常
throw new RuntimeException("调用失败");
return "success";
}
}
修改之后我们启动 eureka tm stock order 四个服务:
我们可以通过访问tm所在服务的端口进入展示tm详情页: http://127.0.0.1:8083 (8083是我的tm所在服务的端口) 根据自己设置的后台登录密码进行登录后 可以看到tm的详细信息 管理了多少个tc等;
接下来可以访问接口进行调用 下单 并减少库存 然后认为模拟异常 通过查询数据表可以知道 事务回滚了 只有下单成功的时候 ,库存才会按指定减少.
t_logger中会记录事务数据,可以进行查找;
测试可以是单机 线上推荐用集群
也可以通过zull 或者Nginx进行负载均衡的设置
[该篇只是应用,会继续进行分析tx-lcn的原理]