文章目录
- 1:事务消息原理
- 2:代码开发
-
- 1:启动rocketmq
- 2:producer代码开发
-
- 1:pom
- 2:创建消息发送者-TransactionProducer
- 3:创建执行本地事务和事务状态回查-OrderTransactionListener
- 4:代码分析
- 3:consumer
-
- 1:创建消息监听者
- 2:创建消息监听业务
- 3:死信队列处理
1:事务消息原理
分布式事务之可靠消息最终一致性通常使用rocketmq的事务消息机制来实现
Half Message,半消息
暂时不能被 Consumer消费的消息。Producer已经把消息发送到 Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC的主题下。
当 Producer端对它二次确认后,也就是 Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。
事务状态回查
我们想,可能会因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这时候 Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLzYDO1EmY4MWYiBjZkBDZiBzY4QDZ5MGZxIDN2UTY4U2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
其大致流程如下
1、Producer 发送事务消息
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
本例中,Producer 发送 ”更改订单状态“ 到MQ Server。
2、MQ Server回应消息发送成功
MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
3、Producer 执行本地事务
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
本例中,Producer 执行账户余额扣除。
4、消息投递
若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”更改订单状态“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;
若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”更改订单状态“消息 。
MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
5、事务回查
如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
6.在回查机制中,如果Producer经过检查发现自己本地业务完成,那么想MQ Server发送commit标记,MQ Server收到后将此消息更改为可消费。如果接到Producer的rollback标识,则将此消息删除
2:代码开发
1:启动rocketmq
1:启动rocketmq的nameserver
rocketmq-al1-4.5.0-bin-release\rocketmq-al1-4.5.0-bin-release\bin>start mqnamesrv.cmd
2:启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
2:producer代码开发
1:pom
!--rocket mq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
2:创建消息发送者-TransactionProducer
通过 RocketMQ发送消息,需先创建一个消息发送者。值得注意的是,如果发送事务消息,在这里我们的创建的实例必须是 TransactionMQProducer。
package com.mashibing.producer.config;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class TransactionProducer {
private String producerGroup = "order_trans_group";
// 事务消息
private TransactionMQProducer producer;
//用于执行本地事务和事务状态回查的监听器
@Autowired
OrderTransactionListener orderTransactionListener;
//执行任务的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
@PostConstruct
public void init(){
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
//事务消息发送
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic,data.getBytes());
return this.producer.sendMessageInTransaction(message, null);
}
}
3:创建执行本地事务和事务状态回查-OrderTransactionListener
package com.mashibing.producer.config;
import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.dao.TransactionLogDao;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {
@Autowired
OrderService orderService;
@Autowired
TransactionLogDao transactionLogDao;
/**
* 发送half msg 返回send ok后调用的方法
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("开始执行本地事务....");
LocalTransactionState state;
try{
String body = new String(message.getBody());
OrderBase order = JSONObject.parseObject(body, OrderBase.class);
orderService.createOrder(order,message.getTransactionId());
// 返回commit后,消息能被消费者消费
state = LocalTransactionState.COMMIT_MESSAGE;
// state = LocalTransactionState.ROLLBACK_MESSAGE;
// state = LocalTransactionState.UNKNOW;
// TimeUnit.MINUTES.sleep(1);
log.info("本地事务已提交。{}",message.getTransactionId());
}catch (Exception e){
log.info("执行本地事务失败。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
/**
* 回查 走的方法
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 回查多次失败 人工补偿。提醒人。发邮件的。
log.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogDao.selectCount(transactionId)>0){
state = LocalTransactionState.COMMIT_MESSAGE;
}else {
state = LocalTransactionState.UNKNOW;
}
log.info("结束本地事务状态查询:{}",state);
return state;
}
}
4:代码分析
在通过 producer.sendMessageInTransaction发送事务消息后,如果消息发送成功,就会调用到这里的executeLocalTransaction方法,来执行本地事务。在这里,它会完成订单数据和事务日志的插入。
该方法返回值 LocalTransactionState 代表本地事务状态,它是一个枚举类。
public enum LocalTransactionState {
//提交事务消息,消费者可以看到此消息
COMMIT_MESSAGE,
//回滚事务消息,消费者不会看到此消息
ROLLBACK_MESSAGE,
//事务未知状态,需要调用事务状态回查,确定此消息是提交还是回滚
UNKNOW;
}
那么, checkLocalTransaction 方法就是用于事务状态查询。在这里,我们通过事务ID查询transaction_log这张表,如果可以查询到结果,就提交事务消息;如果没有查询到,就返回未知状态。
注意,这里还涉及到另外一个问题。如果是返回未知状态,RocketMQ Broker服务器会以1分钟的间隔时间不断回查,直至达到事务回查最大检测数,如果超过这个数字还未查询到事务状态,则回滚此消息。
当然,事务回查的频率和最大次数,我们都可以配置。在 Broker 端,可以通过这样来配置它:
brokerConfig.setTransactionCheckInterval(10000); //回查频率10秒一次
brokerConfig.setTransactionCheckMax(3); //最大检测次数为3
3:consumer
1:创建消息监听者
package com.mashibing.producer.compent;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class Consumer {
String consumerGroup = "consumer-group";
DefaultMQPushConsumer consumer;
@Autowired
OrderListener orderListener;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order","*");
consumer.registerMessageListener(orderListener);
// 2次失败 就进死信队列
consumer.setMaxReconsumeTimes(2);
consumer.start();
}
}
2:创建消息监听业务
package com.mashibing.producer.compent;
import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.PointsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class OrderListener implements MessageListenerConcurrently {
@Autowired
PointsService pointsService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
log.info("消费者线程监听到消息。");
try{
System.out.println(1/0);
for (MessageExt message:list) {
log.info("开始处理订单数据,准备增加积分....");
OrderBase order = JSONObject.parseObject(message.getBody(), OrderBase.class);
pointsService.increasePoints(order);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
log.error("处理消费者数据发生异常。{}",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
3:死信队列处理
因为rocketmq消费者默认消费次数为16次,如果到达16次后还没有被消费,就会进入死信队列;这时候我们可以再写一个监听者,对死信队列进行监听,如果死信队列中有了数值,那我们可以对此情况进行发邮件等消息通知,然后人工干预;
死信队列名:%DLQ%consumer-group