重回队列
在消费端限流这篇博客中,我们讲了要使用
RabbitMQ
的
QOS
,就要把
autoAck
设置成
false
,这时我们就要进行手动
ack
了,不然就会有问题,但是之前的博客中我们只实践了手动
ack
,而没有实践过手动
no ack
,当消费端认为消息消费不成功时,便会应答
no ack
,以便说明这个消息我(消费端)没有消费成功,接下来就看RabbitMQ服务器怎么做了,是重新把该条消息放到队列尾部(也就是重回队列),再重新推送,还是不重新推送。
生产端
生产端代码和之前没有太大的变化,这里通过在
AMQP.BasicProperties
中设置
header
,来方便消费端实现
no ack
操作。
package com.kaven.rabbitmq.api.requeue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己服务器的IP
private static String ip = "IP";
// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一个 "/" 的虚拟主机
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由规则: routingKey(test) 将匹配同名的 queue(test)
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 4 发送消息
for (int i = 0; i < 5; i++) {
Map<String , Object> header = new HashMap<>();
header.put("num" , i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(header)
.build();
String msg = "RabbitMQ: 重回队列 message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
// 5 关闭连接
channel.close();
connection.close();
}
}
消费端
之前的篇章,使用
basicAck()
方法来实现消费端手动
ack
操作,这里我们增加
basicNack()
方法来实现消费端手动
no ack
操作,根据生产端投递过来的
AMQP.BasicProperties properties
中
header
的
key-value
来判断是
ack
还是
no ack
。
package com.kaven.rabbitmq.api.requeue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer {
public Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------ consumer message -----------");
System.out.println("body:" + new String(body));
// 休眠两秒钟 , 使效果更明显
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)(properties.getHeaders().get("num")) == 0){
// multiple-批量处理为false , 重回队列为true
channel.basicNack(envelope.getDeliveryTag() , false , true);
} else{
// multiple-批量处理为false
channel.basicAck(envelope.getDeliveryTag() , false);
}
}
}
package com.kaven.rabbitmq.api.requeue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己服务器的IP
private static String ip = "IP";
// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一个 "/" 的虚拟主机
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 队列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 创建Connection
Connection connection = connectionFactory.newConnection();
// 3 创建Channel
Channel channel = connection.createChannel();
// 4 创建Queue
channel.queueDeclare(queueName , true , false , false , null);
// 5 消费信息,autoAck要设置为false
channel.basicConsume(queueName , false , new MyConsumer(channel));
}
}
测试
启动生产端和消费端,来看看
RabbitMQ Management
,从下图两个红框中的信息我们可以得到,生产端先投递了
5
条消息,消费端进行消费消息,
ack
了
4
条消息,有
1
条消息
no ack
,这符合我们的预期。
我们再来看看消费端的输出,可以看出来RabbitMQ服务器一直在给消费端推送
RabbitMQ: 重回队列 message0
这条消息,其实消费端会一直输出,我只截取了一部分输出,因为RabbitMQ服务器一直让这条
no ack
的消息重回队列,并且一直推送这条消息给消费端。
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message1
------------ consumer message -----------
body:RabbitMQ: 重回队列 message2
------------ consumer message -----------
body:RabbitMQ: 重回队列 message3
------------ consumer message -----------
body:RabbitMQ: 重回队列 message4
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
现在我们来测试一下不重回队列的情况,修改
MyConsumer
类的代码如下:
// multiple-批量处理为false , 重回队列为false
channel.basicNack(envelope.getDeliveryTag() , false , false);
启动生产端和消费端,我们来看看
RabbitMQ Management
和消费端的输出。
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message1
------------ consumer message -----------
body:RabbitMQ: 重回队列 message2
------------ consumer message -----------
body:RabbitMQ: 重回队列 message3
------------ consumer message -----------
body:RabbitMQ: 重回队列 message4
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;