消費端限流
什麼是消費端限流
- 假設一個場景, 首先, 我們RabbitMQ伺服器有上萬條消息未處理的消息, 我們随機打開一個消費者用戶端, 會出現下面情況
- 巨量的消息瞬間全部推送過來, 但是我們單個用戶端無法同時處理這麼多資料
- RabbitMQ提供了一種Qos(服務品質保證)功能, 即在非自動确認消息的前提下, 如果一定數目的消息(通過基于consumer或者channel設定Qos的值)未被确認前, 不進行消費新的消息
- void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
- 參數解釋
- prefetchSize: 0
- prefetchCount: 會告訴RabbitMQ不要同時給一個消費者推送多餘N個消息, 即一旦有N個消息還沒有ACK, 則該consumer将block掉, 直到有消息ACK
- global: true\false 是否将上面設定應用于channel, 簡單點說, 就是上面限制是channel級别還是consumer級别
- 注意
- prefetchSize和global這兩項, rabbitmq沒有實作, 暫且不研究
- prefetch_count和no_ack=false的情況下生效, 即在自動應答的情況下這兩個值是不生效的
消費端限流代碼實作
幫助類新增函數
public static AMQP.Queue.DeclareOk queueDeclare(Channel channel, String queueName, boolean durable) throws IOException {
return channel.queueDeclare(queueName, durable, false, false, null);
}
消費者
package com.dance.redis.mq.rabbit.qos;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String queueName = "test001";
// durable 是否持久化消息
RabbitMQHelper.queueDeclare(channel,queueName,true);
channel.basicQos(0, 1, false);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 等待回調函數執行完畢之後,關閉資源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生産者
package com.dance.redis.mq.rabbit.qos;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class Sender {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String queueName = "test001";
RabbitMQHelper.queueDeclare(channel, queueName, true);
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
for (int i = 0; i < 5; i++) {
String msg = "Hello World RabbitMQ " + i;
channel.basicPublish("", queueName, props, msg.getBytes());
}
}
}
測試
啟動消費者
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SN4MTOzMmNkJTN0QGM1ADNzYzXwIjNxADM4AzLcBTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
啟動生産者
檢視消費者