天天看點

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

概述

本示例程式全部來自rabbitmq官方示例程式,​​rabbitmq-demo​​;

官方共有6個demo,針對不同的語言(如 C#,Java,Spring-AMQP等),都有不同的示例程式;

本示例程式主要是Spring-AMQP的參考示例,如果需要其他語言的參考示例,可以參考官網;

rabbitmq模拟器

​​模拟器​​

rabbitmq簡介

核心架構圖

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

​​AMQP 0-9-1 Model Explained​​

重要文法說明

  • producer或publisher: 消息生産者/釋出者,即:産生消息的;
  • Exchange:producer或publisher隻會将message發送到Exchange,目前有4種不同的Exchange類型;
  • Queue:消息隊列,所有的消費者都是直接從Queue擷取Message并消費;
  • Binging:連接配接Exchange和Queue的紐帶,決定Exchange如何路由消息到不同的Queue;
  • routingKey:生産者-->message-->Exchange,需要指定一個key,叫做routingKey;
  • routingKey:Exchange-->Binging-->Queue,Binging有一個Key值,叫routingKey或bingingKey;
  • bingingKey:Exchange-->Binging-->Queue,Binging有一個Key值,bingingKey;

核心了解

4種不同的Exchange,對routingKey的解釋都不相同;

對routingKey的不同解釋,決定了Exchange路由Message到Queue的不同方案;

  1. direct exchange: 比對2個routingKey(即routingKey和bingingKey)是否相等,相等時才進行消息路由;
  2. fanout exchange: 忽略routingKey,會将Message路由到所有綁定的Queue;
  3. topic exchange: routingKey格式形如​

    ​aaa.bbb.xxx​

    ​​、​

    ​*.ccc.dd.#​

    ​,類似正規表達式比對;
  4. headers exchange:

jar包說明

  • Java版本:

    Java版本使用如下jar(說明:若是使用):

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>      
  • Spring-AMQP版本:

    ​​​Spring AMQP 官方詳細文章​​​

    使用Profile配置各個demo的運作選擇,當

    使用如下Jar包:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>      

demo1: 單生産者-單消費者

【rabbitmq】rabbitmq概念解析--消息确認--示例程式
  • Java版本:​​單生産者單消費者​​

    程式位置:java.demo1包下面

  • spring-amqp版本:​​單生産者單消費者​​

    application.properties配置

spring.profiles.active=hello-world, sender, receiver      

demo2: 單生産者-多消費者

​​Work queues官方示例​​

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

application.properties配置

spring.profiles.active=work-queues, sender, receiver
#spring.profiles.active=work-queues, sender
#spring.profiles.active=work-queues, receiver      

較長的描述參見:​​單生産者-多消費者詳細​​

demo3: 釋出/訂閱

​​Publish/Subscribe官方示例​​

【rabbitmq】rabbitmq概念解析--消息确認--示例程式
  • 消費廣播到多個消費者進行消費;
  • 使用fanout pattern;

application.properties配置

spring.profiles.active=pub-sub, receiver , sender      

較長的描述參見:​​釋出/訂閱詳細​​

demo4: Routing

​​Routing官方示例​​

Direct exchange 模式進行route結構圖

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

a message goes to the queues whose ​

​binding key​

​ exactly matches the ​

​routing key​

​ of the message;(相等時才路由)

Multiple bindings

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

兩個Queue使用相同的BingingKey(black) ==> 效果類似于:釋出/訂閱模式(demo3);

完整的結構圖

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

application.properties配置

pring.profiles.active=routing, receiver , sender      

較長的描述參見:​​釋出/訂閱詳細​​

demo5: Topics

​​Topics官方示例​​

【rabbitmq】rabbitmq概念解析--消息确認--示例程式
  • 使用 Topic exchange實作;
  • 發送到Topic exchange的routingKey必須滿足一定要求:用"."分割的words清單,如:​

    ​*.aaa.bbb.#​

    ​ ;
  • BingingKey和routingKey有相同的格式要求;
  • ​*​

    ​ : 可以比對一個word;
  • ​#​

    ​: 可以比對0個或多個words;

application.properties配置

pring.profiles.active=topics, receiver , sender      

較長的描述參見:​​Topics​​

demo6: RPC over RabbitMQ

​​RPC官方示例​​

結構圖

【rabbitmq】rabbitmq概念解析--消息确認--示例程式

application.properties配置

spring.profiles.active=rpc,server
#spring.profiles.active=rpc,client      

較長的描述參見:​​RPC​​

消費端确認

Delivery Identifiers: Delivery Tags

消費者注冊後,rabbitmq将消息傳遞給消費者時,都會帶有一個“Delivery Tags”,這個是唯一的ID辨別,id以整數的遞增的方式實作。

Acknowledgement Modes(消費端)

自動确認模式

  • 發送之後,就認為是發送成功(fire-and-forget)
  • 消息不停的發送到消費端消費,無需等待消費端任何确認;

缺點:

  • 可能造成消費端不堪重負;

手動模式

  1. basic.ack: 肯定的确認;
  2. basic.nack: 否定的确認(RabbitMQ對AMQP 0-9-1的擴充),支援消息​

    ​批量确認​

    ​;
  3. basic.reject:否定的确認,消息消費失敗後,直接從broker中将消息​

    ​delete​

    ​​,​

    ​不支援批量确認​

    ​;

Acknowledging Multiple Deliveries at Once(消息批量确認)

  • 一次确認多個消息發送,而不是每一個消息單獨确認;
  • basic.reject:不具備該功能;
  • basic.nack: 具備該功能;

實作方式

  • multiple field: 設定為true;

示例

假設:在Channel(ch)上有5,6,7,8這4個delivery tags未确認;

  • 情況1,​

    ​delivery_tag=8 & multiple=true​

    ​: 則5,6,7,8這4個tags都将被确認;
  • 情況2,​

    ​delivery_tag=8 & multiple=false​

    ​:則隻有8被确認,而5,6,7将不會被被确認;

Channel Prefetch Count (QoS)[可以設定消費端消費的速率]

  • 消息消費是​

    ​異步​

    ​​完成的,手動确認也是​

    ​異步​

    ​的;
  • 有一部分消息是被消費了,但是還未來得及确認:​

    ​希望控制未被确認消息的size,防止無界的緩存​

    ​;
  • ​prefetch count​

    ​​:使用​

    ​basic.qos​

    ​方法設定該值可以控制未被确認消息的max size;
  • 當達到該最大值時,rabbitmq将停止傳遞消息進行消費;
  • 僅對​

    ​basic.qos​

    ​​方法有效,對​

    ​basic.get​

    ​方法無效;

示例

假設:在Channel(Ch)上有5,6,7,8共4個​

​未被确認​

​的消息,且ch的​

​prefetch count=4​

​;

結果:rabbitmq将不會再傳遞任何消息到該Channel上,除非有消息被确認;

消費确認選擇,prefetch設定以及吞吐量

  • 情況1:增大​

    ​prefetch​

    ​:提高向消費者傳遞消息的速度;
  • 情況2:自動确認模式可以産生最佳的傳送速率;

應避免:

  1. ​自動确認模式​

    ​;
  2. ​手動确認模式​

    ​​ +​

    ​無限制的prefetch​

    ​;

結論:

  • ​情況1​

    ​​和​

    ​情況2​

    ​​都可能導緻​

    ​傳遞但未來得及處理​

    ​的Message增加,增大RAM的消耗;

推薦值:

  • ​prefetch​

    ​: 100~300,可以有效提高吞吐量,并避免RAM消耗過多的風險;

消費失敗或連接配接中斷: 自動重新reQueue

當消息發送給消費端後,如果出現如下情況,則消息會​

​重新reQueue​

​,會被再次發送;

  1. TCP連接配接中斷;
  2. 消費端挂掉:無法進行消息确認;

Client Errors: Double Acking and Unknown Tags

消費端無法對同一個消息确認超過一次,當超過一次之後,将抛出Channel error: ​

​PRECONDITION_FAILED - unknown delivery tag XXXX​

總結

  • 每個傳遞給消費端的消息,都有一個唯一的辨別​

    ​delivery tag​

    ​;
  • 自動消息确認;
  • 手動消息确認:​

    ​每個消息單獨确認​

    ​​和​

    ​批量消息确認​

    ​;
  • ​prefetchCount​

    ​:可以控制消息端的吞吐量,避免消費端消費過慢,産生RAM大量消耗;
  • 失敗重傳:​

    ​TCP連接配接中斷​

    ​​或​

    ​消費端挂掉​

    ​,都會引起消息重新入隊列,重新消費(手動消息确認時);
  • 無法對同一個消息進行2次或2次以上的​

    ​确認​

    ​,否則會抛出異常;

發送端确認

Channel事務

  • 不推薦使用: 會嚴重降低吞吐量;

在 AMQP 0-9-1中,保證消息不丢失的唯一方法,就是使用事務;

  1. 開啟Channel事務;
  2. 發送消息,送出事務;

類似消費端的應答确認機制

  • ​confirm.select​

    ​​: 應用于Channel時,表示使用​

    ​确認模式​

    ​;
  • ​事務​

    ​​和​

    ​确認模式​

    ​無法共存:二者隻能選擇其一;

确認模式 (confirm.select)

  • 發送端使用​

    ​confirm.select​

    ​;
  • ​broker​

    ​​發送​

    ​basic.ack​

    ​來确認Message已被處理;
  • ​delivery-tag​

    ​: 消息序列,具有唯一性;
  • ​multiple=true​

    ​​: 用于設定​

    ​批量消息确認​

    ​;
  • 無法保證消息何時被确認;
  • 确認模式:消息要麼被​

    ​confirmed(OK)​

    ​​,要麼被​

    ​nack(fail)​

    ​,且only once;

Java示例:(發送端發送大量messages,使用确認模式)

​​程式-确認模式​​

否定确認

異常情況時,服務端無法處理消息,則​

​broker​

​​發送​

​basic.nack​

​​來進行​

​否定确認​

​;

應答延時和持久化消息

  • 僅當消息被持久化到disk之後,才會發送​

    ​basic.ack​

    ​應答;
  • 吞吐量提高建議:​

    ​異步處理應答​

    ​​、​

    ​批量發送消息​

    ​;

應答順序

發送确認 + 保證傳遞

  • 消息持久化: 并不能保證消息不丢失(在寫入disk前broker就挂掉);

限制

參考