天天看點

RabbitMQ:重回隊列

重回隊列

在​​消費端限流​​這篇部落格中,我們講了要使用​

​RabbitMQ​

​​的​

​QOS​

​​,就要把​

​autoAck​

​​設定成​

​false​

​​,這時我們就要進行手動​

​ack​

​​了,不然就會有問題,但是之前的部落格中我們隻實踐了手動​

​ack​

​​,而沒有實踐過手動​

​no ack​

​​ ,當消費端認為消息消費不成功時,便會應答​

​no ack​

​,以便說明這個消息我(消費端)沒有消費成功,接下來就看RabbitMQ伺服器怎麼做了,是重新把該條消息放到隊列尾部(也就是重回隊列),再重新推送,還是不重新推送。

RabbitMQ:重回隊列

生産端

生産端代碼和之前沒有太大的變化,這裡通過在​

​AMQP.BasicProperties​

​​中設定​

​header​

​​,來友善消費端實作​

​no ack​

​操作。

package com.kaven.rabbitmq.api.requeue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();


        // 4 發送消息
        for (int i = 0; i < 5; i++) {

            Map<String , Object> header = new HashMap<>();
            header.put("num" , i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(header)
                    .build();


            String msg = "RabbitMQ: 重回隊列 message" + i;
            channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
        }

        // 5 關閉連接配接
        channel.close();
        connection.close();
    }
}      

消費端

之前的篇章,使用​

​basicAck()​

​​方法來實作消費端手動​

​ack​

​​操作,這裡我們增加​

​basicNack()​

​​方法來實作消費端手動​

​no ack​

​​操作,根據生産端投遞過來的​

​AMQP.BasicProperties properties​

​​中​

​header​

​​的​

​key-value​

​​來判斷是​

​ack​

​​還是​

​no ack​

​。

package com.kaven.rabbitmq.api.requeue;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("body:" + new String(body));

        // 休眠兩秒鐘 , 使效果更明顯
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)(properties.getHeaders().get("num")) == 0){
            // multiple-批量處理為false , 重回隊列為true
            channel.basicNack(envelope.getDeliveryTag() , false , true);
        } else{
            // multiple-批量處理為false
            channel.basicAck(envelope.getDeliveryTag() , false);
        }
    }
}      
package com.kaven.rabbitmq.api.requeue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 隊列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 建立Queue
        channel.queueDeclare(queueName , true , false , false , null);


        // 5 消費資訊,autoAck要設定為false
        channel.basicConsume(queueName , false , new MyConsumer(channel));
    }
}      

測試

啟動生産端和消費端,來看看​

​RabbitMQ Management​

​​,從下圖兩個紅框中的資訊我們可以得到,生産端先投遞了​

​5​

​​條消息,消費端進行消費消息,​

​ack​

​​了​

​4​

​​條消息,有​

​1​

​​條消息​

​no ack​

​,這符合我們的預期。

RabbitMQ:重回隊列

我們再來看看消費端的輸出,可以看出來RabbitMQ伺服器一直在給消費端推送​

​RabbitMQ: 重回隊列 message0​

​​這條消息,其實消費端會一直輸出,我隻截取了一部分輸出,因為RabbitMQ伺服器一直讓這條​

​no ack​

​的消息重回隊列,并且一直推送這條消息給消費端。

------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message1
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message2
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message3
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message4
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0      

現在我們來測試一下不重回隊列的情況,修改​

​MyConsumer​

​類的代碼如下:

// multiple-批量處理為false , 重回隊列為false
            channel.basicNack(envelope.getDeliveryTag() , false , false);      

啟動生産端和消費端,我們來看看​

​RabbitMQ Management​

​和消費端的輸出。

RabbitMQ:重回隊列
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message0
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message1
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message2
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message3
------------ consumer message -----------
body:RabbitMQ: 重回隊列 message4      
/**
     * Reject one or several received messages.
     *
     * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Nack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to reject all messages up to and including
     * the supplied delivery tag; false to reject just the supplied
     * delivery tag.
     * @param requeue true if the rejected message(s) should be requeued rather
     * than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;