背景開發的rabbitmq官網拾遺
- 常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官網的文檔裡做一些總結和摘取,檢視文檔的視角是一位背景開發,是以運維工具方面省略了,比如 安裝、CLI工具、配置、權限、部分網絡、指令等。
-
- 網絡和連接配接(部分)
- 監控
- 用戶端連接配接
- 分布式mq 與 clustering
- Quorum Queues
- Classic Mirrored Queues
- Time-To-Live and Expiration
- Lazy Queues
- 普通的Queues
- Exclusive Queues
- Priority Queue Support
- Exchange
- Dead Letter Exchanges
- Consumers
- Publishers
- Java Client 總結
- channel特性
- Channel Prefetch Setting (QoS)
- firehose 消防水帶特性
-
- 申明
常使用rabbitmq的常用功能,想着拓展一下,就到rabbitmq官網的文檔裡做一些總結和摘取,檢視文檔的視角是一位背景開發,是以運維工具方面省略了,比如 安裝、CLI工具、配置、權限、部分網絡、指令等。
順便貼上官網位址https://www.rabbitmq.com/documentation.html
網絡和連接配接(部分)
-
各種用戶端和RabbitMQ協定種類包括如下:
AMQP 0-9-1 with extensions :一個連接配接多個通道,可以使用pki以及認證證書。
AMQP 1.0 :有個連接配接多種會話,比上一個功能弱一點。
MQTT 3.1.1
STOMP 1.0 through 1.2
- 所有協定都是建立在tcp上的,使用長連接配接。一個client使用一個tcp連接配接。這裡注意,使用完連接配接後,應用需要關閉它,減少機器資源耗盡的風險。有些作業系統會限制一個程序可以使用的連接配接數目。
- 連接配接洩露問題:連接配接打開後一直不關閉,或者隻是關閉一小部分,造成資源耗盡。使用admin背景監控連接配接數量, RabbitMQ 3.7.9.之後還能監控連接配接打開速率。
- 高連接配接切換問題:應用頻繁的打開以及關閉連接配接,應該盡量使用長連接配接。切換速率超過 100/second 就要注意了。
- 流控:應用在推送連接配接上
- 心跳監測時間:對于叢集,小于5秒可能導緻錯誤判斷,不推薦。
監控
- mq的插件可以提供http的接口,查詢mq的監控名額,預設的http接口是 http://server-name:15672/api/**
-
主要名額:
cpu狀态
記憶體使用率
虛拟記憶體使用
I/O 使用情況
節點的可用空間
TCP連接配接數目
網絡流量
網絡延遲
- 監控頻率推薦是30-60秒
用戶端連接配接
-
AMQP 0-9-1方式的用戶端的連接配接使用同一個Tcp連接配接,後面一個是更底層的連接配接,每個channel以一個id。
channel是建立在成功的連接配接之上。關閉後會是防止相應的資源。關閉的channel不能再被使用。
-
異常場景:
重複申明queue、exchange時候使用不同的屬性值,導緻406錯誤,注意有些場景,屬性是可以覆寫的,這個異常隻是說一種情況。
通路不被允許的資源,403錯誤。
綁定不存在的queue、或者exchange,404錯誤
從不存在的queue消費、推消息到不存在的exchange,404錯誤
通路一個被排除的queue,405異常
但是用戶端可以自己處理,如Java用戶端,可以注冊錯誤處理器(error handle)
- 一個連接配接可以打開的channel數目,是可以配置的。channel數目增加,會導緻用戶端以及節點上資源消耗。是以需要合理控制。
-
rabbitmq-event-exchange 插件
這個插件可以監聽mq事件,包括連接配接、channel、queue等系統部分産生的事件。這個插件把事件推送到exchange裡面,名字是 amq.rabbitmq.event 應用自己聲明隊列并且監聽。
分布式mq 與 clustering
-
分布式有三種方式:with clustering, with Federation, and using the Shovel plugin.
clustering: 多台機器聯系到一起,所有節點mq以及erlang版本一緻。上面的虛拟主機、exchange、使用者等自動複制。queue可以複制内容、鏡像、可以隻在一個節點上、可以使用Quorum queues 類型。
Federation:允許exchange以及queue接收其他broker上對應exchange以及queue的消息。 exchange或者queue使用點對點的連結,預設情況是消息在連結上被傳遞一次。傳遞出去的消息如果不能配置設定到queue上,那麼消息來源的位置,也不會被發出去。常見場景是組合連個mq的broker。 可以使用不同版本的mq以及erlang
Shovels插件:雷類似于Federation,但是更低級層次。他是消費一個消息,然後,傳遞到另一個exchange上。
- 節點通過名字做唯一識别,例如[email protected]。可以動态變化節點數目。
-
被複制的東西:
所有的資料、狀态都會被複制到所有節點。除了預設的消息隊列,可以複制的隊列類型在後面談及。
-
叢集的所有節點是地位相同的。節點間通過cookies驗證身份。
節點數目推薦使用奇數,1、3、5、7等。因為需要集體決策。
- 節點的操作會被複制到不同節點(原文大概是隊列的主從位置)
-
防止單個節點失敗
用戶端連接配接使用一個list指明節點即可。另外推薦的方法有:dns動态解析變換,或者tcp連接配接的負載均衡方案。
-
節點失敗
mq的broker能夠接受節點失敗。鏡像隊列可以使得内容複制到不同的節點;非鏡像隊列也可以用在叢集上。
Quorum Queues
quorum 是指決策多數。例如n/2+1 中的n。RabbitMQ 3.8.0.之後支援。
-
動機:
傳統的鏡像隊列由于技術原因,有潛在風險丢失資訊。quorum queue保證消息安全、友善實用。
-
和傳統隊列比較,有很多差異點,簡單列舉部分;
隻能是持久化的
不支援排除特性
不支援ttl、不支援死信
不支援優先級
支援Poison Message Handling
消息一直不能被消費,導緻不斷從新入隊,成為Poison Message。消息head裡面用x-delivery-count記錄發送次數,然後隊列設定最大次數,大于最大次數的丢棄或者成為死信。
-
性能嚴重取決于叢集大小以及硬碟速度。
複制因子預設是5,例如叢集是3節點,每個節點上有一個複制;叢集是7節點,隻有5個節點上有一個複制;
- 使用publish confirm機制保證消息安全,消息一緻性大于可用性。
Classic Mirrored Queues
官網開頭就說:建議讀者使用quorum queue。并評價為下一代高可用隊列。
-
介紹
每一個queue有主從,主的節點負責消息接收、消息發送。主節點挂掉,最老的從節點變成主節點。無論用戶端連接配接到叢集哪個節點,執行操作的是主節點。
主節點掉線後,如果上面的資料沒有同步,那麼消息就會丢失。消費者是自動應答模式的話,消息也可能會丢失。從節點轉發的消息沒有被消費,從節點就掉線的話,也可能會導緻消息丢失。
具體同步、選主過程、失敗處理等略。
-
可靠性
涉及點如下,詳情略:連接配接失敗處理、ack以及confirms機制、信條堅持以及資料名額監控、生産者資料安全、消費者資料安全、消費者監聽取消通知
- 高可用實作略
-
腦裂問題:
網絡問題造成一個分區的兩個部分斷開連接配接,變成兩個分區;網絡再次連接配接時,分區不會自動合并。
解決方法:
mq版本使用3.4.0和3.4.1及以上,避免錯誤檢測到網絡分區。
RabbitMQ提供了三種方法自動的解決網絡分區:
pause-minority mode: 檢測自身處于少數派,關閉此叢集。
pause-if-all-down mode:不能和任意指定節點連接配接,關閉此叢集。
autoheal mode。(預設的是ignore模式):選擇節點多的作為主分區,其他重新開機。節點數一樣的話,随機選擇。
Time-To-Live and Expiration
- 在每一個隊列上設定ttl:定義隊列時候使用參數(“x-message-ttl”, 60000),機關毫秒
-
在每一個設定消息的ttl:消息設定屬性expiration,機關毫秒
兩個屬性都有的話,最小值。
- 隊列自身的ttl:設定x-expires,定義隊列多久未使用後被删除。未使用是指沒有消費者。
-
隊列長度限制:
可以設定消息總數量或者消息的總記憶體大小限制;
處置超過限制的消息:定義隊列使用參數overflow(“overflow”:“reject-publish”),可以指定消息不能進入隊列或者從頭部丢棄消息,消息被抛棄或者成為死信。
Lazy Queues
-
介紹
RabbitMQ 3.6.0之後支援懶加載隊列。适應于數百萬消息、或者記憶體較小的場景。這個是盡可能多的把消息放到硬碟上。少量消息放在記憶體中,預設是16384個消息。
可以再運作的時候修改隊列的模式,但是需要重新開機。
普通的Queues
-
注意
以"amq.開頭的隊列名字,是mq的保留字段。
-
隊列使用前需要申明。對于已經存在的隊列,使用相同的參數聲明,沒有影響;參數不同的話,會導緻406異常。
隊列的多數可選參數可以動态變化。
Exclusive Queues
隻是被聲明他的連接配接使用。其他連接配接想要使用的話,會抛出異常。聲明他的連接配接關閉後,這個隊列被删除。
Priority Queue Support
-
介紹
聲明隊列的時候使用參數x-max-priority辨別優先級。 1-255,推薦是1-10的數字。
然後就可以推送具有優先級的消息,使用屬性priority定義。沒有值的話預設是0,必定義的最大值還要大的話,就相當于最大值。
- 高優先級的消息會被先處理。但是需要配合消息的prefetch屬性。因為消息進入消費者的緩存池,就不會再排序了。
-
隊列裡面的消息是從頭部掃描過期的,是以低優先級的過期消息可能會被高優先級的消息卡在後面。
如果隊列有最大限制,按照通常丢棄頭部消息的政策,可能會造成高優先級的消息會被丢棄。
Exchange
- exchange 和 exchange 可以綁定,就像隊列綁定一樣使用。
- Alternate Exchanges: 一個exchange不能把消息配置設定給queue,那麼消息就會進入AE裡面,這個路徑可以一直走下去。
Dead Letter Exchanges
一個消息成為死信有三種情況
消息被消費者拒絕
消息過期
隊列長度限制導緻消息被丢棄
- 關于dead letter routing key:消息發送的時候自身有routing key = foo,預設的dead letter routing key 就是foo, 如果exchange上設定 dead letter routing key = bar,那麼死信的dead letter routing key 變成bar。
- 死信消息在原始隊列上的删除時機:死信隊列ack回複後,原始隊列才會把消息删除,如果死信隊列在回複前挂掉,死信消息會存在兩個位置。
- 如果死信消息發生了循環,這個消息會被丢棄。
- 死信消息的head有一個x-death屬性,存有死信消息的經曆的事件。
Consumers
- 每一個消費者也有一個唯一辨別。
-
消費者連接配接丢失的話,Java .Net client 會自動恢複。
恢複的順序是:
恢複connection
恢複channel
恢複queue
恢複exchange
恢複binding
恢複消費者
-
Consumer Prefetch
限制處于未傳回ack的消息數目,隻有一個消息傳回ack才能騰出空間給下一個消息。
-
Consumer Priorities
消費者設定優先級,高優先級的消費者的會先消費,除非高優先級堵塞,低優先級的消費者才會消費消息。
同一個級别的是輪詢消費。
-
設定exclusive 為true,保證某個時刻隻有一個customer消費queue。排他消費。
設定x-single-active-consumer為true,某個時刻隻有一個customer消費queue直到這個消費者挂掉,然後其他消費者自動補上。單活消費。
單活消費 與 排他消費差別在于,前者更大程度保證消息被一個消費者消費。這倆個屬性不能共存。
-
消費者并發:Java .net 等用戶端使用線程池控制異步消費操作,這個池子也可以設定并發度。用戶端保證了從同一個channel發送來的消息,會被按照原有順序消費,不必考慮池子的并發度。這裡需要注意,消費和等待ack是異步的,後消費的可能速度快導緻先傳回ack。
有些用戶端會限制并發度,設定并發因子為1,以滿足嚴格的順序消費場景。
- 對于消費者被異常取消的場景,java 客戶單可以使用覆寫handleCancel方法自定義業務。
Publishers
- AMQP 0-9-1協定上:消息發送到exchange上,如果沒有queue接收,預設是消息丢棄。如果消息設定了mandatory屬性,消息會被傳回給exchange。
- message可以設定Content type(例如 application/json)、Content encoding(例如 gzip);但是mq不會使用這個字段,自己的application或者plugins 使用這些字段。
-
Publisher Confirms的政策:
流模式:持續推送消息,監聽消息ack,
批模式:一次推送一批消息,等待消息ack
單個模式:一次推送後,等待ack,然後再次推送。是批模式的特例。對流量影響最大。
當出現資源報警時候,所有推送都是阻塞的直到警告清除。
- 生産者publish confirm的時候,不共享channel,使用channel pool ;消費者使用線程池,來保證消息安全。
- mq支援Java nio,但是nio不是為了提高流量的,是為了可控性。控制有多少線程在工作。
- 除了常見的exchange類型,還有其他的類型:Headers,下面Consistent hashing exchange, random routing exchange, internal event exchange and delayed message exchange,這些可以通過插件實作。
- 分發消息的時候,消息會被mq預設帶上一些屬性:Delivery tag、Redelivered、Exchange、Routing key、Consumer tag,還有一些可選的屬性:Message ID、Headers等。
- 推送安全有兩種模式:transaction和confirm,前者是重量級的,減少很多吞吐量。
Java Client 總結
-
并發安全:
不同的線程使用同一channel應該避免,因該是給每一個線程提供不同的channel,可以使用channel pool 達到上述目的,作為一種同步機制解決方法。
消費在一個線程上,推送在另一個線程上,也是安全的。
當時使用手動ack模式,使用線程回複ack是很重要的,否則可能造成重複ack。
- 對于消費者,線程池裡面的線程是依據channel獨立開的,是以不同線程可以執行阻塞的方法。 每一個channel有自己的線程,
- 消息給了exchange,但是無法路由到queue上:如果消息代了mandatory的辨別,那麼消息會被傳回給用戶端。用戶端對于這種消息的處理,需要實作ReturnListener,不實作的話,默默丢棄消息。
-
其他自定義特性:
1 設定消費者的線程池
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
需要注意確定自定義的線程池執行shutdown()方法,否者它會阻止jvm關閉
當有明顯證據顯示,預設的配置是處理消息的瓶頸的時候,才考慮自定義的線程池。
2:設定hosts清單
用戶端會按順序逐個清單的host,傳回第一個成功的連接配接。
3 支援Java的nio
4 支援網絡失敗後自動恢複,預設開啟
觸發的時機包括:io異常、socket readTimeOut、心跳監測異常
5 可以注冊一個恢複監聽器,Recovery Listeners
-
自動恢複:
mq的entities (queues, exchanges, bindings, consumers)會在緩存裡面存資料。連接配接中斷的監測需要時間,library and the application會有一段時間意識不到連接配接斷開。 mq的publisher confirms機制保證消息不會丢失。
當i/o操作異常的時候,application恢複連接配接會在延遲5秒後開始,這是因為假設大多數網絡失敗是臨時的。這個時間可以通過RecoveryDelayHandler動态修改,但是不應該小于2秒。
恢複狀态的connection會拒絕所有的推送請求,publisher confirms不能完全避免消息丢失。開發者應該注意當連接配接恢複的時候,重新推送消息。
- 監控mq的各項名額可以使用MicrometerMetricsCollector。它的角色隻是讀取資料。
channel特性
- 每個channel在用戶端以及節點上都占用少量記憶體,
-
用戶端和mq都可以配置單個connection支援的最大channel數目
channel對應一個deliver tag,消息的屬性裡面也有這個字段,是以消息和channel是綁定的。
-
acknowledge modes
自動ack:消息寫到tcp socket就認為成功了,容易丢消息。
手動ack:手動傳回ack
傳回的結果
basic.ack:消息處理成功,可以删除了。
Java用戶端,可以設定multiple,批量傳回ack
basic.nack:息處理不成功
設定requeue,指定是否重新入隊,不同于reject,可以設定multiple,批量傳回結果
basic.reject:息處理不成功,可以删除了
設定requeue,指定是否重新入隊
重新入隊的消息,若果可能,回到原來的位置;如果不行,放到更靠近header的一個位置。
- 自動模式ack下:犧牲安全換取吞吐量,可以認為這個模式是不安全的。而且會造成伺服器過載,因為它不會限制channel prefetch,是以隻是用在那些消費速度很快的場景。
- 手動ack模式下:因為channel關閉或者其他異常,消息會自動入隊
Channel Prefetch Setting (QoS)
-
設定prefetch count,指定消費者一次接受多少消息處于進行中,隻有一個消息處理結束,才會再放入下一個消息。
prefetch count會影響吞吐量,通常100~300能提供較好吞吐量。
-
持久的消息,到達硬碟的時候,basic.ack就會傳回個publisher
和消費消息一樣,發送消息和等待ack異步的,兩者順序不完全相同。
channel設定global辨別,這裡是rabbitmq給的含義
true:basic.qos的值,應用給一個channel上所有的consumer。表示大家均攤這個數值。
false:basic.qos的值,應用給獨立的consumer
firehose 消防水帶特性
開啟的話會影響性能,因為有額外的消息被産生以及路由
從exchange進入的消息,都會進入amq.rabbitmq.trace 這個預設exchange,需要自己取消費這個exchange。同時這個消息的頭會放入消息的來龍去脈。可以做備份吧。
消息頭如下:
headers: channel: 1
connection: [email protected]
exchange_name: priority.ex
node: [email protected]
properties: delivery_mode: 1
headers:
routed_queues: priority.q
routing_keys:
user: guest
vhost: /
申明
如需轉載,需要申明來源