天天看點

背景開發的rabbitmq官網拾遺常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官網的文檔裡做一些總結和摘取,檢視文檔的視角是一位背景開發,是以運維工具方面省略了,比如 安裝、CLI工具、配置、權限、部分網絡、指令等。

背景開發的rabbitmq官網拾遺

  • 常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官網的文檔裡做一些總結和摘取,檢視文檔的視角是一位背景開發,是以運維工具方面省略了,比如 安裝、CLI工具、配置、權限、部分網絡、指令等。
    • 網絡和連接配接(部分)
    • 監控
    • 用戶端連接配接
    • 分布式mq 與 clustering
    • Quorum Queues
    • Classic Mirrored Queues
    • Time-To-Live and Expiration
    • Lazy Queues
    • 普通的Queues
    • Exclusive Queues
    • Priority Queue Support
    • Exchange
    • Dead Letter Exchanges
    • Consumers
    • Publishers
    • Java Client 總結
    • channel特性
    • Channel Prefetch Setting (QoS)
    • firehose 消防水帶特性
      • 申明

常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官網的文檔裡做一些總結和摘取,檢視文檔的視角是一位背景開發,是以運維工具方面省略了,比如 安裝、CLI工具、配置、權限、部分網絡、指令等。

順便貼上官網位址https://www.rabbitmq.com/documentation.html

網絡和連接配接(部分)

  1. 各種用戶端和RabbitMQ協定種類包括如下:

    AMQP 0-9-1 with extensions :一個連接配接多個通道,可以使用pki以及認證證書。

    AMQP 1.0 :有個連接配接多種會話,比上一個功能弱一點。

    MQTT 3.1.1

    STOMP 1.0 through 1.2

  2. 所有協定都是建立在tcp上的,使用長連接配接。一個client使用一個tcp連接配接。這裡注意,使用完連接配接後,應用需要關閉它,減少機器資源耗盡的風險。有些作業系統會限制一個程序可以使用的連接配接數目。
  3. 連接配接洩露問題:連接配接打開後一直不關閉,或者隻是關閉一小部分,造成資源耗盡。使用admin背景監控連接配接數量, RabbitMQ 3.7.9.之後還能監控連接配接打開速率。
  4. 高連接配接切換問題:應用頻繁的打開以及關閉連接配接,應該盡量使用長連接配接。切換速率超過 100/second 就要注意了。
  5. 流控:應用在推送連接配接上
  6. 心跳監測時間:對于叢集,小于5秒可能導緻錯誤判斷,不推薦。

監控

  1. mq的插件可以提供http的接口,查詢mq的監控名額,預設的http接口是 http://server-name:15672/api/**
  2. 主要名額:

    cpu狀态

    記憶體使用率

    虛拟記憶體使用

    I/O 使用情況

    節點的可用空間

    TCP連接配接數目

    網絡流量

    網絡延遲

  3. 監控頻率推薦是30-60秒

用戶端連接配接

  1. AMQP 0-9-1方式的用戶端的連接配接使用同一個Tcp連接配接,後面一個是更底層的連接配接,每個channel以一個id。

    channel是建立在成功的連接配接之上。關閉後會是防止相應的資源。關閉的channel不能再被使用。

  2. 異常場景:

    重複申明queue、exchange時候使用不同的屬性值,導緻406錯誤,注意有些場景,屬性是可以覆寫的,這個異常隻是說一種情況。

    通路不被允許的資源,403錯誤。

    綁定不存在的queue、或者exchange,404錯誤

    從不存在的queue消費、推消息到不存在的exchange,404錯誤

    通路一個被排除的queue,405異常

    但是用戶端可以自己處理,如Java用戶端,可以注冊錯誤處理器(error handle)

  3. 一個連接配接可以打開的channel數目,是可以配置的。channel數目增加,會導緻用戶端以及節點上資源消耗。是以需要合理控制。
  4. rabbitmq-event-exchange 插件

    這個插件可以監聽mq事件,包括連接配接、channel、queue等系統部分産生的事件。這個插件把事件推送到exchange裡面,名字是 amq.rabbitmq.event 應用自己聲明隊列并且監聽。

分布式mq 與 clustering

  1. 分布式有三種方式:with clustering, with Federation, and using the Shovel plugin.

    clustering: 多台機器聯系到一起,所有節點mq以及erlang版本一緻。上面的虛拟主機、exchange、使用者等自動複制。queue可以複制内容、鏡像、可以隻在一個節點上、可以使用Quorum queues 類型。

    Federation:允許exchange以及queue接收其他broker上對應exchange以及queue的消息。 exchange或者queue使用點對點的連結,預設情況是消息在連結上被傳遞一次。傳遞出去的消息如果不能配置設定到queue上,那麼消息來源的位置,也不會被發出去。常見場景是組合連個mq的broker。 可以使用不同版本的mq以及erlang

    Shovels插件:雷類似于Federation,但是更低級層次。他是消費一個消息,然後,傳遞到另一個exchange上。

  2. 節點通過名字做唯一識别,例如[email protected]。可以動态變化節點數目。
  3. 被複制的東西:

    所有的資料、狀态都會被複制到所有節點。除了預設的消息隊列,可以複制的隊列類型在後面談及。

  4. 叢集的所有節點是地位相同的。節點間通過cookies驗證身份。

    節點數目推薦使用奇數,1、3、5、7等。因為需要集體決策。

  5. 節點的操作會被複制到不同節點(原文大概是隊列的主從位置)
  6. 防止單個節點失敗

    用戶端連接配接使用一個list指明節點即可。另外推薦的方法有:dns動态解析變換,或者tcp連接配接的負載均衡方案。

  7. 節點失敗

    mq的broker能夠接受節點失敗。鏡像隊列可以使得内容複制到不同的節點;非鏡像隊列也可以用在叢集上。

Quorum Queues

quorum 是指決策多數。例如n/2+1 中的n。RabbitMQ 3.8.0.之後支援。

  1. 動機:

    傳統的鏡像隊列由于技術原因,有潛在風險丢失資訊。quorum queue保證消息安全、友善實用。

  2. 和傳統隊列比較,有很多差異點,簡單列舉部分;

    隻能是持久化的

    不支援排除特性

    不支援ttl、不支援死信

    不支援優先級

    支援Poison Message Handling

    消息一直不能被消費,導緻不斷從新入隊,成為Poison Message。消息head裡面用x-delivery-count記錄發送次數,然後隊列設定最大次數,大于最大次數的丢棄或者成為死信。

  3. 性能嚴重取決于叢集大小以及硬碟速度。

    複制因子預設是5,例如叢集是3節點,每個節點上有一個複制;叢集是7節點,隻有5個節點上有一個複制;

  4. 使用publish confirm機制保證消息安全,消息一緻性大于可用性。

Classic Mirrored Queues

官網開頭就說:建議讀者使用quorum queue。并評價為下一代高可用隊列。

  1. 介紹

    每一個queue有主從,主的節點負責消息接收、消息發送。主節點挂掉,最老的從節點變成主節點。無論用戶端連接配接到叢集哪個節點,執行操作的是主節點。

    主節點掉線後,如果上面的資料沒有同步,那麼消息就會丢失。消費者是自動應答模式的話,消息也可能會丢失。從節點轉發的消息沒有被消費,從節點就掉線的話,也可能會導緻消息丢失。

    具體同步、選主過程、失敗處理等略。

  2. 可靠性

    涉及點如下,詳情略:連接配接失敗處理、ack以及confirms機制、信條堅持以及資料名額監控、生産者資料安全、消費者資料安全、消費者監聽取消通知

  3. 高可用實作略
  4. 腦裂問題:

    網絡問題造成一個分區的兩個部分斷開連接配接,變成兩個分區;網絡再次連接配接時,分區不會自動合并。

    解決方法:

    mq版本使用3.4.0和3.4.1及以上,避免錯誤檢測到網絡分區。

    RabbitMQ提供了三種方法自動的解決網絡分區:

    pause-minority mode: 檢測自身處于少數派,關閉此叢集。

    pause-if-all-down mode:不能和任意指定節點連接配接,關閉此叢集。

    autoheal mode。(預設的是ignore模式):選擇節點多的作為主分區,其他重新開機。節點數一樣的話,随機選擇。

Time-To-Live and Expiration

  1. 在每一個隊列上設定ttl:定義隊列時候使用參數(“x-message-ttl”, 60000),機關毫秒
  2. 在每一個設定消息的ttl:消息設定屬性expiration,機關毫秒

    兩個屬性都有的話,最小值。

  3. 隊列自身的ttl:設定x-expires,定義隊列多久未使用後被删除。未使用是指沒有消費者。
  4. 隊列長度限制:

    可以設定消息總數量或者消息的總記憶體大小限制;

    處置超過限制的消息:定義隊列使用參數overflow(“overflow”:“reject-publish”),可以指定消息不能進入隊列或者從頭部丢棄消息,消息被抛棄或者成為死信。

Lazy Queues

  1. 介紹

    RabbitMQ 3.6.0之後支援懶加載隊列。适應于數百萬消息、或者記憶體較小的場景。這個是盡可能多的把消息放到硬碟上。少量消息放在記憶體中,預設是16384個消息。

    可以再運作的時候修改隊列的模式,但是需要重新開機。

普通的Queues

  1. 注意

    以"amq.開頭的隊列名字,是mq的保留字段。

  2. 隊列使用前需要申明。對于已經存在的隊列,使用相同的參數聲明,沒有影響;參數不同的話,會導緻406異常。

    隊列的多數可選參數可以動态變化。

Exclusive Queues

隻是被聲明他的連接配接使用。其他連接配接想要使用的話,會抛出異常。聲明他的連接配接關閉後,這個隊列被删除。

Priority Queue Support

  1. 介紹

    聲明隊列的時候使用參數x-max-priority辨別優先級。 1-255,推薦是1-10的數字。

    然後就可以推送具有優先級的消息,使用屬性priority定義。沒有值的話預設是0,必定義的最大值還要大的話,就相當于最大值。

  2. 高優先級的消息會被先處理。但是需要配合消息的prefetch屬性。因為消息進入消費者的緩存池,就不會再排序了。
  3. 隊列裡面的消息是從頭部掃描過期的,是以低優先級的過期消息可能會被高優先級的消息卡在後面。

    如果隊列有最大限制,按照通常丢棄頭部消息的政策,可能會造成高優先級的消息會被丢棄。

Exchange

  1. exchange 和 exchange 可以綁定,就像隊列綁定一樣使用。
  2. Alternate Exchanges: 一個exchange不能把消息配置設定給queue,那麼消息就會進入AE裡面,這個路徑可以一直走下去。

Dead Letter Exchanges

一個消息成為死信有三種情況

消息被消費者拒絕

消息過期

隊列長度限制導緻消息被丢棄

  1. 關于dead letter routing key:消息發送的時候自身有routing key = foo,預設的dead letter routing key 就是foo, 如果exchange上設定 dead letter routing key = bar,那麼死信的dead letter routing key 變成bar。
  2. 死信消息在原始隊列上的删除時機:死信隊列ack回複後,原始隊列才會把消息删除,如果死信隊列在回複前挂掉,死信消息會存在兩個位置。
  3. 如果死信消息發生了循環,這個消息會被丢棄。
  4. 死信消息的head有一個x-death屬性,存有死信消息的經曆的事件。

Consumers

  1. 每一個消費者也有一個唯一辨別。
  2. 消費者連接配接丢失的話,Java .Net client 會自動恢複。

    恢複的順序是:

    恢複connection

    恢複channel

    恢複queue

    恢複exchange

    恢複binding

    恢複消費者

  3. Consumer Prefetch

    限制處于未傳回ack的消息數目,隻有一個消息傳回ack才能騰出空間給下一個消息。

  4. Consumer Priorities

    消費者設定優先級,高優先級的消費者的會先消費,除非高優先級堵塞,低優先級的消費者才會消費消息。

    同一個級别的是輪詢消費。

  5. 設定exclusive 為true,保證某個時刻隻有一個customer消費queue。排他消費。

    設定x-single-active-consumer為true,某個時刻隻有一個customer消費queue直到這個消費者挂掉,然後其他消費者自動補上。單活消費。

    單活消費 與 排他消費差別在于,前者更大程度保證消息被一個消費者消費。這倆個屬性不能共存。

  6. 消費者并發:Java .net 等用戶端使用線程池控制異步消費操作,這個池子也可以設定并發度。用戶端保證了從同一個channel發送來的消息,會被按照原有順序消費,不必考慮池子的并發度。這裡需要注意,消費和等待ack是異步的,後消費的可能速度快導緻先傳回ack。

    有些用戶端會限制并發度,設定并發因子為1,以滿足嚴格的順序消費場景。

  7. 對于消費者被異常取消的場景,java 客戶單可以使用覆寫handleCancel方法自定義業務。

Publishers

  1. AMQP 0-9-1協定上:消息發送到exchange上,如果沒有queue接收,預設是消息丢棄。如果消息設定了mandatory屬性,消息會被傳回給exchange。
  2. message可以設定Content type(例如 application/json)、Content encoding(例如 gzip);但是mq不會使用這個字段,自己的application或者plugins 使用這些字段。
  3. Publisher Confirms的政策:

    流模式:持續推送消息,監聽消息ack,

    批模式:一次推送一批消息,等待消息ack

    單個模式:一次推送後,等待ack,然後再次推送。是批模式的特例。對流量影響最大。

    當出現資源報警時候,所有推送都是阻塞的直到警告清除。

  4. 生産者publish confirm的時候,不共享channel,使用channel pool ;消費者使用線程池,來保證消息安全。
  5. mq支援Java nio,但是nio不是為了提高流量的,是為了可控性。控制有多少線程在工作。
  6. 除了常見的exchange類型,還有其他的類型:Headers,下面Consistent hashing exchange, random routing exchange, internal event exchange and delayed message exchange,這些可以通過插件實作。
  7. 分發消息的時候,消息會被mq預設帶上一些屬性:Delivery tag、Redelivered、Exchange、Routing key、Consumer tag,還有一些可選的屬性:Message ID、Headers等。
  8. 推送安全有兩種模式:transaction和confirm,前者是重量級的,減少很多吞吐量。

Java Client 總結

  1. 并發安全:

    不同的線程使用同一channel應該避免,因該是給每一個線程提供不同的channel,可以使用channel pool 達到上述目的,作為一種同步機制解決方法。

    消費在一個線程上,推送在另一個線程上,也是安全的。

    當時使用手動ack模式,使用線程回複ack是很重要的,否則可能造成重複ack。

  2. 對于消費者,線程池裡面的線程是依據channel獨立開的,是以不同線程可以執行阻塞的方法。 每一個channel有自己的線程,
  3. 消息給了exchange,但是無法路由到queue上:如果消息代了mandatory的辨別,那麼消息會被傳回給用戶端。用戶端對于這種消息的處理,需要實作ReturnListener,不實作的話,默默丢棄消息。
  4. 其他自定義特性:

    1 設定消費者的線程池

    ExecutorService es = Executors.newFixedThreadPool(20);

    Connection conn = factory.newConnection(es);

    需要注意確定自定義的線程池執行shutdown()方法,否者它會阻止jvm關閉

    當有明顯證據顯示,預設的配置是處理消息的瓶頸的時候,才考慮自定義的線程池。

    2:設定hosts清單

    用戶端會按順序逐個清單的host,傳回第一個成功的連接配接。

    3 支援Java的nio

    4 支援網絡失敗後自動恢複,預設開啟

    觸發的時機包括:io異常、socket readTimeOut、心跳監測異常

    5 可以注冊一個恢複監聽器,Recovery Listeners

  5. 自動恢複:

    mq的entities (queues, exchanges, bindings, consumers)會在緩存裡面存資料。連接配接中斷的監測需要時間,library and the application會有一段時間意識不到連接配接斷開。 mq的publisher confirms機制保證消息不會丢失。

    當i/o操作異常的時候,application恢複連接配接會在延遲5秒後開始,這是因為假設大多數網絡失敗是臨時的。這個時間可以通過RecoveryDelayHandler動态修改,但是不應該小于2秒。

    恢複狀态的connection會拒絕所有的推送請求,publisher confirms不能完全避免消息丢失。開發者應該注意當連接配接恢複的時候,重新推送消息。

  6. 監控mq的各項名額可以使用MicrometerMetricsCollector。它的角色隻是讀取資料。

channel特性

  1. 每個channel在用戶端以及節點上都占用少量記憶體,
  2. 用戶端和mq都可以配置單個connection支援的最大channel數目

    channel對應一個deliver tag,消息的屬性裡面也有這個字段,是以消息和channel是綁定的。

  3. acknowledge modes

    自動ack:消息寫到tcp socket就認為成功了,容易丢消息。

    手動ack:手動傳回ack

    傳回的結果

    basic.ack:消息處理成功,可以删除了。

    Java用戶端,可以設定multiple,批量傳回ack

    basic.nack:息處理不成功

    設定requeue,指定是否重新入隊,不同于reject,可以設定multiple,批量傳回結果

    basic.reject:息處理不成功,可以删除了

    設定requeue,指定是否重新入隊

    重新入隊的消息,若果可能,回到原來的位置;如果不行,放到更靠近header的一個位置。

  4. 自動模式ack下:犧牲安全換取吞吐量,可以認為這個模式是不安全的。而且會造成伺服器過載,因為它不會限制channel prefetch,是以隻是用在那些消費速度很快的場景。
  5. 手動ack模式下:因為channel關閉或者其他異常,消息會自動入隊

Channel Prefetch Setting (QoS)

  1. 設定prefetch count,指定消費者一次接受多少消息處于進行中,隻有一個消息處理結束,才會再放入下一個消息。

    prefetch count會影響吞吐量,通常100~300能提供較好吞吐量。

  2. 持久的消息,到達硬碟的時候,basic.ack就會傳回個publisher

    和消費消息一樣,發送消息和等待ack異步的,兩者順序不完全相同。

    channel設定global辨別,這裡是rabbitmq給的含義

    true:basic.qos的值,應用給一個channel上所有的consumer。表示大家均攤這個數值。

    false:basic.qos的值,應用給獨立的consumer

firehose 消防水帶特性

開啟的話會影響性能,因為有額外的消息被産生以及路由

從exchange進入的消息,都會進入amq.rabbitmq.trace 這個預設exchange,需要自己取消費這個exchange。同時這個消息的頭會放入消息的來龍去脈。可以做備份吧。

消息頭如下:

headers: channel: 1

connection: [email protected]

exchange_name: priority.ex

node: [email protected]

properties: delivery_mode: 1

headers:

routed_queues: priority.q

routing_keys:

user: guest

vhost: /

申明

如需轉載,需要申明來源