天天看點

AMQP協定學習

作者:閃念基因
AMQP協定學習

最近在閱讀AMQP協定。AMQP協定算是消息隊列裡無法繞開的一個協定,通過閱讀該協定來學習消息隊列以及自有協定設計。該協定的閱讀體驗非常好,協定本身沒有過于複雜,規範裡也會解釋各個地方的設計思路。

該文章是基于AMQP 0.9.1編寫,AMQP 0.9.1規範釋出于2008年。是以後面提到的很多特性在當時是先進的,但放到現在可能就習以為常了。比如Kafka于2011年釋出第一版,RTMP協定1.0版本于2012年釋出,HTTP/2于2015年釋出,

AMQP協定中的各個概念群組件

AMQP的全稱為:Advanced Message Queuing Protocol(進階消息隊列協定)

AMQP所覆寫的内容包含網絡協定以及服務端服務

  • 一套被稱作”進階消息隊列協定模型(AMQ Model)“的消息能力定義。該模型涵蓋Broker服務中用于路由和存儲消息的元件,以及把這些元件連在一起的規則。
  • 一個網絡層協定AMQP。能夠讓用戶端程式與實作了AMQ Model的服務端進行通信。

AMQP像是一個把東西連在一起的語言,而不是一個系統。其設計目标是:讓服務端可通過協定程式設計。了解了AMQP的這個設計目标,也就能夠了解其協定的設計思路了。

AMQP協定是一個二進制協定,具有一些現代特性:多通道(multi-channel),可協商(negotiated),異步、安全、便攜、語言中立、高效的。其協定主要分成兩層:

AMQP協定學習

功能層(Functional Layer):定義了一系列的指令

傳輸層(Transport Layer):攜帶了從應用 → 服務端的方法,用于處理多路複用、分幀、編碼、心跳、data-representation、錯誤處理。

這樣分層之後,可以把傳輸層替換為其它傳輸協定,而不需要修改功能層。同樣,也可以使用同樣的傳輸層,基于此實作不同的上層協定。可能RabbitMQ也是因為類似的原因,能夠比較容易的支援MQTT、STOMP等協定的吧。

AMQ Model的設計是由以下需求驅動的:

  • 確定符合标準的實作之間的互操作性。
  • 提供清晰且直接的方式控制QoS
  • 保持一緻和明确的命名
  • 通過協定能夠修改服務端的各種配置
  • 使用可以輕松映射到應用程式級API的指令符号
  • 清晰,每個操作隻能做一件事。

AMQP傳輸層是由以下需求驅動的

  • 緊湊。能夠快速封包和解包
  • 可以攜帶任意大小的消息,沒有明顯的限制
  • 同一個連接配接可以承載多個通道(Channel)
  • 長時間存活,沒有顯著的限制
  • 允許異步指令流水線
  • 容易擴充。易于處理新需求、或者變更需求
  • 向前相容
  • 使用強大的斷言模型,可修複
  • 對程式設計語言保持中立
  • 适合代碼生成過程

在設計過程中,希望能夠支援不同的消息架構:

  • 先存後發模型。有多個Writer,隻有一個Reader
  • 分散工作負載。有多個Writer和多個Reader
  • 釋出訂閱模型,多個Writer和多個reader
  • 基于消息内容的路由,多個Writer,多個Reader
  • 隊列檔案傳輸,多個Writer,多個Reader
  • 兩個節點之間點對點連接配接
  • 市場資料(Market data)分發。多個資料源,多個Reader

AMQ Model

主要包含了三個主要的元件:

  • exchange(交換器):從Publisher程式中收取消息,并把這些消息根據一些規則路由到消息隊列(Message Queue)中
  • message queue(消息隊列):存儲消息。直到消息被安全的投遞給了消費者。
  • binding:定義了 message queue 和 exchange 之間的關系,提供了消息路由的規則。

AMQ Model的整體架構

AMQP協定學習

可以把AMQP的架構了解為一個郵件服務:

  • 一個AMQP消息類似于一封郵件資訊
  • 消息隊列類似于一個郵箱(Mailbox)
  • 消費者類似一個郵件用戶端,能夠拉取和删除郵件。
  • 交換器類似一個MTA(郵件伺服器)。檢查郵件,基于郵件裡的路由資訊、路由表,來決定如何把郵件發送到一個或多個郵箱裡。
  • Routing Key類似于郵件中的To:,Cc:, Bcc: 的位址。不包含服務端資訊。
  • 每一個交換器執行個體,類似于各個MTA程序。用于處理不同子域名的郵件,或者特定類型的郵件。
  • Binding 類似于MTA中的路由表。

在AMQP裡,生産者直接把消息發到服務端,服務端再把這些消息路由到郵箱中。消費者直接從郵箱裡取消息。但在AMQP之前的很多中間件中,釋出者是把消息直接發到對應的郵箱裡(類似于存儲釋出隊列),或者直接發到郵件清單裡(類似topic訂閱)。

這裡的主要差別在于,使用者可以控制消息隊列和交換器的綁定規則,而不是依賴中間件自身的代碼。這樣就可以做很多有趣的事情。比如定義一個這樣的規則:把所有包含這樣和這樣Header的消息,都複制一份到這個消息隊列中。“

而這一點也是我認為AMQP和其他一些消息隊列最重要的差異。

生命周期

消息的生命周期

AMQP協定學習
  • 消息由生産者産生。生産者把内容放到消息裡,并設定一些屬性以及消息的路由。然後生産者把消息發給服務端。
  • 服務端收到消息,交換器(大部分情況)把消息路由到若幹個該伺服器上的消息隊列中。如果這個消息找不到路由,則會丢棄或者退回給生産者(生産者可自行決定)。
  • 一條消息可以存在于許多消息隊列中。伺服器可以通過複制消息,引用計數等方式來實作。這不會影響互操作性。但是,将一條消息路由到多個消息隊列時,每個消息隊列上的消息都是相同的。沒有可以區分各種副本的唯一辨別符。
  • 消息到達消息隊列。消息隊列會立即嘗試通過AMQP将其傳遞給消費者。如果做不到,消息隊列将消息存儲(按生産者的要求存儲在記憶體中或磁盤上),并等待消費者準備就緒。如果沒有消費者,則消息隊列可以通過AMQP将消息傳回給生産者(同樣,如果生産者要求這樣做)。
  • 當消息隊列可以将消息傳遞給消費者時,它将消息從其内部緩沖區中删除。 可以立即删除,也可以在使用者确認其已成功處理消息之後删除(ack)。 由消費者選擇“确認”消息的方式和時間。消費者也可以拒絕消息(否定确認)。
  • 生産者發消息與消費者确認,被分組成一個事務。當一個應用同時扮演多個角色時:發消息,發ack,commit或者復原事務。消息從服務端投遞給消費者這個過程不是事務的。消費者對消息進行确認就夠了。

在這個過程中,生産者隻能把所有消息發到一個單點(交換器),而不能直接把消息發到某個消息隊列(message-queue)中。

交換器(exchange)的生命周期

每個AMQP服務端都會自己建立一些交換器,這些不能被銷毀。AMQP程式也可以建立其自己的交換器。AMQP并不使用 create 這個方法,而是使用 declare 方法來表示:如果不存在,則建立,存在了則繼續。程式可以建立交換器用于私有使用,并在任務完成後銷毀它們。雖然AMQP提供了銷毀交換器的方法,但一般來講程式不需要銷毀它。

隊列(queue)的生命周期

隊列分為兩種,

  • 持久化消息隊列:由很多消費者共享。當消費者都退出後,隊列依然存在,并會繼續收集消息。
  • 臨時消息隊列:臨時消息隊列對于消費者是私有和綁定的。當消費者斷開連接配接,則消息隊列被删除。
AMQP協定學習

臨時消息隊列的生命周期

綁定(Bindings)

綁定是交換器和消息隊列之間的關系,告訴交換器如何路由消息。

// 綁定指令的僞代碼
Queue.Bind <queue> TO <exchange> WHERE <condition>           

幾個經典的使用案例:共享隊列、私有的回複隊列、釋出-訂閱。

構造一個共享隊列

Queue.Declare queue=app.svc01 // 聲明一個叫做 app.svc01 的隊列

// Comsumer
Basic.Consume queue=app.svc01 // 消費者消費該隊列

// Producer
Basic.Publish routing-key=app.svc01 
// 生産者釋出消息。routingKey為隊列名稱           
AMQP協定學習

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

構造一個私有回複隊列

一般來講,回複隊列是私有的、臨時的、由服務端命名、隻有一個消費者。(沒有直接使用AMQP協定中的例子,而是使用了RabbitMQ的例子)

Queue.Declare queue=rpc_queue // 調用的隊列

// Server
Basic.Consume queue=rpc_queue

// Client
Queue.Declare queue=<empty> exclusive=TRUE

S:Queue.Declare-Ok queue=amq.gen-X
... // AMQP服務端告訴隊列名稱
Basic.Publish queue=rpc_queue reply_to=amq_gen-X... // 用戶端向服務端發送請求

// Server
handleMessage()
// 服務端處理好消息後,向消息列的reply-to字段中的隊列發送響應
Basic.Publish exchange=<empty> routing-key={message.replay_to}           
AMQP協定學習

https://www.rabbitmq.com/tutorials/tutorial-six-python.html

構造一個釋出-訂閱隊列

在傳統的中間件中,術語 subscription 含糊不清。至少包含兩個概念:比對消息的條件集,和一個臨時隊列用于存放比對的消息。AMQP把這兩部分拆成:binding和message queus。在AMQP中,并沒有一個實體叫做 subscription

AMQP的釋出訂閱模型為:

  • 給一個消費者保留消息(一些場景下是多個消費者)
  • 從多個源收集消息,比如比對Topic、消息的字段、或者内容等方式

訂閱隊列與命名隊列或回複隊列之間的關鍵差別在于,訂閱隊列名稱與路由目的無關,并且路由是根據抽象的比對條件完成的,而不是路由鍵字段的一對一比對。

// Consumer
Queue.Declare queue=<empty> exclusive=TRUE
// 這裡是使用服務端下發的隊列名稱,并設定為獨占。
// 也可以使用約定的隊列名稱。這樣就相當于把釋出-訂閱模型與共享隊列組合使用了

S:Queue.Declare-Ok queue=tmp.2

Queue.Bind queue=
tmp.2 
TO exchange=amq.topic WHERE routing-key=*.orange.*
Basic.Consume queue=t
mp.2


// Producer
Basic.Publish exchange=amq.topic routing-key=quick.orange.rabbit           
AMQP協定學習

https://www.rabbitmq.com/tutorials/tutorial-five-python.html

AMQP指令架構

中間件複雜度很高,是以設計協定時的挑戰是要馴服其複雜性。AMQP采用方法是基于類來建立傳統API模型。類中包含方法,并定義了方法明确應該做什麼。

AMQP中有兩種不同的方式進行對話:

  • 同步請求-響應。一個節點發送請求,另一個階段發送響應。适用于性能不重要的方法。發送同步請求時,該節點直到收到回複後,才能發送下一個請求
  • 異步通知。一個節點發送資料,但是不期待回複。一般用于性能很重要的地方。異步請求會盡可能快的發送消息,不等待确認。隻在需要的時候在更上層(比如消費者層)實作限流等功能。AMQP中可以沒有确認,要麼成功,要麼就會收到關閉Channel或者連接配接的異常。如果需要明确的追蹤成功或者失敗,那麼應該使用事務。

AMQP中的類

Connection類

AMQP是一個長連接配接協定。Connection被設計為長期使用的,可以攜帶多個Channel。Connection的生命周期是:

  1. 用戶端打開到服務端的TCP/IP連接配接,發送協定頭。這是用戶端發送的資料裡,唯一不能被解析為方法的資料。
  2. 服務端傳回其協定版本、屬性(比如支援的安全機制清單)。 the Start method
  3. 用戶端選擇安全機制 Start-Ok
  4. 服務端開始認證過程, 它使用SASL的質詢-響應模型(challenge-response model)。它向用戶端發送一個質詢 Secure
  5. 用戶端向服務端發送一個認證響應Secure-Ok。比如,如果使用 plain 認證機制,則響應會包含登入名和密碼
  6. 用戶端重複質詢Secure或轉到協商步驟,發送一系列參數,如最大幀大小 Tune
  7. 用戶端接受,或者調低這些參數 Tune-Ok
  8. 用戶端正式打開連接配接,并選擇一個Vhost Open
  9. 服務端确認VHost有效 Open-Ok
  10. 用戶端可以按照預期使用連接配接
  11. 當一個節點打算結束連接配接 Close
  12. 另一個節點需要結束握手 Close-Ok
  13. 服務端和用戶端關閉Socket連接配接。

如果在發送或者收到 Open 或者 Open-Ok 之前,某一個節點發現了一個錯誤,則必須直接關閉Socket,且不發送任何資料。

Channel類

AMQP是一個多通道協定。Channel提供了一種方式,在比較重的TCP/IP連接配接上建立多個輕量級的連接配接。這會讓協定對防火牆更加友好,因為端口使用是可預知的。它也意味着很容易支援流量調整和其他QoS特性。

Channels互相是獨立的,可以同步執行不同的功能。可用帶寬會在目前活動之間共享。

這裡期望也鼓勵多線程用戶端程式應該使用 每個線程一個channel 的模型。不過,一個用戶端在一個或多個AMQP服務端上打開多個連接配接也是可以的。

Channel的生命周期為:

  1. 用戶端打開一個新通道 Open
  2. 服務端确認新通道準備就緒 Open-Ok
  3. 用戶端和服務端按預期來使用通道.
  4. 一個節點關閉了通道 Close
  5. 另一個節點對通道關閉進行握手 Close-Ok

Exchange類

Exchange類能夠讓應用操作服務端的交換器。這個類能夠讓程式自己設定路由,而不是通過某些配置。不過大部分程式并不需要這個級别的複雜度,過去的中間件也不隻支援這個語義。

Exchange的生命周期為:

  1. 用戶端讓服務端確定該exchange存在Declare。用戶端可以細化為:“如果交換器不存在則進行建立” 或 “如果交換器不存在,警告我,不需要建立”
  2. 用戶端向Exchange發消息
  3. 用戶端也可以選擇删掉Exchange Delete

Queue類

該類用于讓程式管理服務端上的消息隊列。幾乎所有的消費者應用都是基本步驟,至少要驗證使用的消息隊列是否存在。

一個持久化消息隊列的生命周期非常簡單

  1. 用戶端斷言這個消息隊列存在 Declare(設定 passive 參數)
  2. 服務端确認消息隊列存在 Declare-Ok
  3. 用戶端消息隊列中讀消息

一個臨時消息隊列的生命周期會更有趣些:

  1. 用戶端建立消息隊列 Declare(不提供隊列名稱,伺服器會配置設定一個名稱)。服務端确認 Declare-Ok
  2. 用戶端在消息隊列上啟動一個消費者
  3. 用戶端取消消費,可以是顯示取消,也可以是通過關閉通道或者連接配接連接配接隐式取消的
  4. 當最後一個消費者從消息隊列中消失的時候,在過了禮貌性逾時後,服務端會删除消息隊列

AMQP實作了Topic訂閱的分發模型。這可以讓訂閱在合作的訂閱者間進行負載均衡。涉及到額外的綁定階段的生命周期:

  1. 用戶端建立一個隊列Declare,服務端确認Declare-Ok
  2. 用戶端綁定消息隊列到一個topic exchange上Bind,服務端确認Bind-Ok
  3. 用戶端像之前一樣使用消息隊列。

Basic類

Basic實作本規範中描述的消息功能。支援如下語義:

  1. 從用戶端→服務端發消息。異步Publish
  2. 開始或者停止消費Consume,Cancel
  3. 從服務端到用戶端發消息。異步Deliver,Return
  4. 确認消息Ack,Reject
  5. 同步的從消息隊列中讀取消息Get

事務類:

AMQP支援兩種類型的事務:

  1. 自動事務。每個釋出的消息和應答都處理為獨立事務.
  2. 服務端本地事務:伺服器會緩存釋出的消息和應答,并會根據需要由client來送出它們.

Transaction 類(“tx”) 使應用程式可通路第二種類型,即伺服器事務。這個類的語義是:

  1. 應用程式要求服務端事務,在需要的每個channel裡Select
  2. 應用程式做一些工作Publish, Ack
  3. 應用程式送出或復原工作Commit,Roll-back
  4. 應用程式正常工作,循環往複。

事務包含釋出消息和ack,不包含分發。是以,復原并不能重入隊列或者重新分發任何消息。用戶端有權在事務中确認這些消息。

功能說明

AMQP的功能描述,一定程度上也是RabbitMQ的功能描述,不過RabbitMQ基于AMQP做了一些擴充

消息和内容

消息會攜帶一些屬性,以及具體内容(二進制資料)

消息是可被持久化的。持久化消息是可以安全的存在硬碟上的,即使發生了驗證的網絡錯誤、服務端崩潰溢出等情況,也可以確定被投遞。

消息可以有優先級。同一個隊列中,高優先級的消息會比低優先級的消息先被發送。當消息需要被丢棄時(比如服務端記憶體不足等),将會優先丢棄低優先級消息

服務端一定不能修改消息的内容。但服務端可能會在消息頭上添加一些屬性,但一定不會移除或者修改已經存在的屬性。

虛拟主機(VHost)

虛拟主機是服務端的一個資料分區。在多租戶使用時,可以友善進行管理。

虛拟主機有自己的命名空間、交換器、消息隊列等等。所有連接配接,隻可能和一個虛拟主機建立。

交換器(Exchange)

交換器是一個虛拟主機内的消息路由Agent。用于處理消息的路由資訊(一般是Routing-Key),然後将其發送到消息隊列或者内部服務中。交換器可能是持久化的、臨時的、自動删除的。交換器把消息路由到消息隊列時可以是并行的。這會建立一個消息的多個執行個體。

Direct 交換器

AMQP協定學習
  1. 一個消息隊列使用RoutingKey K 綁定到交換器
  2. 生産者向交換器發送RoutingKey為R的消息
  3. 當 K=R時,消息被轉發到該消息隊列中

Fanout 交換器

AMQP協定學習
  1. 一個消息隊列沒有使用任何參數綁定交換器
  2. 生産者向交換器發了一條消息
  3. 這個消息無條件的發送到該消息隊列

Topic 交換器

  1. 消息隊列使用路由規則 P 綁定到交換器
  2. 生産者使用RoutingKey R 發送消息到交換器
  3. 如果R 能夠比對 P,則把消息發到該消息隊列。

RoutingKey必須由若幹個被點.分隔的單詞組成。每個單詞隻能包含字母和數字。其中 * 比對一個單詞,# 比對0個或者多個單詞。比如 *.stock.# 比對 usd.stock 和 eur.stock.db 但是不比對 stock.nasdaq

Headers 交換器

  1. 消息隊列使用Header的參數表來綁定。不适用RoutingKey
  2. 生産者向交換器發送消息,Header中包含了指定的鍵值對
  3. 如果比對,則傳給消息隊列。

比如:

format=json,type=log,x-match=all
format=line,type=log,x-match=any           

如果 x-match 為all,則必須都比對才行。如果x-match為any,則有任意一個header比對即可。

系統交換器

這個平時應該用不到,這裡略過。感興趣的可以直接檢視AMQP0.9.1的3.1.3.5章節。

AMQP的傳輸架構

解釋了指令如何映射到傳輸層的。在設計自有協定時,可以參考一下它的設計思路,以及中間需要注意的問題。

AMQP是一個二進制協定。有不同類型的幀frame 構成。幀會攜帶協定的方法以及其他資訊。所有的幀都有相同的基本結構,即:幀頭,payload,幀尾。payload格式取決于幀的類型。

我們假設使用的是面向流的可靠網絡層(比如TCP/IP)。單個Socket連接配接上可以有多個獨立的控制線程,也就是通道Channel。不同的通道共享一個連接配接,每個通道上的幀都是按嚴格的順序排列,這樣可以用一個狀态機來解析協定。

傳輸層(wire-level)的格式被設計為擴充性強、且足夠通用,可以用于任何更高層的協定(不僅僅是AMQP)。我們假設AMQP是會被擴充、優化的。

主要涉及這幾個部分:資料類型、協定協商、分幀方式、幀細節、方法幀、内容幀、心跳幀、錯誤處理、通道與連接配接的關閉。

資料類型

AMQP的資料類型用于方法幀中,他們有

  • 整數(1-8個位元組),表示大小,數量,範圍等。全都是無符号整數
  • Bits。用于表示為開/關值,會被封包為位元組。
  • 短字元串。用于存放短的文本屬性。最多255位元組,解析時不用擔心緩沖區溢出。
  • 長字元串:用于存放二進制資料塊
  • 字段表(Field Table),用于存放鍵值對

協定協商

用戶端連接配接時,和服務端協商可接受的配置。當兩個節點達成一緻後,連接配接才能繼續使用。通過協商,可以讓我們斷言假設和前提條件。主要協商這幾方面的資訊

  • 實作的協定和版本。服務端可能會在同一端口提供多種協定的支援
  • 加密參數和驗證
  • 最大幀尺寸、Channel的數量、某些操作的限制。

如果協商達成一緻,雙方會根據協商預配置設定緩沖區避免死鎖。傳入的幀如果滿足協商條件,則認為其實安全的。如果超過了,那麼另一方必須斷開連接配接。

分幀方式

TCP/IP是流協定。沒有内置的分幀機制。現有的協定一般有這幾種方式進行分幀:

  • 每個連接配接隻發送一個幀。簡單,但是慢。
  • 在流中加入分隔符來分幀。簡單,但是解析較慢(因為需要不斷的讀取,去尋找分隔符)
  • 計算幀的尺寸,并在每個幀之前發送尺寸。簡單且快速。也是AMQP的選擇

幀細節

幀頭包括:幀類型、通道、尺寸。幀尾包含錯誤檢測資訊。

AMQP協定學習

處理一個幀的步驟:

  1. 讀幀頭,檢查幀類型和Channel
  2. 根據幀類型,讀取payload并處理
  3. 讀幀尾校驗

在實作時,性能很重要的時候,我們會使用 read-ahead buffering 或者 gathering reads 去避免讀幀時進行三次系統調用。

方法幀

AMQP協定學習

處理方式:

  1. 讀取方法幀的payload
  2. 解包為結構
  3. 檢查方法在目前上下文中是否允許
  4. 檢查參數是否有效
  5. 執行方法。

方法幀是由AMQP資料字段組成。編碼代碼可以直接從協定規範中生成,速度非常快。

内容幀

内容是端到端直接發送的應用資料。内容由一系列屬性和二進制資料組成。其中一系列的屬性組成了 ”内容幀的幀頭“。而二進制資料,可以是任意大小,它可能被拆分成多個塊發送,每個塊是一個 content-body幀

一些方法(比如 Basic.Publish,Basic.Deliver)是會攜帶内容的。一個内容幀的幀頭如下結構:

AMQP協定學習

這裡把 content-body 作為單獨的幀,這樣就可以支援Zero-copy技術,這部分内容就不需要被編碼。把内容屬性放到自己的幀裡,這樣收件人就可以選擇性的丢棄不想處理的内容。

通道與連接配接的關閉

對于用戶端,隻要發送了 Open 就認為連接配接和通道是打開的。對于服務端則是Open-Ok。如果一個節點想要關閉通道和連接配接必須要進行握手。

如果突然或者意外關閉,沒辦法立刻被檢測到,可能會導緻丢失傳回值。是以需要在關閉之前進行握手。在一個節點發送 Close 後,另一個節點必須發送 Close-Ok 來回複。然後雙方可以關閉通道或者連接配接。如果節點忽略了 Close 操作,當雙方同時發送 Close 時,可能會導緻死鎖。

最後,更多細節可以檢視AMQP的官方規範,以及RabbitMQ在其基礎上擴充的其它特性。

參考連結

  • AMQP 0.9.1 規範:https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf
  • AMQP 0.9.1 完整的中文翻譯:http://www.blogjava.net/qbna350816/archive/2016/08/12/431554.html
  • RabbitMQ 對于 AMQP 0.9.1 的勘誤:https://www.rabbitmq.com/amqp-0-9-1-errata.html
  • RabbitMQ 對與 AMQP 0.9.1 的擴充:https://www.rabbitmq.com/extensions.html
  • AMQP 1.0 最終版:http://www.amqp.org/specification/1.0/amqp-org-download

作者:0neSe7en

來源-微信公衆号:即刻技術團隊

出處:https://mp.weixin.qq.com/s/7HIXUSq-l1DzWEUScwPYAg

繼續閱讀