一、MQ
在介紹RabbitMq之前,先來說一下MQ。什麼是MQ?MQ全稱為Message Queue即消息隊列,就是一個消息的容器, MQ是消費-生産者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取或者訂閱隊列中的消息。MQ架構非常之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka。根據自己項目的業務場景和需求來選擇相應的MQ架構(MQ架構比較)。為什麼要使用MQ呢?在項目中,一些無需即時傳回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了伺服器的請求響應時間,進而提高了系統的吞吐量。比如在高并發環境下,由于來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導緻無數的行鎖表鎖,甚至最後請求會堆積過多,進而觸發too many connections錯誤。通過使用消息隊列,我們可以異步處理請求,進而緩解系統的壓力。
二、RabbitMQ
在MQ衆多架構中RabbitMQ仍然是首選,RabbitMQ用erlang語言開發。RabbitMQ是AMQP(進階消息隊列協定)的标準實作的可複用的企業消息系統。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。是以我先學習一下AMQP協定。
三、AMQP協定
AMQP協定參考和轉載自https://blog.csdn.net/yinwenjie/article/details/50820369寫的非常好。
1、AMQP協定的各個組成部分

(1)AMQP協定的各個組成部分:AMQP協定中的元素包括:Message(消息體)、Producer(消息生産者)、Consumer(消息消費者)、Virtual Host(虛拟節點)、Exchange(交換機)、Queue(隊列)等
(2)由Producer(消息生産者)和Consumer(消息消費者)構成了AMQP的用戶端,他們是發送消息和接收消息的主體。AMQP服務端稱為Broker,一個Broker中一定包含完整的Virtual Host(虛拟主機)、 Exchange(交換機)、Queue(隊列)定義。
(3)一個Broker可以建立多個Virtual Host(虛拟主機),Virtual Host的工作元素有Exchange和Queue。注意,如果AMQP是由多個Broker構成的叢集提供服務,那麼一個Virtual Host也可以由多個Broker共同構成。
(4)Connection是由Producer(消息生産者)和Consumer(消息消費者)建立的連接配接,連接配接到Broker實體節點上。但是有了Connection後用戶端還不能和伺服器通信,在Connection之上用戶端會建立Channel,連接配接到Virtual Host或者Queue上,這樣用戶端才能向Exchange發送消息或者從Queue接受消息。一個Connection上允許存在多個Channel,隻有Channel中能夠發送/接受消息。
(5)Exchange元素是AMQP協定中的交換機,Exchange可以綁定多個Queue也可以同時綁定其他Exchange。消息通過Exchange時,會按照Exchange中設定的Routing(路由)規則,将消息發送到符合的Queue或者Exchange中。
2、AMQP消息的工作原理
AMQP消息在這個結構中是如何通過Producer發出,又經過Broker最後到達Consumer的呢
(1)在Producer(消息生産者)用戶端建立了Channel後,就建立了到Broker上Virtual Host的連接配接。接下來Producer就可以向這個Virtual Host中的Exchange發送消息了。
(2)Exchange(交換機)能夠處理消息的前提是:它至少已經和某個Queue或者另外的Exchange形成了綁定關系,并設定好了到這些Queue和Excahnge的Routing(路由規則)。Excahnge中的Routing有三種模式。在Exchange收到消息後,會根據設定的Routing(路由規則),将消息發送到符合要求的Queue或者Exchange中(路由規則還會和Message中的Routing Key屬性配合使用)。
(3)Queue收到消息後,可能會進行如下的處理:如果目前沒有Consumer的Channel連接配接到這個Queue,那麼Queue将會把這條消息進行存儲直到有Channel被建立(AMQP協定的不同實作産品中,存儲方式又不盡相同);如果已經有Channel連接配接到這個Queue,那麼消息将會按順序被發送給這個Channel。
(4)Consumer收到消息後,就可以進行消息的處理了。但是整個消息傳遞的過程還沒有完成:視設定情況,Consumer在完成某一條消息的處理後,将需要手動的發送一條ACK消息給對應的Queue(當然您可以設定為自動發送,或者無需發送)。Queue在收到這條ACK資訊後,才會認為這條消息處理成功,并将這條消息從Queue中移除;如果在對應的Channel斷開後,Queue都沒有這條消息的ACK資訊,這條消息将會重新被發送給另外的Channel。當然,您還可以發送NACK資訊,這樣這條消息将會立即歸隊,并發送給另外的Channel。
3、Excahnge中Routing的三種模式
Exchange交換機在AMQP協定中主要負責按照一定的規則,将收到的消息轉發到已經和它事先綁定好的Queue或者另外的Exchange中。Excahnge交換機的這個處理過程稱之為Routing(路由)。目前流行的AMQP路由實作一共有三種:分别是Direct Exchange、Fanout Exchange和Topic Exchange。需要特别注意的是:Exhange需要具備怎樣的‘路由’規則,并沒有在AMQP标準協定進行強行規定,目前流行的AMQP轉發規則都是AMQP實作産品自行開發的(這也是為什麼AMQP消息中和路由、過濾規則相關的屬性是存放在application-properties區域的原因)。
(1)Direct路由
direct模式從字面上的了解應該是‘引導’、‘直接’的含義。direct模式下Exchange将使用AMQP消息中所攜帶的Routing-Key和Queue中的Routing Key進行比較。如果兩者完全比對,就會将這條消息發送到這個Queue中。如下圖所示:
(2)Fanout路由
Fanout路由模式不需要Routing Key。當設定為Fanout模式的Exchange收到AMQP消息後,将會将這個AMQP消息複制多份,分别發送到和自己綁定的各個Queue中。如下圖所示:
(3)Topic路由
Topic模式是Routing Key的比對模式。Exchange将支援使用‘#’和‘ * ’通配符進行Routing Key的比對查找(‘#’表示0個或若幹個關鍵詞,‘ * ’表示一個關鍵詞,注意是關鍵詞不是字母)。如下圖所示:
為了友善各位讀者的了解,這裡我們再舉幾個通配符比對的示例:
- “param.#”,可以比對“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能比對諸如“param1.test”、“param2.test”以為param這個關鍵詞和param1這個關鍵詞不相同。
- “param.*.* ”,可以比對“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能比對諸如“param”、“param.test”、“parm.child”等等Routing Key。
- “param.*.value”,可以比對“param.value.value”、“param.test.value”等Routing Key;但是不能比對諸如“param.value”、“param.value.child”等Routing Key。
注意,以上介紹的Direct 路由模式和Topic 路由模式中,如果Exchange交換機沒有找到任何比對Routing Key的Queue,那麼這條AMQP消息會被丢棄。(隻有Queue有儲存消息的功能,但是Exchange并不負責儲存消息)
四、參考資料
https://blog.csdn.net/yinwenjie/article/details/50820369