消費端限流
為什麼要對消費端限流
假設一個場景,首先,我們RabbitMQ伺服器上積壓了有上萬條未處理的消息,我們随便打開一個消費端,巨量的消息就會瞬間全部推送過來,但是我們單個消費端是無法同時處理這麼多消息的。
當資料量特别大的時候,我們對生産端限流肯定是不科學的,因為有時候并發量就是特别大,有時候并發量又特别少,我們無法限制生産端,這是使用者的行為。是以我們應該對消費端限流,用于保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導緻系統的卡頓甚至直接崩潰。
怎麼實作消費端限流
RabbitMQ
給我們提供了
QOS
(服務品質保證)功能,即在非自動确認消息的前提下(
autoAck 要設定為false
),如果一定數目的消息未被
ack
前,
RabbitMQ
伺服器不會推送新的消息給消費端。
/**
* Request specific "quality of service" settings.
*
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @see com.rabbitmq.client.AMQP.Basic.Qos
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
-
:對單條消息大小進行限制,prefetchSize
代表不限制。0
-
:一次性消費的消息數量,告訴prefetchCount
伺服器不要同時給消費端推送多于RabbitMQ
個消息,即一旦有N
個消息還沒有 N
,ack
伺服器不會推送新的消息給該RabbitMQ
,直到有消息被consumer
。ack
-
:global
則将上面的設定應用于true
級别,channel
則将上面的設定應用于false
級别。當我們設定為 consumer
的時候生效,設定為 false
的時候沒有了限流功能,因為true
級别的限流尚未實作。channel
我們既然要使用消費端限流,我們需要将
autoAck
設定為
false
,我們還要設定具體的限流規則,如限制單條消息大小、一次性消費消息數量、限流應用的級别。并且在消費端的
handleDelivery()
方法中要手動
ack
,不然會有問題。
生産端
package com.kaven.rabbitmq.api.qos;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 發送消息
for (int i = 0; i < 5; i++) {
String msg = "RabbitMQ: qos message" + i;
channel.basicPublish(exchange , routingKey , null , msg.getBytes());
}
// 5 關閉連接配接
channel.close();
connection.close();
}
}
消費端
package com.kaven.rabbitmq.api.qos;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer {
public Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("------------ consumer message -----------");
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
}
channel.basicAck(envelope.getDeliveryTag() , false);
上面這一行是重點,因為我們對消費端進行限流了,使用了
RabbitMQ
的
QOS
,是以
autoAck = false
了,是以我們需要自己手動
ack
,這裡設定批量處理
ack
回應為
false
,如果沒有上面這一行,消費端就不能對已經接受到的消息發送
ack
了,
RabbitMQ
伺服器也就不會推送下一個消息給消費端,這樣就會一直阻塞住。
package com.kaven.rabbitmq.api.qos;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 隊列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 建立Queue
channel.queueDeclare(queueName , true , false , false , null);
// 5 限流方式
// 0-說明對消息大小不限制 , 1-一次性最多消費1條消息 ,false-設定consumer級别而不是channel級别
channel.basicQos(0 , 1 ,false);
// autoAck一定要設定為false
channel.basicConsume(queueName , false , new MyConsumer(channel));
}
}
測試
我們先把
MyConsumer
類下面這一行注釋掉。
channel.basicAck(envelope.getDeliveryTag() , false);
啟動生産端和消費端後,看看
RabbitMQ Management
,總共有
5
條消息,有
4
條消息準備好了,有
1
條消息沒有
ack
,因為我們把手動
ack
這一行代碼給注釋掉了,消費端消費了第一條消息後,并不會
ack
這條消息,是以
RabbitMQ
伺服器也就不會再推送消息給該消費端了,也就這樣一直阻塞住。

從消費端的輸出也可以看出來,消費端隻消費了一條消息,其他消息并沒有推送過來(當然因為消費端自己沒有手動
ack
,消費端背鍋)。
現在我們把注釋的代碼解除注釋,再來進行測試。啟動生産端和消費端,看看
RabbitMQ Management
,發現消息全部被消費端消費了。
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message0
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=2, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message1
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=3, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message2
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=4, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message3
------------ consumer message -----------
consumerTag:amq.ctag-arloOvWU7MWzv94vK_WkHg
envelope:Envelope(deliveryTag=5, redeliver=false, exchange=, routingKey=test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: qos message4