RabbitMQ进阶-消息确认机制之Confirm机制
文章目录
-
-
- RabbitMQ进阶-消息确认机制之Confirm机制
-
- 1.RabbitMQ可靠性
- 2.RabbitMQ通道Confirm模式
-
- 2.1 生产者Confirm概念
- 2.2 消费者Confirm概念
- 3.生产者Confirm实战
-
- 3.1 普通Confirm模式
-
- 3.1.1 普通Confirm模式实战
- 3.1.2 普通Confirm 流程
- 3.1.2 普通Confirm 结果分析
- 3.2 批量Confirm模式
-
- 3.2.1 批量Confirm模式实战
- 3.2.2 批量Confirm 消息流程
- 3.2.3 批量Confirm 结果分析
- 3.3 异步 Confirm模式
-
- 3.3.1 异步Confirm模式原理
- 3.3.2 异步Confirm模式实战
- 3.3.3 异步Confirm结果分析
- 4.生产者Confirm性能对比
-
1.RabbitMQ可靠性
如何保证消息成功发送?
- 当消息的生产者者发送消息后,消息到底有没有正确到达broker代理服务器呢?
- 什么情况下,能够让我们知道生产者生产的消息正确到达broker了?
- RabbitMQ如何保证消息的成功投递呢?
解决以上问题,RabbitMQ采用两种方式
- AMQP协议事务方式
- Channel配置Confirm模式
上篇文章,我们详细讲一下 RabbitMQ系列(十二)RabbitMQ进阶-消息确认机制之事务机制
有人可能会有疑问,既然我们已经通过AMQP的协议事务方式保证了消息可以可靠的到达Broker,为什么还要配置Channel通道Confirm呢?
采用事务机制实现会降低RabbitMQ的消息吞吐量,为了更加高效的解决可靠性,引入了Channel的Confirm模式,值得注意的AMQP的transaction事务模式的channel,不能再设置成confirm模式的,即这两种模式是不能共存的
下面详解一下Confirm模式
2.RabbitMQ通道Confirm模式
Channel的Confirm模式分为两种,分别为
- 生产者Produce端的Confirm模式
- 消费者Consumer端的Confirm模式
2.1 生产者Confirm概念
生产者将信道设置成confirm模式,有以下特征
- 一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始)
- 一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就保证生产者知道消息已经正确到达目的队列
- 如果消息和队列是可持久化的,那么broker的确认消息会将消息写入磁盘之后触发
- broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号唯一ID
- broker可以设置basic.ack的multiple域,表示到这个序列号Tag之前的所有消息都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
2.2 消费者Confirm概念
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。我们会在 下面章节详细描述消费者Confirm概念
RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数
3.生产者Confirm实战
客户端实现生产者confirm有三种编程方式:普通Confirm,多条消息串行,批量Confirm就是普通Confirm的升级,可以批量处理ACK消息,前两种Confirm都是靠// channel.waitConfirms阻塞方法,直到mq服务器确认,开始下一条或者下一批,而消息异步Confirm
-
普通Confirm模式:
每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;
-
批量Confirm模式
批量Confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,这种批量确认的模式极大的提高了Confirm效率,坏处是如果出现Confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
-
异步Confirm模式
提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。
3.1 普通Confirm模式
普通confirm模式最简单,publish一条消息后,等待服务器端confirm
如果服务端返回false或者超时时间内未返回,客户端进行消息重传。本次只关注生产者,发送消息到Broker暂时不关注消费者
开启confirm模式,通过调用channel的confirmSelect方法将channel设置为confirm模式
3.1.1 普通Confirm模式实战
代码实战
package confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;
import java.time.LocalDate;
import java.time.LocalTime;
public class SingleConfirmProducer {
/**
* 队列名称
*/
public static final String QUEUE_TEST = "queue_test";
/**
* 队列RoutingKey
*/
public static final String RK_QUEUE_TEST = "rk.queue_test";
/**
* 生产 Direct直连 交换机的MQ消息
*/
public static void produceDirectMessage() throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
/*声明 直连交换机 交换机 String exchange,
* 参数明细
* 1、交换机名称
* 2、交换机类型,direct
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/*声明队列
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_TEST, true, false, false, null);
/*交换机和队列绑定String queue, String exchange, String routingKey
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key rk.subscribe_queue_direct
*/
channel.queueBind(QUEUE_TEST, ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TEST);
// 定义发送者普通Confirm
channel.confirmSelect();
long start = System.currentTimeMillis();
//发送 3条消息
for (int i = 0; i < 3; i++) {
//定义消息内容(发布多条消息)
String message = "第 id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
* exchange - 交换机 DirectExchange
* queuename - 队列信息
* props - 参数信息
* message 消息体 byte[]类型
*/
channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TEST, null, message.getBytes());
//等待Confirm
if (channel.waitForConfirms()) {
System.out.println("接收到Broker Confirm确认,消息发送成功了");
} else {
System.out.println("消息发送失败,请尝试重发");
}
}
System.out.println("执行waitForConfirms耗费时间: " + (System.currentTimeMillis() - start) + "ms");
//关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
produceDirectMessage();
}
}
发送消息后,我们在Channel.close()处,打个断点,看一下Channel的Confirm,可以看到 unConfirm数为0,表示所有消息都收到broker的Confirm了,确认3条消息花费了51ms
3.1.2 普通Confirm 流程
我们用WireShark抓包,看一下AMQP协议消息都干了啥,跟着数据包,我们梳理下流程
- Connection 建立连接
- Channel 开启通道
- 声明交换机Exchange
- 声明队列Queue
- 绑定队列 QueueBind
- Channel开启Confirm
- 发送消息1 Basic.Pulish
- 确认Confirm Basic.Ack
- 发送消息2 Basic.Pulish
- 确认Confirm Basic.Ack
- 发送消息3 Basic.Pulish
- 确认Confirm Basic.Ack
- Channel关闭
- Connect链接关闭
分别有以上14步,可以看出 如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的,看下 Confirm.select 的报文中,设置了Nowait:False
3.1.2 普通Confirm 结果分析
从上图数据包中可以看到 第7步-第12步 分别开启了Confirm模式,每一条消息消息,都收到Confirm ACK 确认消息后,才开始下一条发送,所以普通Confirm模式归根到底是串行发送模式,发一条、确认一条、继续下一条,这种方式明显效率很低啊
3.2 批量Confirm模式
客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish 发送消息
然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率
但是 一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量confirm性能不升反降,会拖垮整个MQ的处理效率
3.2.1 批量Confirm模式实战
批量Confirm与普通Confirm的区别在于 普通的模式是发一条 watiForConfirms,收到确认后发下一条,但是批量的就是发100条,然后等待确认watiForConfirms
package confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;
import java.time.LocalDate;
import java.time.LocalTime;
public class BatchConfirmProducer {
/**
* 队列名称
*/
public static final String QUEUE_TEST = "queue_test";
/**
* 队列RoutingKey
*/
public static final String RK_QUEUE_TEST = "rk.queue_test";
/**
* 生产 Direct直连 交换机的MQ消息
*/
public static void produceDirectMessage() throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
/*声明 直连交换机 交换机 String exchange,
* 参数明细
* 1、交换机名称
* 2、交换机类型,direct
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/*声明队列
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(QUEUE_TEST, true, false, false, null);
/*交换机和队列绑定String queue, String exchange, String routingKey
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key rk.subscribe_queue_direct
*/
channel.queueBind(QUEUE_TEST, ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TEST);
// 定义发送者普通Confirm
channel.confirmSelect();
long start = System.currentTimeMillis();
//发送 3条消息
for (int i = 0; i < 10; i++) {
//定义消息内容(发布多条消息)
String message = "第 id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
* exchange - 交换机 DirectExchange
* queuename - 队列信息
* props - 参数信息
* message 消息体 byte[]类型
*/
channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TEST, null, message.getBytes());
}//批量发消息结束
//等待Broker Confirm
if (channel.waitForConfirms(1)) {
System.out.println("接收到Broker Confirm确认,消息发送成功了");
} else {
System.out.println("消息发送失败,请尝试重发");
}
System.out.println("执行waitForConfirms耗费时间: " + (System.currentTimeMillis() - start) + "ms");
//关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
produceDirectMessage();
}
}
3.2.2 批量Confirm 消息流程
可以看到程序批量发送完消息,Confirm完毕
我们来看下WireShark 抓包,看下AMQP 消息经过了什么
前半部分一样,到Confirm.select开启事务
3.2.3 批量Confirm 结果分析
1. 一次性发了6条BasicPublish ,BasicAck一次
Tag:2 ,Multiple:true 表示 小于2的序列号的消息,一次性处理确认完毕,即处理了第 1,2 两条消息
2. 又发了1条第7条消息,BasicAck一次,第二次Ack
Tag:4 Multiple:true 表示 小于4的序列号的消息,一次性处理确认完毕,即处理了 第 3,4 两条消息
3. 又发了1条 第8条,BasicAck一次,第三次Ack
Tag:5 Multiple:false ,注意此处是False !!!!!
表示 序列号为5 的单条 消息,一次性处理确认完毕,即处理了第 5 条消息
4. 又发了1条 第9条,接连着 三次Ack
第4次ACK,Tag:7 Multiple:true, 即处理了第 6、7 条消息
第5次ACK,Tag:8 Multiple:false, 即处理了第8 条消息
第6次ACK,Tag:9 Multiple:false, 即处理了第9 条消息
第4次ACK 抓包
第5次ACK 抓包
第6次ACK 抓包
5. 又发了1条 第10条,BasicAck一次,第7次Ack
Tag:10 Multiple:false , 即处理了第10 条消息
至此~ 批量发送的10条消息 Confirm 处理完毕
批量ACK并没有什么规律,有时候 10条消息,可能1次ACK 就处理完了10条消息,有时候就像上面那样,处理了7次ACK才Confirm完10条消息
同样的程序,再抓包一次,可以看到只有1次ACK,一次性处理了10条消息的Confirm
3.3 异步 Confirm模式
3.3.1 异步Confirm模式原理
异步confirm模式的编程实现最复杂,原理如下:
- Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合
- 生产者每publish一条数据,集合中元素加1
- 每回调一次handleAck方法,我们维护的unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录,这块根据Multiple来决定批量还是单条删除确认的消息编号
- 我们维护的unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
我们先看一下SDK中关于waitForConfirms的实现
3.3.2 异步Confirm模式实战
- 我们通过 Collections.synchronizedSortedSet(new TreeSet()); 新建了存储消息seqNO编号的有序Set集合
- 我们通过 设置 channel.addConfirmListener(new ConfirmListener() {} 来监听ACK或者NACK消息
- 我们通过 channel.getNextPublishSeqNo(); 获取消息唯一编号、加入到有序Set集合中
- handleNack、handleAck 的 long deliveryTag及boolean multiple 判断到底是否确认 单条或者小于Tag的多条消息
代码实战
package confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class AsyncConfirmProducer {
/**
* 异步Confirm队列
*/
public static final String ASYNC_QUEUE_NAME = "async_queue_test";
/**
* 异步Confirm队列 RoutingKey
*/
public static final String RK_ASYNC_QUEUE_NAME = "rk.async_queue_test";
public static void produceDirectMessage() throws Exception {
// 获取到连接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 从连接中创建通道
Channel channel = connection.createChannel();
/*声明 直连交换机 交换机 String exchange,
* 参数明细
* 1、交换机名称
* 2、交换机类型,direct
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/*声明队列
* 参数明细:
* 1、队列名称
* 2、是否持久化
* 3、是否独占此队列
* 4、队列不用是否自动删除
* 5、参数
*/
channel.queueDeclare(ASYNC_QUEUE_NAME, true, false, false, null);
/*交换机和队列绑定String queue, String exchange, String routingKey
* 参数明细
* 1、队列名称
* 2、交换机名称
* 3、路由key rk.subscribe_queue_direct
*/
channel.queueBind(ASYNC_QUEUE_NAME, ExchangeTypeEnum.DIRECT.getName(), RK_ASYNC_QUEUE_NAME);
// 定义发送者普通Confirm
channel.confirmSelect();
// 创建set
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 监听事务
channel.addConfirmListener(new ConfirmListener() {
// handleNack
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
System.out.println(" handleNack Multiple: " + multiple + " Batch ACK <= Tag:" + deliveryTag + " confirmSet中剩余" + confirmSet.toString());
} else {
confirmSet.remove(deliveryTag);
System.out.println("handleNack Multiple: " + multiple + " Single ACK Tag:" + deliveryTag + " confirmSet中剩余" + confirmSet.toString());
}
}
// 没有问题
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// TODO Auto-generated method stub
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
System.out.println("handleAck : " + multiple + " Batch ACK <= Tag:" + deliveryTag + " confirmSet中剩余" + confirmSet.toString());
} else {
confirmSet.remove(deliveryTag);
System.out.println("handleAck : " + multiple + " Single ACK Tag:" + deliveryTag + " confirmSet中剩余" + confirmSet.toString());
}
}
});
//发送 10条消息
for (int i = 0; i < 10; i++) {
//定义消息内容(发布多条消息)
String message = "第 id=" + i + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
/* 发送消息 String exchange, String routingKey, BasicProperties props, byte[] body
* exchange - 交换机 DirectExchange
* queuename - 队列信息
* props - 参数信息
* message 消息体 byte[]类型
*/
//获取消息的唯一编号,要在发消息前获取,发送消息后,就会自动+1,是吓一跳消息
long seqNo = channel.getNextPublishSeqNo();
//发送消息
channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_ASYNC_QUEUE_NAME, null, message.getBytes());
//!!!!此处获取是错误的,第一条 发送消息后就是这个的nextPublishSeqNo就是2了
//long seqNo1 = channel.getNextPublishSeqNo();
confirmSet.add(seqNo);
}//批量发消息结束
//此处休眠10s,方式还没有ACK消息,通道和链接已经关闭
Thread.sleep(10000);
//关闭通道和连接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
produceDirectMessage();
}
}
代码运行结果,可以消息是批量发出的,异步有三次ACK确认消息
乍一看,好像和批量一样,仔细想象一样么?
批量是我要收到上一批的确认消息,才能进行下一批
但是异步是我不管你上一批消息确认不确认,发送者我只管发送,失败了改重发的重发,该处理的处理,完全分离,消息的确认通过异步处理,这就是差别
3.3.3 异步Confirm结果分析
通过WireShark 抓包我们可以看到3次ACK
第一次ACK: Tag:4 Multiple:true, 即处理了第 1、2、3、4 条消息,confirmSet中剩余[5, 6, 7, 8, 9, 10]
第二次ACK: Tag:7 Multiple:true, 即处理了第 5、6、7 条消息,confirmSet中剩余[8, 9, 10]
第三次ACK: Tag:10 Multiple:true, 即处理了第8、9、10 条消息,confirmSet中没了,消息处理完毕
也验证了我们的日志,是正确的
至此生产者Confirm已经完毕,能够保证消息可以到达Broker,但是那消息如何确保可达消费者呢?这就需要消费者Confirm来处理了
4.生产者Confirm性能对比
我们做一下实验分别采用以下四种方式进行producer实验,比对各个模式下的发送性能,来发送1000条消息试一下。
- 事务模式
- 普通confirm模式
- 批量confirm模式
- 异步confirm模式
结果:
模式 | 消息条数 | 花费时间 |
---|---|---|
事务模式(tx) | 1000 | 461 ms |
普通confirm模式 | 1000 | 343 ms |
批量confirm模式(batch) | 1000 | 185ms |
异步confirm模式(async) | 1000 | 189 ms |
总结:
- 事务模式性能是最差的
- 普通confirm模式性能比事务模式稍微好点,远不如批量及异步
- 批量confirm模式和异步效率差不不大,但是批量Confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能严重降低
- 异步confirm模式(async)编程模型较为复杂,但是重发效率应该比批量高
至此 ~ 生产者Confirm模式我们介绍完毕
下一篇我们介绍消费者的Confirm模式 RabbitMQ系列(十四)RabbitMQ进阶-消息确认机制之Confirm机制-消费者