天天看點

15-RabbitMQ進階特性-消費端限流

消費端限流

什麼是消費端限流

  • 假設一個場景, 首先, 我們RabbitMQ伺服器有上萬條消息未處理的消息, 我們随機打開一個消費者用戶端, 會出現下面情況
  • 巨量的消息瞬間全部推送過來, 但是我們單個用戶端無法同時處理這麼多資料
  • RabbitMQ提供了一種Qos(服務品質保證)功能, 即在非自動确認消息的前提下, 如果一定數目的消息(通過基于consumer或者channel設定Qos的值)未被确認前, 不進行消費新的消息
  • void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
  • 參數解釋
  • prefetchSize: 0
  • prefetchCount: 會告訴RabbitMQ不要同時給一個消費者推送多餘N個消息, 即一旦有N個消息還沒有ACK, 則該consumer将block掉, 直到有消息ACK
  • global: true\false 是否将上面設定應用于channel, 簡單點說, 就是上面限制是channel級别還是consumer級别
  • 注意
  • prefetchSize和global這兩項, rabbitmq沒有實作, 暫且不研究
  • prefetch_count和no_ack=false的情況下生效, 即在自動應答的情況下這兩個值是不生效的

消費端限流代碼實作

幫助類新增函數

public static AMQP.Queue.DeclareOk queueDeclare(Channel channel, String queueName, boolean durable) throws IOException {
    return channel.queueDeclare(queueName, durable, false, false, null);
}      

消費者

package com.dance.redis.mq.rabbit.qos;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String queueName = "test001";  
        //    durable 是否持久化消息
        RabbitMQHelper.queueDeclare(channel,queueName,true);
        channel.basicQos(0, 1, false);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 參數:隊列名稱、是否自動ACK、Consumer
        channel.basicConsume(queueName, false, consumer);
        // 等待回調函數執行完畢之後,關閉資源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}      

生産者

package com.dance.redis.mq.rabbit.qos;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class Sender {


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String queueName = "test001";
        RabbitMQHelper.queueDeclare(channel, queueName, true);
        Map<String, Object> headers = new HashMap<>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers).build();
        for (int i = 0; i < 5; i++) {
            String msg = "Hello World RabbitMQ " + i;
            channel.basicPublish("", queueName, props, msg.getBytes());
        }
    }

}      

測試

啟動消費者

15-RabbitMQ進階特性-消費端限流

啟動生産者

15-RabbitMQ進階特性-消費端限流

檢視消費者