天天看點

RabbitMQ:消費端限流

消費端限流

為什麼要對消費端限流

假設一個場景,首先,我們RabbitMQ伺服器上積壓了有上萬條未處理的消息,我們随便打開一個消費端,巨量的消息就會瞬間全部推送過來,但是我們單個消費端是無法同時處理這麼多消息的。

當資料量特别大的時候,我們對生産端限流肯定是不科學的,因為有時候并發量就是特别大,有時候并發量又特别少,我們無法限制生産端,這是使用者的行為。是以我們應該對消費端限流,用于保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導緻系統的卡頓甚至直接崩潰。

怎麼實作消費端限流

​RabbitMQ​

​​給我們提供了​

​QOS​

​​ (服務品質保證)功能,即在非自動确認消息的前提下(​

​autoAck 要設定為false​

​​),如果一定數目的消息未被​

​ack​

​​前,​

​RabbitMQ​

​伺服器不會推送新的消息給消費端。

/**
     * Request specific "quality of service" settings.
     *
     * These settings impose limits on the amount of data the server
     * will deliver to consumers before requiring acknowledgements.
     * Thus they provide a means of consumer-initiated flow control.
     * @see com.rabbitmq.client.AMQP.Basic.Qos
     * @param prefetchSize maximum amount of content (measured in
     * octets) that the server will deliver, 0 if unlimited
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @param global true if the settings should be applied to the
     * entire channel rather than each consumer
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;      
  • ​prefetchSize​

    ​​:對單條消息大小進行限制,​

    ​0​

    ​代表不限制。
  • ​prefetchCount​

    ​​:一次性消費的消息數量,告訴​

    ​RabbitMQ​

    ​​伺服器不要同時給消費端推送多于​

    ​N​

    ​​個消息,即一旦有​

    ​N​

    ​​個消息還沒有 ​

    ​ack​

    ​​,​

    ​RabbitMQ​

    ​​伺服器不會推送新的消息給該​

    ​consumer​

    ​​,直到有消息被​

    ​ack​

    ​。
  • ​global​

    ​​:​

    ​true​

    ​​則将上面的設定應用于​

    ​channel​

    ​​級别,​

    ​false​

    ​​則将上面的設定應用于​

    ​consumer​

    ​​級别。當我們設定為 ​

    ​false​

    ​​ 的時候生效,設定為 ​

    ​true​

    ​​ 的時候沒有了限流功能,因為​

    ​channel​

    ​級别的限流尚未實作。

我們既然要使用消費端限流,我們需要将​

​autoAck​

​​設定為​

​false​

​​,我們還要設定具體的限流規則,如限制單條消息大小、一次性消費消息數量、限流應用的級别。并且在消費端的​

​handleDelivery()​

​​方法中要手動 ​

​ack​

​,不然會有問題。

生産端

package com.kaven.rabbitmq.api.qos;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();


        // 4 發送消息
        for (int i = 0; i < 5; i++) {
            String msg = "RabbitMQ: qos message" + i;
            channel.basicPublish(exchange , routingKey , null , msg.getBytes());
        }

        // 5 關閉連接配接
        channel.close();
        connection.close();
    }
}      

消費端

package com.kaven.rabbitmq.api.qos;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("consumerTag:" + consumerTag);
        System.out.println("envelope:" + envelope);
        System.out.println("properties:" + properties);
        System.out.println("body:" + new String(body));

        channel.basicAck(envelope.getDeliveryTag() , false);
    }
}      
channel.basicAck(envelope.getDeliveryTag() , false);      

上面這一行是重點,因為我們對消費端進行限流了,使用了​

​RabbitMQ​

​​的​

​QOS​

​​,是以​

​autoAck = false​

​​了,是以我們需要自己手動​

​ack​

​​,這裡設定批量處理​

​ack​

​​回應為​

​false​

​​,如果沒有上面這一行,消費端就不能對已經接受到的消息發送​

​ack​

​​了,​

​RabbitMQ​

​伺服器也就不會推送下一個消息給消費端,這樣就會一直阻塞住。

package com.kaven.rabbitmq.api.qos;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 隊列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 建立Queue
        channel.queueDeclare(queueName , true , false , false , null);

        // 5 限流方式
        // 0-說明對消息大小不限制 , 1-一次性最多消費1條消息 ,false-設定consumer級别而不是channel級别
        channel.basicQos(0 , 1 ,false);
        // autoAck一定要設定為false
        channel.basicConsume(queueName , false , new MyConsumer(channel));
    }
}      

測試

我們先把​

​MyConsumer​

​類下面這一行注釋掉。

channel.basicAck(envelope.getDeliveryTag() , false);      

啟動生産端和消費端後,看看​

​RabbitMQ Management​

​​,總共有​

​5​

​​條消息,有​

​4​

​​條消息準備好了,有​

​1​

​​條消息沒有​

​ack​

​​,因為我們把手動​

​ack​

​​這一行代碼給注釋掉了,消費端消費了第一條消息後,并不會​

​ack​

​​這條消息,是以​

​RabbitMQ​

​伺服器也就不會再推送消息給該消費端了,也就這樣一直阻塞住。

RabbitMQ:消費端限流

從消費端的輸出也可以看出來,消費端隻消費了一條消息,其他消息并沒有推送過來(當然因為消費端自己沒有手動​

​ack​

​,消費端背鍋)。

RabbitMQ:消費端限流

現在我們把注釋的代碼解除注釋,再來進行測試。啟動生産端和消費端,看看​

​RabbitMQ Management​

​,發現消息全部被消費端消費了。

RabbitMQ:消費端限流
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message0
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=2, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message1
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=3, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message2
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=4, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message3
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=5, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message4