最近在閱讀AMQP協定。AMQP協定算是消息隊列裡無法繞開的一個協定,通過閱讀該協定來學習消息隊列以及自有協定設計。該協定的閱讀體驗非常好,協定本身沒有過于複雜,規範裡也會解釋各個地方的設計思路。
該文章是基于AMQP 0.9.1編寫,AMQP 0.9.1規範釋出于2008年。是以後面提到的很多特性在當時是先進的,但放到現在可能就習以為常了。比如Kafka于2011年釋出第一版,RTMP協定1.0版本于2012年釋出,HTTP/2于2015年釋出,
AMQP協定中的各個概念群組件
AMQP的全稱為:Advanced Message Queuing Protocol(進階消息隊列協定)
AMQP所覆寫的内容包含網絡協定以及服務端服務
- 一套被稱作”進階消息隊列協定模型(AMQ Model)“的消息能力定義。該模型涵蓋Broker服務中用于路由和存儲消息的元件,以及把這些元件連在一起的規則。
- 一個網絡層協定AMQP。能夠讓用戶端程式與實作了AMQ Model的服務端進行通信。
AMQP像是一個把東西連在一起的語言,而不是一個系統。其設計目标是:讓服務端可通過協定程式設計。了解了AMQP的這個設計目标,也就能夠了解其協定的設計思路了。
AMQP協定是一個二進制協定,具有一些現代特性:多通道(multi-channel),可協商(negotiated),異步、安全、便攜、語言中立、高效的。其協定主要分成兩層:
功能層(Functional Layer):定義了一系列的指令
傳輸層(Transport Layer):攜帶了從應用 → 服務端的方法,用于處理多路複用、分幀、編碼、心跳、data-representation、錯誤處理。
這樣分層之後,可以把傳輸層替換為其它傳輸協定,而不需要修改功能層。同樣,也可以使用同樣的傳輸層,基于此實作不同的上層協定。可能RabbitMQ也是因為類似的原因,能夠比較容易的支援MQTT、STOMP等協定的吧。
AMQ Model的設計是由以下需求驅動的:
- 確定符合标準的實作之間的互操作性。
- 提供清晰且直接的方式控制QoS
- 保持一緻和明确的命名
- 通過協定能夠修改服務端的各種配置
- 使用可以輕松映射到應用程式級API的指令符号
- 清晰,每個操作隻能做一件事。
AMQP傳輸層是由以下需求驅動的
- 緊湊。能夠快速封包和解包
- 可以攜帶任意大小的消息,沒有明顯的限制
- 同一個連接配接可以承載多個通道(Channel)
- 長時間存活,沒有顯著的限制
- 允許異步指令流水線
- 容易擴充。易于處理新需求、或者變更需求
- 向前相容
- 使用強大的斷言模型,可修複
- 對程式設計語言保持中立
- 适合代碼生成過程
在設計過程中,希望能夠支援不同的消息架構:
- 先存後發模型。有多個Writer,隻有一個Reader
- 分散工作負載。有多個Writer和多個Reader
- 釋出訂閱模型,多個Writer和多個reader
- 基于消息内容的路由,多個Writer,多個Reader
- 隊列檔案傳輸,多個Writer,多個Reader
- 兩個節點之間點對點連接配接
- 市場資料(Market data)分發。多個資料源,多個Reader
AMQ Model
主要包含了三個主要的元件:
- exchange(交換器):從Publisher程式中收取消息,并把這些消息根據一些規則路由到消息隊列(Message Queue)中
- message queue(消息隊列):存儲消息。直到消息被安全的投遞給了消費者。
- binding:定義了 message queue 和 exchange 之間的關系,提供了消息路由的規則。
AMQ Model的整體架構
可以把AMQP的架構了解為一個郵件服務:
- 一個AMQP消息類似于一封郵件資訊
- 消息隊列類似于一個郵箱(Mailbox)
- 消費者類似一個郵件用戶端,能夠拉取和删除郵件。
- 交換器類似一個MTA(郵件伺服器)。檢查郵件,基于郵件裡的路由資訊、路由表,來決定如何把郵件發送到一個或多個郵箱裡。
- Routing Key類似于郵件中的To:,Cc:, Bcc: 的位址。不包含服務端資訊。
- 每一個交換器執行個體,類似于各個MTA程序。用于處理不同子域名的郵件,或者特定類型的郵件。
- Binding 類似于MTA中的路由表。
在AMQP裡,生産者直接把消息發到服務端,服務端再把這些消息路由到郵箱中。消費者直接從郵箱裡取消息。但在AMQP之前的很多中間件中,釋出者是把消息直接發到對應的郵箱裡(類似于存儲釋出隊列),或者直接發到郵件清單裡(類似topic訂閱)。
這裡的主要差別在于,使用者可以控制消息隊列和交換器的綁定規則,而不是依賴中間件自身的代碼。這樣就可以做很多有趣的事情。比如定義一個這樣的規則:把所有包含這樣和這樣Header的消息,都複制一份到這個消息隊列中。“
而這一點也是我認為AMQP和其他一些消息隊列最重要的差異。
生命周期
消息的生命周期
- 消息由生産者産生。生産者把内容放到消息裡,并設定一些屬性以及消息的路由。然後生産者把消息發給服務端。
- 服務端收到消息,交換器(大部分情況)把消息路由到若幹個該伺服器上的消息隊列中。如果這個消息找不到路由,則會丢棄或者退回給生産者(生産者可自行決定)。
- 一條消息可以存在于許多消息隊列中。伺服器可以通過複制消息,引用計數等方式來實作。這不會影響互操作性。但是,将一條消息路由到多個消息隊列時,每個消息隊列上的消息都是相同的。沒有可以區分各種副本的唯一辨別符。
- 消息到達消息隊列。消息隊列會立即嘗試通過AMQP将其傳遞給消費者。如果做不到,消息隊列将消息存儲(按生産者的要求存儲在記憶體中或磁盤上),并等待消費者準備就緒。如果沒有消費者,則消息隊列可以通過AMQP将消息傳回給生産者(同樣,如果生産者要求這樣做)。
- 當消息隊列可以将消息傳遞給消費者時,它将消息從其内部緩沖區中删除。 可以立即删除,也可以在使用者确認其已成功處理消息之後删除(ack)。 由消費者選擇“确認”消息的方式和時間。消費者也可以拒絕消息(否定确認)。
- 生産者發消息與消費者确認,被分組成一個事務。當一個應用同時扮演多個角色時:發消息,發ack,commit或者復原事務。消息從服務端投遞給消費者這個過程不是事務的。消費者對消息進行确認就夠了。
在這個過程中,生産者隻能把所有消息發到一個單點(交換器),而不能直接把消息發到某個消息隊列(message-queue)中。
交換器(exchange)的生命周期
每個AMQP服務端都會自己建立一些交換器,這些不能被銷毀。AMQP程式也可以建立其自己的交換器。AMQP并不使用 create 這個方法,而是使用 declare 方法來表示:如果不存在,則建立,存在了則繼續。程式可以建立交換器用于私有使用,并在任務完成後銷毀它們。雖然AMQP提供了銷毀交換器的方法,但一般來講程式不需要銷毀它。
隊列(queue)的生命周期
隊列分為兩種,
- 持久化消息隊列:由很多消費者共享。當消費者都退出後,隊列依然存在,并會繼續收集消息。
- 臨時消息隊列:臨時消息隊列對于消費者是私有和綁定的。當消費者斷開連接配接,則消息隊列被删除。
臨時消息隊列的生命周期
綁定(Bindings)
綁定是交換器和消息隊列之間的關系,告訴交換器如何路由消息。
// 綁定指令的僞代碼
Queue.Bind <queue> TO <exchange> WHERE <condition>
幾個經典的使用案例:共享隊列、私有的回複隊列、釋出-訂閱。
構造一個共享隊列
Queue.Declare queue=app.svc01 // 聲明一個叫做 app.svc01 的隊列
// Comsumer
Basic.Consume queue=app.svc01 // 消費者消費該隊列
// Producer
Basic.Publish routing-key=app.svc01
// 生産者釋出消息。routingKey為隊列名稱
https://www.rabbitmq.com/tutorials/tutorial-two-python.html
構造一個私有回複隊列
一般來講,回複隊列是私有的、臨時的、由服務端命名、隻有一個消費者。(沒有直接使用AMQP協定中的例子,而是使用了RabbitMQ的例子)
Queue.Declare queue=rpc_queue // 調用的隊列
// Server
Basic.Consume queue=rpc_queue
// Client
Queue.Declare queue=<empty> exclusive=TRUE
S:Queue.Declare-Ok queue=amq.gen-X
... // AMQP服務端告訴隊列名稱
Basic.Publish queue=rpc_queue reply_to=amq_gen-X... // 用戶端向服務端發送請求
// Server
handleMessage()
// 服務端處理好消息後,向消息列的reply-to字段中的隊列發送響應
Basic.Publish exchange=<empty> routing-key={message.replay_to}
https://www.rabbitmq.com/tutorials/tutorial-six-python.html
構造一個釋出-訂閱隊列
在傳統的中間件中,術語 subscription 含糊不清。至少包含兩個概念:比對消息的條件集,和一個臨時隊列用于存放比對的消息。AMQP把這兩部分拆成:binding和message queus。在AMQP中,并沒有一個實體叫做 subscription
AMQP的釋出訂閱模型為:
- 給一個消費者保留消息(一些場景下是多個消費者)
- 從多個源收集消息,比如比對Topic、消息的字段、或者内容等方式
訂閱隊列與命名隊列或回複隊列之間的關鍵差別在于,訂閱隊列名稱與路由目的無關,并且路由是根據抽象的比對條件完成的,而不是路由鍵字段的一對一比對。
// Consumer
Queue.Declare queue=<empty> exclusive=TRUE
// 這裡是使用服務端下發的隊列名稱,并設定為獨占。
// 也可以使用約定的隊列名稱。這樣就相當于把釋出-訂閱模型與共享隊列組合使用了
S:Queue.Declare-Ok queue=tmp.2
Queue.Bind queue=
tmp.2
TO exchange=amq.topic WHERE routing-key=*.orange.*
Basic.Consume queue=t
mp.2
// Producer
Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit
https://www.rabbitmq.com/tutorials/tutorial-five-python.html
AMQP指令架構
中間件複雜度很高,是以設計協定時的挑戰是要馴服其複雜性。AMQP采用方法是基于類來建立傳統API模型。類中包含方法,并定義了方法明确應該做什麼。
AMQP中有兩種不同的方式進行對話:
- 同步請求-響應。一個節點發送請求,另一個階段發送響應。适用于性能不重要的方法。發送同步請求時,該節點直到收到回複後,才能發送下一個請求
- 異步通知。一個節點發送資料,但是不期待回複。一般用于性能很重要的地方。異步請求會盡可能快的發送消息,不等待确認。隻在需要的時候在更上層(比如消費者層)實作限流等功能。AMQP中可以沒有确認,要麼成功,要麼就會收到關閉Channel或者連接配接的異常。如果需要明确的追蹤成功或者失敗,那麼應該使用事務。
AMQP中的類
Connection類
AMQP是一個長連接配接協定。Connection被設計為長期使用的,可以攜帶多個Channel。Connection的生命周期是:
- 用戶端打開到服務端的TCP/IP連接配接,發送協定頭。這是用戶端發送的資料裡,唯一不能被解析為方法的資料。
- 服務端傳回其協定版本、屬性(比如支援的安全機制清單)。 the Start method
- 用戶端選擇安全機制 Start-Ok
- 服務端開始認證過程, 它使用SASL的質詢-響應模型(challenge-response model)。它向用戶端發送一個質詢 Secure
- 用戶端向服務端發送一個認證響應Secure-Ok。比如,如果使用 plain 認證機制,則響應會包含登入名和密碼
- 用戶端重複質詢Secure或轉到協商步驟,發送一系列參數,如最大幀大小 Tune
- 用戶端接受,或者調低這些參數 Tune-Ok
- 用戶端正式打開連接配接,并選擇一個Vhost Open
- 服務端确認VHost有效 Open-Ok
- 用戶端可以按照預期使用連接配接
- 當一個節點打算結束連接配接 Close
- 另一個節點需要結束握手 Close-Ok
- 服務端和用戶端關閉Socket連接配接。
如果在發送或者收到 Open 或者 Open-Ok 之前,某一個節點發現了一個錯誤,則必須直接關閉Socket,且不發送任何資料。
Channel類
AMQP是一個多通道協定。Channel提供了一種方式,在比較重的TCP/IP連接配接上建立多個輕量級的連接配接。這會讓協定對防火牆更加友好,因為端口使用是可預知的。它也意味着很容易支援流量調整和其他QoS特性。
Channels互相是獨立的,可以同步執行不同的功能。可用帶寬會在目前活動之間共享。
這裡期望也鼓勵多線程用戶端程式應該使用 每個線程一個channel 的模型。不過,一個用戶端在一個或多個AMQP服務端上打開多個連接配接也是可以的。
Channel的生命周期為:
- 用戶端打開一個新通道 Open
- 服務端确認新通道準備就緒 Open-Ok
- 用戶端和服務端按預期來使用通道.
- 一個節點關閉了通道 Close
- 另一個節點對通道關閉進行握手 Close-Ok
Exchange類
Exchange類能夠讓應用操作服務端的交換器。這個類能夠讓程式自己設定路由,而不是通過某些配置。不過大部分程式并不需要這個級别的複雜度,過去的中間件也不隻支援這個語義。
Exchange的生命周期為:
- 用戶端讓服務端確定該exchange存在Declare。用戶端可以細化為:“如果交換器不存在則進行建立” 或 “如果交換器不存在,警告我,不需要建立”
- 用戶端向Exchange發消息
- 用戶端也可以選擇删掉Exchange Delete
Queue類
該類用于讓程式管理服務端上的消息隊列。幾乎所有的消費者應用都是基本步驟,至少要驗證使用的消息隊列是否存在。
一個持久化消息隊列的生命周期非常簡單
- 用戶端斷言這個消息隊列存在 Declare(設定 passive 參數)
- 服務端确認消息隊列存在 Declare-Ok
- 用戶端消息隊列中讀消息
一個臨時消息隊列的生命周期會更有趣些:
- 用戶端建立消息隊列 Declare(不提供隊列名稱,伺服器會配置設定一個名稱)。服務端确認 Declare-Ok
- 用戶端在消息隊列上啟動一個消費者
- 用戶端取消消費,可以是顯示取消,也可以是通過關閉通道或者連接配接連接配接隐式取消的
- 當最後一個消費者從消息隊列中消失的時候,在過了禮貌性逾時後,服務端會删除消息隊列
AMQP實作了Topic訂閱的分發模型。這可以讓訂閱在合作的訂閱者間進行負載均衡。涉及到額外的綁定階段的生命周期:
- 用戶端建立一個隊列Declare,服務端确認Declare-Ok
- 用戶端綁定消息隊列到一個topic exchange上Bind,服務端确認Bind-Ok
- 用戶端像之前一樣使用消息隊列。
Basic類
Basic實作本規範中描述的消息功能。支援如下語義:
- 從用戶端→服務端發消息。異步Publish
- 開始或者停止消費Consume,Cancel
- 從服務端到用戶端發消息。異步Deliver,Return
- 确認消息Ack,Reject
- 同步的從消息隊列中讀取消息Get
事務類:
AMQP支援兩種類型的事務:
- 自動事務。每個釋出的消息和應答都處理為獨立事務.
- 服務端本地事務:伺服器會緩存釋出的消息和應答,并會根據需要由client來送出它們.
Transaction 類(“tx”) 使應用程式可通路第二種類型,即伺服器事務。這個類的語義是:
- 應用程式要求服務端事務,在需要的每個channel裡Select
- 應用程式做一些工作Publish, Ack
- 應用程式送出或復原工作Commit,Roll-back
- 應用程式正常工作,循環往複。
事務包含釋出消息和ack,不包含分發。是以,復原并不能重入隊列或者重新分發任何消息。用戶端有權在事務中确認這些消息。
功能說明
AMQP的功能描述,一定程度上也是RabbitMQ的功能描述,不過RabbitMQ基于AMQP做了一些擴充
消息和内容
消息會攜帶一些屬性,以及具體内容(二進制資料)
消息是可被持久化的。持久化消息是可以安全的存在硬碟上的,即使發生了驗證的網絡錯誤、服務端崩潰溢出等情況,也可以確定被投遞。
消息可以有優先級。同一個隊列中,高優先級的消息會比低優先級的消息先被發送。當消息需要被丢棄時(比如服務端記憶體不足等),将會優先丢棄低優先級消息
服務端一定不能修改消息的内容。但服務端可能會在消息頭上添加一些屬性,但一定不會移除或者修改已經存在的屬性。
虛拟主機(VHost)
虛拟主機是服務端的一個資料分區。在多租戶使用時,可以友善進行管理。
虛拟主機有自己的命名空間、交換器、消息隊列等等。所有連接配接,隻可能和一個虛拟主機建立。
交換器(Exchange)
交換器是一個虛拟主機内的消息路由Agent。用于處理消息的路由資訊(一般是Routing-Key),然後将其發送到消息隊列或者内部服務中。交換器可能是持久化的、臨時的、自動删除的。交換器把消息路由到消息隊列時可以是并行的。這會建立一個消息的多個執行個體。
Direct 交換器
- 一個消息隊列使用RoutingKey K 綁定到交換器
- 生産者向交換器發送RoutingKey為R的消息
- 當 K=R時,消息被轉發到該消息隊列中
Fanout 交換器
- 一個消息隊列沒有使用任何參數綁定交換器
- 生産者向交換器發了一條消息
- 這個消息無條件的發送到該消息隊列
Topic 交換器
- 消息隊列使用路由規則 P 綁定到交換器
- 生産者使用RoutingKey R 發送消息到交換器
- 如果R 能夠比對 P,則把消息發到該消息隊列。
RoutingKey必須由若幹個被點.分隔的單詞組成。每個單詞隻能包含字母和數字。其中 * 比對一個單詞,# 比對0個或者多個單詞。比如 *.stock.# 比對 usd.stock 和 eur.stock.db 但是不比對 stock.nasdaq
Headers 交換器
- 消息隊列使用Header的參數表來綁定。不适用RoutingKey
- 生産者向交換器發送消息,Header中包含了指定的鍵值對
- 如果比對,則傳給消息隊列。
比如:
format=json,type=log,x-match=all
format=line,type=log,x-match=any
如果 x-match 為all,則必須都比對才行。如果x-match為any,則有任意一個header比對即可。
系統交換器
這個平時應該用不到,這裡略過。感興趣的可以直接檢視AMQP0.9.1的3.1.3.5章節。
AMQP的傳輸架構
解釋了指令如何映射到傳輸層的。在設計自有協定時,可以參考一下它的設計思路,以及中間需要注意的問題。
AMQP是一個二進制協定。有不同類型的幀frame 構成。幀會攜帶協定的方法以及其他資訊。所有的幀都有相同的基本結構,即:幀頭,payload,幀尾。payload格式取決于幀的類型。
我們假設使用的是面向流的可靠網絡層(比如TCP/IP)。單個Socket連接配接上可以有多個獨立的控制線程,也就是通道Channel。不同的通道共享一個連接配接,每個通道上的幀都是按嚴格的順序排列,這樣可以用一個狀态機來解析協定。
傳輸層(wire-level)的格式被設計為擴充性強、且足夠通用,可以用于任何更高層的協定(不僅僅是AMQP)。我們假設AMQP是會被擴充、優化的。
主要涉及這幾個部分:資料類型、協定協商、分幀方式、幀細節、方法幀、内容幀、心跳幀、錯誤處理、通道與連接配接的關閉。
資料類型
AMQP的資料類型用于方法幀中,他們有
- 整數(1-8個位元組),表示大小,數量,範圍等。全都是無符号整數
- Bits。用于表示為開/關值,會被封包為位元組。
- 短字元串。用于存放短的文本屬性。最多255位元組,解析時不用擔心緩沖區溢出。
- 長字元串:用于存放二進制資料塊
- 字段表(Field Table),用于存放鍵值對
協定協商
用戶端連接配接時,和服務端協商可接受的配置。當兩個節點達成一緻後,連接配接才能繼續使用。通過協商,可以讓我們斷言假設和前提條件。主要協商這幾方面的資訊
- 實作的協定和版本。服務端可能會在同一端口提供多種協定的支援
- 加密參數和驗證
- 最大幀尺寸、Channel的數量、某些操作的限制。
如果協商達成一緻,雙方會根據協商預配置設定緩沖區避免死鎖。傳入的幀如果滿足協商條件,則認為其實安全的。如果超過了,那麼另一方必須斷開連接配接。
分幀方式
TCP/IP是流協定。沒有内置的分幀機制。現有的協定一般有這幾種方式進行分幀:
- 每個連接配接隻發送一個幀。簡單,但是慢。
- 在流中加入分隔符來分幀。簡單,但是解析較慢(因為需要不斷的讀取,去尋找分隔符)
- 計算幀的尺寸,并在每個幀之前發送尺寸。簡單且快速。也是AMQP的選擇
幀細節
幀頭包括:幀類型、通道、尺寸。幀尾包含錯誤檢測資訊。
處理一個幀的步驟:
- 讀幀頭,檢查幀類型和Channel
- 根據幀類型,讀取payload并處理
- 讀幀尾校驗
在實作時,性能很重要的時候,我們會使用 read-ahead buffering 或者 gathering reads 去避免讀幀時進行三次系統調用。
方法幀
處理方式:
- 讀取方法幀的payload
- 解包為結構
- 檢查方法在目前上下文中是否允許
- 檢查參數是否有效
- 執行方法。
方法幀是由AMQP資料字段組成。編碼代碼可以直接從協定規範中生成,速度非常快。
内容幀
内容是端到端直接發送的應用資料。内容由一系列屬性和二進制資料組成。其中一系列的屬性組成了 ”内容幀的幀頭“。而二進制資料,可以是任意大小,它可能被拆分成多個塊發送,每個塊是一個 content-body幀
一些方法(比如 Basic.Publish,Basic.Deliver)是會攜帶内容的。一個内容幀的幀頭如下結構:
這裡把 content-body 作為單獨的幀,這樣就可以支援Zero-copy技術,這部分内容就不需要被編碼。把内容屬性放到自己的幀裡,這樣收件人就可以選擇性的丢棄不想處理的内容。
通道與連接配接的關閉
對于用戶端,隻要發送了 Open 就認為連接配接和通道是打開的。對于服務端則是Open-Ok。如果一個節點想要關閉通道和連接配接必須要進行握手。
如果突然或者意外關閉,沒辦法立刻被檢測到,可能會導緻丢失傳回值。是以需要在關閉之前進行握手。在一個節點發送 Close 後,另一個節點必須發送 Close-Ok 來回複。然後雙方可以關閉通道或者連接配接。如果節點忽略了 Close 操作,當雙方同時發送 Close 時,可能會導緻死鎖。
最後,更多細節可以檢視AMQP的官方規範,以及RabbitMQ在其基礎上擴充的其它特性。
參考連結
- AMQP 0.9.1 規範:https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
- AMQP 0.9.1 完整的中文翻譯:http://www.blogjava.net/qbna350816/archive/2016/08/12/431554.html
- RabbitMQ 對于 AMQP 0.9.1 的勘誤:https://www.rabbitmq.com/amqp-0-9-1-errata.html
- RabbitMQ 對與 AMQP 0.9.1 的擴充:https://www.rabbitmq.com/extensions.html
- AMQP 1.0 最終版:http://www.amqp.org/specification/1.0/amqp-org-download
作者:0neSe7en
來源-微信公衆号:即刻技術團隊
出處:https://mp.weixin.qq.com/s/7HIXUSq-l1DzWEUScwPYAg