天天看點

消息隊列探秘 – RabbitMQ 消息隊列介紹

來源:高廣超,

1. 曆史

rabbitmq是一個由erlang開發的amqp(advanced message queue )的開源實作。amqp 的出現其實也是應了廣大人民群衆的需求,雖然在同步消息通訊的世界裡有很多公開标準(如 cobar的 iiop ,或者是 soap 等),但是在異步消息進行中卻不是這樣,隻有大企業有一些商業實作(如微軟的 msmq ,ibm 的 websphere mq 等),是以,在 2006 年的 6 月,cisco 、redhat、imatix 等聯合制定了 amqp 的公開标準。

rabbitmq是由rabbitmq technologies ltd開發并且提供商業支援的。該公司在2010年4月被springsource(vmware的一個部門)收購。在2013年5月被并入pivotal。其實vmware,pivotal和emc本質上是一家的。不同的是vmware是獨立上市子公司,而pivotal是整合了emc的某些資源,現在并沒有上市。

rabbitmq的官網是http://www.rabbitmq.com

2. 應用場景

言歸正傳。rabbitmq,或者說amqp解決了什麼問題,或者說它的應用場景是什麼?

對于一個大型的軟體系統來說,它會有很多的元件或者說子產品或者說子系統或者(subsystem or component or submodule)。那麼這些子產品的如何通信?這和傳統的ipc有很大的差別。傳統的ipc很多都是在單一系統上的,子產品耦合性很大,不适合擴充(scalability);如果使用socket那麼不同的子產品的确可以部署到不同的機器上,但是還是有很多問題需要解決。比如:

1)資訊的發送者和接收者如何維持這個連接配接,如果一方的連接配接中斷,這期間的資料如何方式丢失?

2)如何降低發送者和接收者的耦合度?

3)如何讓priority高的接收者先接到資料?

4)如何做到load balance?有效均衡接收者的負載?

5)如何有效的将資料發送到相關的接收者?也就是說将接收者subscribe 不同的資料,如何做有效的filter。

6)如何做到可擴充,甚至将這個通信子產品發到cluster上?

7)如何保證接收者接收到了完整,正确的資料?

amdq協定解決了以上的問題,而rabbitmq實作了amqp。

3. 系統架構

消息隊列探秘 – RabbitMQ 消息隊列介紹

rabbitmq server:也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是rabbitmqisn’t a food truck, it’s a delivery service. 他的角色就是維護一條從producer到consumer的路線,保證資料能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對于普通的應用來說這已經足夠了。當然對于商業系統來說,可以再做一層資料一緻性的guard,就可以徹底保證系統的一緻性了。

client p:也叫producer,資料的發送方。createmessages and publish (send) them to a broker server (rabbitmq).一個message有兩個部分:payload(有效載荷)和label(标簽)。payload顧名思義就是傳輸的資料。label是exchange的名字或者說是一個tag,它描述了payload,而且rabbitmq也是通過這個label來決定把這個message發給哪個consumer。amqp僅僅描述了label,而rabbitmq決定了如何使用這個label的規則。

client c: 也叫consumer,資料的接收方。consumersattach to a broker server (rabbitmq) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有message到達某個郵箱後,rabbitmq把它發送給它的某個訂閱者即consumer。當然可能會把同一個message發送給很多的consumer。在這個message中,隻有payload,label已經被删掉了。對于consumer來說,它是不知道誰發送的這個資訊的。就是協定本身不支援。但是當然了如果producer發送的payload包含了producer的資訊就另當别論了。

對于一個資料從producer到consumer的正确傳遞,還有三個概念需要明确:exchanges, queues and bindings。

exchanges are where producers publish their messages.

queues are where the messages end up and are received by consumers

bindings are how the messages get routed from the exchange to particular queues.

還有幾個概念是上述圖中沒有标明的,那就是connection(連接配接),channel(通道,頻道)。

connection:就是一個tcp的連接配接。producer和consumer都是通過tcp連接配接到rabbitmq server的。以後我們可以看到,程式的起始處就是建立這個tcp連接配接。

channels:虛拟連接配接。它建立在上述的tcp連接配接中。資料流動都是在channel中進行的。也就是說,一般情況是程式起始建立tcp連接配接,第二步就是建立這個channel。

那麼,為什麼使用channel,而不是直接使用tcp連接配接?

對于os來說,建立和關閉tcp連接配接是有代價的,頻繁的建立關閉tcp連接配接對于系統的性能有很大的影響,而且tcp的連接配接數也有限制,這也限制了系統處理高并發的能力。但是,在tcp連接配接中建立channel是沒有上述代價的。對于producer或者consumer來說,可以并發的使用多個channel進行publish或者receive。有實驗表明,1s的資料可以publish10k的資料包。當然對于不同的硬體環境,不同的資料包大小這個資料肯定不一樣,但是我隻想說明,對于普通的consumer或者producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。

broker:簡單來說就是消息隊列伺服器實體。

exchange:消息交換機,它指定消息按什麼規則,路由到哪個隊列。

queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。

binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。

routing key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。

vhost:虛拟主機,一個broker裡可以開設多個vhost,用作不同使用者的權限分離。

producer:消息生産者,就是投遞消息的程式。

consumer:消息消費者,就是接受消息的程式。

channel:消息通道,在用戶端的每個連接配接裡,可建立多個channel,每個channel代表一個會話任務。

由exchange,queue,routingkey三個才能決定一個從exchange到queue的唯一的線路。

4.基本概念

connectionfactory、connection、channel

connectionfactory、connection、channel都是rabbitmq對外提供的api中最基本的對象。connection是rabbitmq的socket連結,它封裝了socket協定相關部分邏輯。connectionfactory為connection的制造工廠。

channel是我們與rabbitmq打交道的最重要的一個接口,我們大部分的業務操作是在channel這個接口中完成的,包括定義queue、定義exchange、綁定queue與exchange、釋出消息等。

queue

queue(隊列)是rabbitmq的内部對象,用于存儲消息,用下圖表示。

消息隊列探秘 – RabbitMQ 消息隊列介紹

rabbitmq中的消息都隻能存儲在queue中,生産者(下圖中的p)生産消息并最終投遞到queue中,消費者(下圖中的c)可以從queue中擷取消息并消費。

消息隊列探秘 – RabbitMQ 消息隊列介紹

多個消費者可以訂閱同一個queue,這時queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。

消息隊列探秘 – RabbitMQ 消息隊列介紹

message acknowledgment

在實際應用中,可能會發生消費者收到queue中的消息,但沒有處理完成就當機(或出現其他意外)的情況,這種情況下就可能會導緻消息丢失。為了避免這種情況發生,我們可以要求消費者在消費完消息後發送一個回執給rabbitmq,rabbitmq收到消息回執(message acknowledgment)後才将該消息從queue中移除;如果rabbitmq沒有收到回執并檢測到消費者的rabbitmq連接配接斷開,則rabbitmq會将該消息發送給其他消費者(如果存在多個消費者)進行處理。這裡不存在timeout概念,一個消費者處理消息時間再長也不會導緻該消息被發送給其他消費者,除非它的rabbitmq連接配接斷開。

這裡會産生另外一個問題,如果我們的開發人員在處理完業務邏輯後,忘記發送回執給rabbitmq,這将會導緻嚴重的bug——queue中堆積的消息會越來越多;消費者重新開機後會重複消費這些消息并重複執行業務邏輯。

另外pub message是沒有ack的。

message durability

如果我們希望即使在rabbitmq服務重新開機的情況下,也不會丢失消息,我們可以将queue與message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的rabbitmq消息不會丢失。但依然解決不了小機率丢失事件的發生(比如rabbitmq伺服器已經接收到生産者的消息,但還沒來得及持久化該消息時rabbitmq伺服器就斷電了),如果我們需要對這種小機率事件也要管理起來,那麼我們要用到事務。由于這裡僅為rabbitmq的簡單介紹,是以這裡将不講解rabbitmq相關的事務。

prefetch count

前面我們講到如果有多個消費者同時訂閱同一個queue中的消息,queue中的消息會被平攤給多個消費者。這時如果每個消息的處理時間不同,就有可能會導緻某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作并一直空閑的情況。我們可以通過設定prefetchcount來限制queue每次發送給每個消費者的消息數,比如我們設定prefetchcount=1,則queue每次給每個消費者發送一條消息;消費者處理完這條消息後queue會再給該消費者發送一條消息。

消息隊列探秘 – RabbitMQ 消息隊列介紹

exchange

在上一節我們看到生産者将消息投遞到queue中,實際上這在rabbitmq中這種事情永遠都不會發生。實際的情況是,生産者将消息發送到exchange(交換器,下圖中的x),由exchange将消息路由到一個或多個queue中(或者丢棄)。

消息隊列探秘 – RabbitMQ 消息隊列介紹

exchange是按照什麼邏輯将消息路由到queue的?這個将在binding一節介紹。

rabbitmq中的exchange有四種類型,不同的類型有着不同的路由政策,這将在exchange types一節介紹。

routing key

生産者在将消息發送給exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與exchange type及binding key聯合使用才能最終生效。

在exchange type與binding key固定的情況下(在正常使用時一般這些内容都是固定配置好的),我們的生産者就可以在發送消息給exchange時,通過指定routing key來決定消息流向哪裡。

rabbitmq為routing key設定的長度限制為255 bytes。

binding

rabbitmq中通過binding将exchange與queue關聯起來,這樣rabbitmq就知道如何正确地将消息路由到指定的queue了。

消息隊列探秘 – RabbitMQ 消息隊列介紹

binding key

在綁定(binding)exchange與queue的同時,一般會指定一個binding key;消費者将消息發送給exchange時,一般會指定一個routing key;當binding key與routing key相比對時,消息将會被路由到對應的queue中。這個将在exchange types章節會列舉實際的例子加以說明。

在綁定多個queue到同一個exchange的時候,這些binding允許使用相同的binding key。

binding key 并不是在所有情況下都生效,它依賴于exchange type,比如fanout類型的exchange就會無視binding key,而是将消息路由到所有綁定到該exchange的queue。

exchange types

rabbitmq常用的exchange type有fanout、direct、topic、headers這四種(amqp規範裡還提到兩種exchange type,分别為system與自定義,這裡不予以描述),下面分别進行介紹。

fanout

fanout類型的exchange路由規則非常簡單,它會把所有發送到該exchange的消息路由到所有與它綁定的queue中。

消息隊列探秘 – RabbitMQ 消息隊列介紹

上圖中,生産者(p)發送到exchange(x)的所有消息都會路由到圖中的兩個queue,并最終被兩個消費者(c1與c2)消費。

direct

direct類型的exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全比對的queue中。

消息隊列探秘 – RabbitMQ 消息隊列介紹

以上圖的配置為例,我們以routingkey=”error”發送消息到exchange,則消息會路由到queue1(amqp.gen-s9b…,這是由rabbitmq自動生成的queue名稱)和queue2(amqp.gen-agl…);如果我們以routingkey=”info”或routingkey=”warning”來發送消息,則消息隻會路由到queue2。如果我們以其他routingkey發送消息,則消息不會路由到這兩個queue中。

topic

前面講到direct類型的exchange路由規則是完全比對binding key與routing key,但這種嚴格的比對方式在很多情況下不能滿足實際業務需求。topic類型的exchange在比對規則上進行了擴充,它與direct類型的exchage相似,也是将消息路由到binding key與routing key相比對的queue中,但這裡的比對規則有些不同,它約定:

routing key為一個句點号“.”分隔的字元串(我們将被句點号“. ”分隔開的每一段獨立的字元串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key與routing key一樣也是句點号“. ”分隔的字元串

binding key中可以存在兩種特殊字元“*”與“#”,用于做模糊比對,其中“*”用于比對一個單詞,“#”用于比對多個單詞(可以是零個)

消息隊列探秘 – RabbitMQ 消息隊列介紹

以上圖中的配置為例,routingkey=”quick.orange.rabbit”的消息會同時路由到q1與q2,routingkey=”lazy.orange.fox”的消息會路由到q1,routingkey=”lazy.brown.fox”的消息會路由到q2,routingkey=”lazy.pink.rabbit”的消息會路由到q2(隻會投遞給q2一次,雖然這個routingkey與q2的兩個bindingkey都比對);routingkey=”quick.brown.fox”、routingkey=”orange”、routingkey=”quick.orange.male.rabbit”的消息将會被丢棄,因為它們沒有比對任何bindingkey。

headers

headers類型的exchange不依賴于routing key與binding key的比對規則來路由消息,而是根據發送的消息内容中的headers屬性進行比對。

在綁定queue與exchange時指定一組鍵值對;當消息發送到exchange時,rabbitmq會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全比對queue與exchange綁定時指定的鍵值對;如果完全比對則消息會路由到該queue,否則不會路由到該queue。

該類型的exchange沒有用到過(不過也應該很有用武之地),是以不做介紹。

rpc

mq本身是基于異步的消息處理,前面的示例中所有的生産者(p)将消息發送到rabbitmq後不會知道消費者(c)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。

但實際的應用場景中,我們很可能需要一些同步處理,需要同步等待服務端将我的消息處理完成後再進行下一步處理。這相當于rpc(remote procedure call,遠端過程調用)。

在rabbitmq中也支援rpc。

消息隊列探秘 – RabbitMQ 消息隊列介紹

rabbitmq中實作rpc的機制是:

用戶端發送請求(消息)時,在消息的屬性(messageproperties,在amqp協定中定義了14中properties,這些屬性會随着消息一起發送)中設定兩個值replyto(一個queue名稱,用于告訴伺服器處理完成後将通知我的消息發送到這個queue中)和correlationid(此次請求的辨別号,伺服器處理完成後需要将此屬性返還,用戶端将根據這個id了解哪條請求被成功執行了或執行失敗)

伺服器端收到消息并處理

伺服器端處理完消息後,将生成一條應答消息到replyto指定的queue,同時帶上correlationid屬性

用戶端之前已訂閱replyto指定的queue,從中收到伺服器的應答消息後,根據其中的correlationid屬性分析哪條請求被執行了,根據執行結果進行後續業務處理

5. 細節闡明

使用ack确認message的正确傳遞

預設情況下,如果message 已經被某個consumer正确的接收到了,那麼該message就會被從queue中移除。當然也可以讓同一個message發送到很多的consumer。

如果一個queue沒被任何的consumer subscribe(訂閱),那麼,如果這個queue有資料到達,那麼這個資料會被cache,不會被丢棄。當有consumer時,這個資料會被立即發送到這個consumer,這個資料被consumer正确收到時,這個資料就被從queue中删除。

那麼什麼是正确收到呢?通過ack。每個message都要被acknowledged(确認,ack)。我們可以顯示的在程式中去ack,也可以自動的ack。如果有資料沒有被ack,那麼rabbitmq server會把這個資訊發送到下一個consumer。

如果這個app有bug,忘記了ack,那麼rabbitmq server不會再發送資料給它,因為server認為這個consumer處理能力有限。

而且ack的機制可以起到限流的作用(benefitto throttling):在consumer處理完成資料後發送ack,甚至在額外的延時後發送ack,将有效的balance consumer的load。

當然對于實際的例子,比如我們可能會對某些資料進行merge,比如merge 4s内的資料,然後sleep 4s後再擷取資料。特别是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些io,而且終端使用者也不會感覺到。

reject a message

有兩種方式,第一種的reject可以讓rabbitmq server将該message 發送到下一個consumer。第二種是從queue中立即删除該message。

creating a queue

consumer和procuder都可以通過 queue.declare 建立queue。對于某個channel來說,consumer不能declare一個queue,卻訂閱其他的queue。當然也可以建立私有的queue。這樣隻有app本身才可以使用這個queue。queue也可以自動删除,被标為auto-delete的queue在最後一個consumer unsubscribe後就會被自動删除。那麼如果是建立一個已經存在的queue呢?那麼不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次建立如果參數和第一次不一樣,那麼該操作雖然成功,但是queue的屬性并不會被修改。

那麼誰應該負責建立這個queue呢?是consumer,還是producer?

如果queue不存在,當然consumer不會得到任何的message。但是如果queue不存在,那麼producer publish的message會被丢棄。是以,還是為了資料不丢失,consumer和producer都try to create the queue!反正不管怎麼樣,這個接口都不會出問題。

queue對load balance的處理是完美的。對于多個consumer來說,rabbitmq 使用循環的方式(round-robin)的方式均衡的發送給不同的consumer。

exchanges

從架構圖可以看出,procuder publish的message進入了exchange。接着通過“routing keys”, rabbitmq會找到應該把這個message放到哪個queue裡。queue也是通過這個routing keys來做的綁定。

有三種類型的exchanges:direct, fanout,topic。每個實作了不同的路由算法(routing algorithm)。

direct exchange: 如果 routing key 比對, 那麼message就會被傳遞到相應的queue中。其實在queue建立時,它會自動的以queue的名字作為routing key來綁定那個exchange。

fanout exchange: 會向響應的queue廣播。

topic exchange:對key進行模式比對,比如ab可以傳遞到所有ab的queue。

virtual hosts

每個virtual host本質上都是一個rabbitmq server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的application中使用rabbitmq。

消息隊列探秘 – RabbitMQ 消息隊列介紹