天天看点

RabbitMQ:消费端限流

消费端限流

为什么要对消费端限流

假设一个场景,首先,我们RabbitMQ服务器上积压了有上万条未处理的消息,我们随便打开一个消费端,巨量的消息就会瞬间全部推送过来,但是我们单个消费端是无法同时处理这么多消息的。

当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

怎么实现消费端限流

​RabbitMQ​

​​给我们提供了​

​QOS​

​​ (服务质量保证)功能,即在非自动确认消息的前提下(​

​autoAck 要设置为false​

​​),如果一定数目的消息未被​

​ack​

​​前,​

​RabbitMQ​

​服务器不会推送新的消息给消费端。

/**
     * Request specific "quality of service" settings.
     *
     * These settings impose limits on the amount of data the server
     * will deliver to consumers before requiring acknowledgements.
     * Thus they provide a means of consumer-initiated flow control.
     * @see com.rabbitmq.client.AMQP.Basic.Qos
     * @param prefetchSize maximum amount of content (measured in
     * octets) that the server will deliver, 0 if unlimited
     * @param prefetchCount maximum number of messages that the server
     * will deliver, 0 if unlimited
     * @param global true if the settings should be applied to the
     * entire channel rather than each consumer
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;      
  • ​prefetchSize​

    ​​:对单条消息大小进行限制,​

    ​0​

    ​代表不限制。
  • ​prefetchCount​

    ​​:一次性消费的消息数量,告诉​

    ​RabbitMQ​

    ​​服务器不要同时给消费端推送多于​

    ​N​

    ​​个消息,即一旦有​

    ​N​

    ​​个消息还没有 ​

    ​ack​

    ​​,​

    ​RabbitMQ​

    ​​服务器不会推送新的消息给该​

    ​consumer​

    ​​,直到有消息被​

    ​ack​

    ​。
  • ​global​

    ​​:​

    ​true​

    ​​则将上面的设置应用于​

    ​channel​

    ​​级别,​

    ​false​

    ​​则将上面的设置应用于​

    ​consumer​

    ​​级别。当我们设置为 ​

    ​false​

    ​​ 的时候生效,设置为 ​

    ​true​

    ​​ 的时候没有了限流功能,因为​

    ​channel​

    ​级别的限流尚未实现。

我们既然要使用消费端限流,我们需要将​

​autoAck​

​​设置为​

​false​

​​,我们还要设置具体的限流规则,如限制单条消息大小、一次性消费消息数量、限流应用的级别。并且在消费端的​

​handleDelivery()​

​​方法中要手动 ​

​ack​

​,不然会有问题。

生产端

package com.kaven.rabbitmq.api.qos;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由规则: routingKey(test) 将匹配同名的 queue(test)
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建Connection
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();


        // 4 发送消息
        for (int i = 0; i < 5; i++) {
            String msg = "RabbitMQ: qos message" + i;
            channel.basicPublish(exchange , routingKey , null , msg.getBytes());
        }

        // 5 关闭连接
        channel.close();
        connection.close();
    }
}      

消费端

package com.kaven.rabbitmq.api.qos;


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer {

    public Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("------------ consumer message -----------");
        System.out.println("consumerTag:" + consumerTag);
        System.out.println("envelope:" + envelope);
        System.out.println("properties:" + properties);
        System.out.println("body:" + new String(body));

        channel.basicAck(envelope.getDeliveryTag() , false);
    }
}      
channel.basicAck(envelope.getDeliveryTag() , false);      

上面这一行是重点,因为我们对消费端进行限流了,使用了​

​RabbitMQ​

​​的​

​QOS​

​​,所以​

​autoAck = false​

​​了,所以我们需要自己手动​

​ack​

​​,这里设置批量处理​

​ack​

​​回应为​

​false​

​​,如果没有上面这一行,消费端就不能对已经接受到的消息发送​

​ack​

​​了,​

​RabbitMQ​

​服务器也就不会推送下一个消息给消费端,这样就会一直阻塞住。

package com.kaven.rabbitmq.api.qos;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 队列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建Connection
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 创建Queue
        channel.queueDeclare(queueName , true , false , false , null);

        // 5 限流方式
        // 0-说明对消息大小不限制 , 1-一次性最多消费1条消息 ,false-设置consumer级别而不是channel级别
        channel.basicQos(0 , 1 ,false);
        // autoAck一定要设置为false
        channel.basicConsume(queueName , false , new MyConsumer(channel));
    }
}      

测试

我们先把​

​MyConsumer​

​类下面这一行注释掉。

channel.basicAck(envelope.getDeliveryTag() , false);      

启动生产端和消费端后,看看​

​RabbitMQ Management​

​​,总共有​

​5​

​​条消息,有​

​4​

​​条消息准备好了,有​

​1​

​​条消息没有​

​ack​

​​,因为我们把手动​

​ack​

​​这一行代码给注释掉了,消费端消费了第一条消息后,并不会​

​ack​

​​这条消息,所以​

​RabbitMQ​

​服务器也就不会再推送消息给该消费端了,也就这样一直阻塞住。

RabbitMQ:消费端限流

从消费端的输出也可以看出来,消费端只消费了一条消息,其他消息并没有推送过来(当然因为消费端自己没有手动​

​ack​

​,消费端背锅)。

RabbitMQ:消费端限流

现在我们把注释的代码解除注释,再来进行测试。启动生产端和消费端,看看​

​RabbitMQ Management​

​,发现消息全部被消费端消费了。

RabbitMQ:消费端限流
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message0
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=2, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message1
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=3, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message2
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=4, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message3
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=5, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message4