天天看點

php消費rabbitmq消息QoS,中間件系列十 RabbitMQ之消費者端的消息确認機制

概述

在RabbitMQ中,即使将queue,exchange, message等都設定了持久化之後,還是不能保證100%保證資料不丢失了。為了實作消息不丢失,我們需要從Consumer端和Productor端同時進行處理。本篇文章先介紹Consumer端,在AMPQ-0-9-1中有定義從消費者到RabbitMQ的消息确認機制,通過此機制可以保證消息能夠從RabbitMQ正确到達消費者端。本文介紹在RabbitMQ中如何實作消費者端的消息确認機制,包括如下内容

1 消費者的實作機制

2 消費者端的代碼實作

3 使用wireshark對消息确認的關鍵包進行轉包,并進行分析

4 在使用消息确認機制的注意點

消費者端投遞确認機制

在消費者端确認的方式

RabbitMQ中的兩種确認方式:1 自動确認方式:RabbitMQ成功将消息發出(即将消息成功寫入TCP Socket)中立即認為本次投遞已經被正确處理,不管消費者端是否成功處理本次投遞

2 手動處理方式:消費者收到消息後,手動調用basic.ack/basic.nack/basic.reject後,RabbitMQ收到這些消息後,才認為本次投遞成功

批量手動投遞确認

消息手動除了一次确認一條,也可以一次确認多條。為了減少網絡流量,可以批量手動确認。在應答時,設定basic.nack的multiple 字段為true,可以同時對delivery_tag和比delivery_tag值小的投遞消息進行确認

例如,假設在通道上沒有确認消息的delivery_tag是5,6,7和8,當basic.nack中delivery_tag被設定為8并且multiple 被設定為true時,方法執行成功後,從5到8的所有消息将被确認。 如果multiple 設定為false,那麼交貨5,6和7仍然是未确認的。

投遞唯一碼: Delivery Tags

當消費者向RabbitMQ注冊後,RabbitMQ使用basic.deliver向消費者投遞消息時,消息體上會帶上delivery tag,這個值會唯一辨別本次投遞,在同一通道上,此值是唯一的。delivery tag值有64位長度,值從1開始,每發送一次消息值遞增1,最大值為9223372036854775807。消費者端在應答消息時,帶上此參數,告訴RabbitMQ某次投遞已經正确應答。

消費者端消息投遞确認代碼

工程:

工程:

代碼路徑:

消費者代碼:

代碼:關鍵點

a. channel.basicConsume設定接收非自動确認

b. 在處理完消息後,調用channel.basicAck進行手動消息确認// 預設消費者實作

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [ConsumerConfirmRecv] Received '" + message + "'");

// 消息正向确認

channel.basicAck(envelope.getDeliveryTag(),true);

// 消息否定确認: 如果設定multiple=false,requeue值啟作用,如果設定multiple=true,則requeue無論設定什麼值,背景統一處理成true

// channel.basicNack(envelope.getDeliveryTag(),false, false);

}

};

// 接收消息:設定非自動确認

channel.basicConsume(QUEUE_NAME, false, consumer);

消費者端抓包分析

執行以上的代碼,并進行抓包分析其中關鍵包。和一般的消息包不同點時,一般的消費者端的包分析見中間件系列九 RabbmtiMQ 通過wireshark抓包學習AMQP協定

正向确認-Basic.ack

和一般的消息包不同點時,一般的消費者端的包分析見本文,隻是多了Basic.ack包

php消費rabbitmq消息QoS,中間件系列十 RabbitMQ之消費者端的消息确認機制

131-132幀 RabbitMQ向消費者推送消息,消費者端進行否定确認

131幀 RabbitMQ向消費者推送消息Frame 131: 269 bytes on wire (2152 bits), 269 bytes captured (2152 bits) on interface 0

Ethernet II, Src: PcsCompu_48:72:ad (08:00:27:48:72:ad), Dst: Giga-Byt_bf:ae:ce (40:8d:5c:bf:ae:ce)

Internet Protocol Version 4, Src: 10.240.80.147, Dst: 10.240.80.99

Transmission Control Protocol, Src Port: 5672, Dst Port: 49877, Seq: 600, Ack: 642, Len: 215

# 傳回Consume的調用結果

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 1

Length: 36

Class: Basic (60)

Method: Consume-Ok (21)

Arguments

# 此消息的消費者的編号

Consumer-Tag: amq.ctag-AhhpMhEMC5VuWcny42rObg

# 要投遞的消息參數

Advanced Message Queueing Protocol

Type: Method (1)

Channel: 1

Length: 87

Class: Basic (60)

Method: Deliver (60)

Arguments

# 要投遞的消息者标志

Consumer-Tag: amq.ctag-AhhpMhEMC5VuWcny42rObg

# 消息的編号,從1開始

Delivery-Tag: 1

# 是否是重新投遞的消息,如果消息是第一次投遞,則此值是false,如果此消息是重新投遞的,則此值為true

.... ...0 = Redelivered: False

# 來源的交換機

Exchange: consumerconfirm-exchange

# 路由鍵

Routing-Key: consumer-confirm

# 要投遞的消息的頭

Advanced Message Queueing Protocol

Type: Content header (2)

Channel: 1

Length: 27

Class ID: Basic (60)

Weight: 0

Body size: 33

Property flags: 0x9800

# 消息的屬性

Properties

Content-Type: text/plain

Delivery-Mode: 2

Priority: 0

# 要投遞的消息的内容

Advanced Message Queueing Protocol

Type: Content body (3)

Channel: 1

Length: 33

Payload: 436f6e73756d6572436f6e6669726d53656e642131353137...

132幀 消費者端對消息的确認

php消費rabbitmq消息QoS,中間件系列十 RabbitMQ之消費者端的消息确認機制

否定确認-Basic.Nack

和一般的消息包不同點時,一般的消費者端的包分析見本文,隻是多了Basic.Nack包

php消費rabbitmq消息QoS,中間件系列十 RabbitMQ之消費者端的消息确認機制

275-276幀 RabbitMQ向消費者推送消息,消費者端進行否定确認

275幀 RabbitMQ向消費者推送消息,和上節内容相似,這裡略

276幀 消費者端進行否定确認,比basic.ack多了Requeue屬性

php消費rabbitmq消息QoS,中間件系列十 RabbitMQ之消費者端的消息确認機制

其他消費者端的為了保證正确處理資料的機制

在消費者端為了正确處理資料,還可以設定Qos和投遞失敗

通道預取設定(Channel Prefetch Setting (QoS))

隻能在消息手動确認模式中啟作用。

為了避免消費者端一次同時處理過多的消息,可以通過basic.qos設定最大的預取值。該值定義了通道上允許的最大未确認消息,一旦未确認消息的數量達到配置值,RabbitMQ将停止在通道上傳送更多消息,直到至少有一個未被确認的消息被确認。

備注:通道預取設定在basic.get (“pull API”)中是不啟作用,即使在消息手動确認模式中

消費者确認模式,預取和吞吐量

在RabbitMQ中影響吞吐量最大的參數是:消息确認模式和Qos預取值

自動消息确認模式或設定Qos預取值為無限雖然可以最大的提高消息的投遞速度,但是在消費者端未及時處理的消息的數量也将增加,進而增加消費者RAM消耗,使用消費者端奔潰。是以以上兩種情況需要謹慎使用。

RabbitMQ官方推薦Qos預取值設定在 100到300範圍内的值通常提供最佳的吞吐量,并且不會有使消費者奔潰的問題

消費者失敗或失去連接配接時:自動重新排隊

在消息手動确認模式中,如果發生以下情況投遞消息所有的通道或連接配接被突然關閉(包括消費者端丢失TCP連接配接、消費者應用程式(程序)挂掉、通道級别的協定異常)任何已經投遞的消息但是沒有被消費者端确認的消息會自動重新排隊。

請注意,連接配接檢測不可用用戶端需要一段時間才會發現,是以會有一段時間内的所有消息會重新投遞

因為消息的可能重新投遞,所有必須保證消費者端的接口的幂等。

多次确認和對未知delivery_tag進行确認

如果消費者對同一delivery_tag進行多次确認,則抛出通道異常PRECONDITION_FAILED。如果對

未知的delivery_tag進行确認,也會抛出通道異常。

代碼

--------------------- 本文來自 hry2015 的CSDN 部落格 ,全文位址請點選:https://blog.csdn.net/hry2015/article/details/79416540?utm_source=copy