重回隊列
在消費端限流這篇部落格中,我們講了要使用
RabbitMQ
的
QOS
,就要把
autoAck
設定成
false
,這時我們就要進行手動
ack
了,不然就會有問題,但是之前的部落格中我們隻實踐了手動
ack
,而沒有實踐過手動
no ack
,當消費端認為消息消費不成功時,便會應答
no ack
,以便說明這個消息我(消費端)沒有消費成功,接下來就看RabbitMQ伺服器怎麼做了,是重新把該條消息放到隊列尾部(也就是重回隊列),再重新推送,還是不重新推送。
生産端
生産端代碼和之前沒有太大的變化,這裡通過在
AMQP.BasicProperties
中設定
header
,來友善消費端實作
no ack
操作。
package com.kaven.rabbitmq.api.requeue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 發送消息
for (int i = 0; i < 5; i++) {
Map<String , Object> header = new HashMap<>();
header.put("num" , i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(header)
.build();
String msg = "RabbitMQ: 重回隊列 message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
// 5 關閉連接配接
channel.close();
connection.close();
}
}
消費端
之前的篇章,使用
basicAck()
方法來實作消費端手動
ack
操作,這裡我們增加
basicNack()
方法來實作消費端手動
no ack
操作,根據生産端投遞過來的
AMQP.BasicProperties properties
中
header
的
key-value
來判斷是
ack
還是
no ack
。
package com.kaven.rabbitmq.api.requeue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer {
public Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------ consumer message -----------");
System.out.println("body:" + new String(body));
// 休眠兩秒鐘 , 使效果更明顯
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)(properties.getHeaders().get("num")) == 0){
// multiple-批量處理為false , 重回隊列為true
channel.basicNack(envelope.getDeliveryTag() , false , true);
} else{
// multiple-批量處理為false
channel.basicAck(envelope.getDeliveryTag() , false);
}
}
}
package com.kaven.rabbitmq.api.requeue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 隊列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 建立Queue
channel.queueDeclare(queueName , true , false , false , null);
// 5 消費資訊,autoAck要設定為false
channel.basicConsume(queueName , false , new MyConsumer(channel));
}
}
測試
啟動生産端和消費端,來看看
RabbitMQ Management
,從下圖兩個紅框中的資訊我們可以得到,生産端先投遞了
5
條消息,消費端進行消費消息,
ack
了
4
條消息,有
1
條消息
no ack
,這符合我們的預期。
我們再來看看消費端的輸出,可以看出來RabbitMQ伺服器一直在給消費端推送
RabbitMQ: 重回隊列 message0
這條消息,其實消費端會一直輸出,我隻截取了一部分輸出,因為RabbitMQ伺服器一直讓這條
no ack
的消息重回隊列,并且一直推送這條消息給消費端。
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message1
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message2
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message3
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message4
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
現在我們來測試一下不重回隊列的情況,修改
MyConsumer
類的代碼如下:
// multiple-批量處理為false , 重回隊列為false
channel.basicNack(envelope.getDeliveryTag() , false , false);
啟動生産端和消費端,我們來看看
RabbitMQ Management
和消費端的輸出。
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message1
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message2
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message3
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message4
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;