天天看点

RabbitMQ:重回队列

重回队列

在​​消费端限流​​这篇博客中,我们讲了要使用​

​RabbitMQ​

​​的​

​QOS​

​​,就要把​

​autoAck​

​​设置成​

​false​

​​,这时我们就要进行手动​

​ack​

​​了,不然就会有问题,但是之前的博客中我们只实践了手动​

​ack​

​​,而没有实践过手动​

​no ack​

​​ ,当消费端认为消息消费不成功时,便会应答​

​no ack​

​,以便说明这个消息我(消费端)没有消费成功,接下来就看RabbitMQ服务器怎么做了,是重新把该条消息放到队列尾部(也就是重回队列),再重新推送,还是不重新推送。

RabbitMQ:重回队列

生产端

生产端代码和之前没有太大的变化,这里通过在​

​AMQP.BasicProperties​

​​中设置​

​header​

​​,来方便消费端实现​

​no ack​

​操作。

package com.kaven.rabbitmq.api.requeue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由规则: routingKey(test) 将匹配同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建Connection
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();


        // 4 发送消息
        for (int i = 0; i < 5; i++) {

            Map<String , Object> header = new HashMap<>();
            header.put("num" , i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(header)
                    .build();


            String msg = "RabbitMQ: 重回队列 message" + i;
            channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
        }

        // 5 关闭连接
        channel.close();
        connection.close();
    }
}      

消费端

之前的篇章,使用​

​basicAck()​

​​方法来实现消费端手动​

​ack​

​​操作,这里我们增加​

​basicNack()​

​​方法来实现消费端手动​

​no ack​

​​操作,根据生产端投递过来的​

​AMQP.BasicProperties properties​

​​中​

​header​

​​的​

​key-value​

​​来判断是​

​ack​

​​还是​

​no ack​

​。

package com.kaven.rabbitmq.api.requeue;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("body:" + new String(body));

        // 休眠两秒钟 , 使效果更明显
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)(properties.getHeaders().get("num")) == 0){
            // multiple-批量处理为false , 重回队列为true
            channel.basicNack(envelope.getDeliveryTag() , false , true);
        } else{
            // multiple-批量处理为false
            channel.basicAck(envelope.getDeliveryTag() , false);
        }
    }
}      
package com.kaven.rabbitmq.api.requeue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 队列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建Connection
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 创建Queue
        channel.queueDeclare(queueName , true , false , false , null);


        // 5 消费信息,autoAck要设置为false
        channel.basicConsume(queueName , false , new MyConsumer(channel));
    }
}      

测试

启动生产端和消费端,来看看​

​RabbitMQ Management​

​​,从下图两个红框中的信息我们可以得到,生产端先投递了​

​5​

​​条消息,消费端进行消费消息,​

​ack​

​​了​

​4​

​​条消息,有​

​1​

​​条消息​

​no ack​

​,这符合我们的预期。

RabbitMQ:重回队列

我们再来看看消费端的输出,可以看出来RabbitMQ服务器一直在给消费端推送​

​RabbitMQ: 重回队列 message0​

​​这条消息,其实消费端会一直输出,我只截取了一部分输出,因为RabbitMQ服务器一直让这条​

​no ack​

​的消息重回队列,并且一直推送这条消息给消费端。

------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message1
------------ consumer message -----------
body:RabbitMQ: 重回队列 message2
------------ consumer message -----------
body:RabbitMQ: 重回队列 message3
------------ consumer message -----------
body:RabbitMQ: 重回队列 message4
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0      

现在我们来测试一下不重回队列的情况,修改​

​MyConsumer​

​类的代码如下:

// multiple-批量处理为false , 重回队列为false
            channel.basicNack(envelope.getDeliveryTag() , false , false);      

启动生产端和消费端,我们来看看​

​RabbitMQ Management​

​和消费端的输出。

RabbitMQ:重回队列
------------ consumer message -----------
body:RabbitMQ: 重回队列 message0
------------ consumer message -----------
body:RabbitMQ: 重回队列 message1
------------ consumer message -----------
body:RabbitMQ: 重回队列 message2
------------ consumer message -----------
body:RabbitMQ: 重回队列 message3
------------ consumer message -----------
body:RabbitMQ: 重回队列 message4      
/**
     * Reject one or several received messages.
     *
     * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Nack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to reject all messages up to and including
     * the supplied delivery tag; false to reject just the supplied
     * delivery tag.
     * @param requeue true if the rejected message(s) should be requeued rather
     * than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;