雲栖号資訊:【 點選檢視更多行業資訊】
在這裡您可以找到不同行業的第一手的上雲資訊,還在等什麼,快來!
消費端限流
- 為什麼要對消費端限流
假設一個場景,首先,我們 Rabbitmq 伺服器積壓了有上萬條未處理的消息,我們随便打開一個消費者用戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個用戶端無法同時處理這麼多資料!
當資料量特别大的時候,我們對生産端限流肯定是不科學的,因為有時候并發量就是特别大,有時候并發量又特别少,我們無法限制生産端,這是使用者的行為。是以我們應該對消費端限流,用于保持消費端的穩定,當消息數量激增的時候很有可能造成資源耗盡,以及影響服務的性能,導緻系統的卡頓甚至直接崩潰。
2.限流的 api 講解
RabbitMQ 提供了一種 qos (服務品質保證)功能,即在非自動确認消息的前提下,如果一定數目的消息(通過基于 consume 或者 channel 設定 Qos 的值)未被确認前,不進行消費新的消息。
/**
* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
- prefetchSize:0,單條消息大小限制,0代表不限制
- prefetchCount:一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 将 block 掉,直到有消息 ack。
- global:true、false 是否将上面設定應用于 channel,簡單點說,就是上面限制是 channel 級别的還是 consumer 級别。當我們設定為 false 的時候生效,設定為 true 的時候沒有了限流功能,因為 channel 級别尚未實作。
- 注意:prefetchSize 和 global 這兩項,rabbitmq 沒有實作,暫且不研究。特别注意一點,prefetchCount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。
3.如何對消費端進行限流
- 首先第一步,我們既然要使用消費端限流,我們需要關閉自動 ack,将 autoAck 設定為 falsechannel.basicConsume(queueName, false, consumer);
- 第二步我們來設定具體的限流大小以及數量。channel.basicQos(0, 15, false);
- 第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設定批量處理 ack 回應為 truechannel.basicAck(envelope.getDeliveryTag(), true);
這是生産端代碼,與前幾章的生産端代碼沒有做任何改變,主要的操作集中在消費端。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QosProducer {
public static void main(String[] args) throws Exception {
//1. 建立一個 ConnectionFactory 并進行設定
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2. 通過連接配接工廠來建立連接配接
Connection connection = factory.newConnection();
//3. 通過 Connection 來建立 Channel
Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";
//5. 發送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + " : " + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}
//6. 關閉連接配接
channel.close();
connection.close();
}
}
這裡我們建立一個消費者,通過以下代碼來驗證限流效果以及 global 參數設定為 true 時不起作用.。我們通過Thread.sleep(5000); 來讓 ack 即處理消息的過程慢一些,這樣我們就可以從背景管理工具中清晰觀察到限流情況。
import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {
public static void main(String[] args) throws Exception {
//1. 建立一個 ConnectionFactory 并進行設定
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通過連接配接工廠來建立連接配接
Connection connection = factory.newConnection();
//3. 通過 Connection 來建立 Channel
final Channel channel = connection.createChannel();
//4. 聲明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.basicQos(0, 3, false);
//一般不用代碼綁定,在管理界面手動綁定
channel.queueBind(queueName, exchangeName, routingKey);
//5. 建立消費者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");
channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6. 設定 Channel 消費者綁定隊列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}
我們從下圖中發現 Unacked值一直都是 3 ,每過 5 秒 消費一條消息即 Ready 和 Total 都減少 3,而 Unacked的值在這裡代表消費者正在處理的消息,通過我們的實驗發現了消費者一次性最多處理 3 條消息,達到了消費者限流的預期功能。

當我們将void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 設定為 true的時候我們發現并沒有了限流的作用。
TTL
TTL是Time To Live的縮寫,也就是生存時間。RabbitMQ支援消息的過期時間,在消息發送時可以進行指定。
RabbitMQ支援隊列的過期時間,從消息入隊列開始計算,隻要超過了隊列的逾時時間配置,那麼消息會自動的清除。
這與 Redis 中的過期時間概念類似。我們應該合理使用 TTL 技術,可以有效的處理過期垃圾消息,進而降低伺服器的負載,最大化的發揮伺服器的性能。
RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.
RabbitMQ允許您為消息和隊列設定TTL(生存時間)。 這可以使用可選的隊列參數或政策來完成(建議使用後一個選項)。 可以對單個隊列,一組隊列強制執行消息TTL,也可以為單個消息應用消息TTL。
——摘自 RabbitMQ 官方文檔
1.消息的 TTL
我們在生産端發送消息的時候可以在 properties 中指定 expiration屬性來對消息過期時間進行設定,機關為毫秒(ms)。
/**
* deliverMode 設定為 2 的時候代表持久化消息
* expiration 意思是設定消息的有效期,超過10秒沒有被消費者接收後會被自動删除
* headers 自定義的一些屬性
* */
//5. 發送
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("myhead1", "111");
headers.put("myhead2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("100000")
.headers(headers)
.build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());
我們也可以背景管理頁面中進入 Exchange 發送消息指定expiration
2.隊列的 TTL
我們也可以在背景管理界面中新增一個 queue,建立時可以設定 ttl,對于隊列中超過該時間的消息将會被移除。
死信隊列
死信隊列:沒有被及時消費的消息存放的隊列
消息沒有被及時消費的原因:
- a.消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false
- b.TTL(time-to-live) 消息逾時未消費
- c.達到最大隊列長度
實作死信隊列步驟
- 首先需要設定死信隊列的 exchange 和 queue,然後進行綁定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # 代表接收所有路由 key
- 然後我們進行正常聲明交換機、隊列、綁定,隻不過我們需要在普通隊列加上一個參數即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )
- 這樣消息在過期、requeue失敗、 隊列在達到最大長度時,消息就可以直接路由到死信隊列!
.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
//設定連接配接以及建立 channel 湖綠
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";
String msg = "this is dlx msg";
//我們設定消息過期時間,10秒後再消費 讓消息進入死信隊列
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
//建立連接配接、建立channel忽略 内容可以在上面代碼中擷取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";
//必須設定參數到 arguments 中
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//将 arguments 放入隊列的聲明中
channel.queueDeclare(queueName, true, false, false, arguments);
//一般不用代碼綁定,在管理界面手動綁定
channel.queueBind(queueName, exchangeName, routingKey);
//聲明死信隊列
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
//路由鍵為 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. 設定 Channel 消費者綁定隊列
channel.basicConsume(queueName, true, consumer);
}
}
總結
DLX也是一個正常的 Exchange,和一般的 Exchange 沒有差別,它能在任何的隊列上被指定,實際上就是設定某個隊列的屬性。當這個隊列中有死信時,RabbitMQ 就會自動的将這個消息重新釋出到設定的 Exchange 上去,進而被路由到另一個隊列。可以監聽這個隊列中消息做相應的處理。
【雲栖号線上課堂】每天都有産品技術專家分享!
課程位址:
https://yqh.aliyun.com/zhibo立即加入社群,與專家面對面,及時了解課程最新動态!
【雲栖号線上課堂 社群】
https://c.tb.cn/F3.Z8gvnK
原文釋出時間:2020-06-11
本文作者: 網際網路架構師
本文來自:“
網際網路架構師 微信公衆号”,了解相關資訊可以關注“
網際網路架構師”