rabbitMQ是一個在AMQP協定标準基礎上完整的,可服用的企業消息系統。他遵循Mozilla Public License開源協定。采用 Erlang 實作的工業級的消息隊列(MQ)伺服器。
RabbitMQ的官方站:http://www.rabbitmq.com/
AMQP(進階消息隊列協定) 是一個異步消息傳遞所使用的應用層協定規範,作為線路層協定,而不是API(例如JMS),AMQP 用戶端能夠無視消息的來源任意發送和接受資訊。AMQP的原始用途隻是為金融界提供一個可以彼此協作的消息協定,而現在的目标則是為通用消息隊列架構提供通用建構工具。是以,面向消息的中間件 (MOM)系統,例如釋出/訂閱隊列,沒有作為基本元素實作。反而通過發送簡化的AMQ實體,使用者被賦予了建構例如這些實體的能力。這些實體也是規範的一 部分,形成了線上路層協定頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如之前提到的釋出/訂閱,隊列,事務以及流資料,并且添加了額外的特性,例如更易于擴充,基于内容的路由。
AMQP當中有四個概念非常重要
<code>virtual host</code>,虛拟主機
<code>exchange</code>,交換機
<code>queue</code>,隊列
<code>binding</code>,綁定
一個虛拟主機持有一組交換機、隊列和綁定。
為什麼需要多個虛拟主機呢?因為RabbitMQ當中,使用者隻能在虛拟主機的粒度進行權限控制。是以,如果需要禁止A組通路B組的交換機/隊列/綁定,必須為A和B分别建立一個虛拟主機。每一個RabbitMQ伺服器都有一個預設的虛拟主機<code>/</code>。
何謂虛拟主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)
隊列(Queues)是你的消息(messages)的終點,可以了解成裝消息的容器。消息就一直在裡面,直到有用戶端(也就是消費者,Consumer)連接配接到這個隊列并且将其取走為止。不過,也可以将一個隊列配置成這樣的:一旦消息進入這個隊列,此消息就被删除。
隊列是由消費者(Consumer)通過程式建立的,不是通過配置檔案或者指令行工具。這沒什麼問題,如果一個消費者試圖建立一個已經存在的隊列,RabbitMQ會直接忽略這個請求。是以我們可以将消息隊列的配置寫在應用程式的代碼裡面。
而要把一個消息放進隊列前,需要有一個交換機(Exchange)。
交換機(Exchange)可以了解成具有路由表的路由程式。每個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字元串。交換機當中有一系列的綁定(binding),即路由規則(routes)。(例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊列當中去。)
消費者程式(Consumer)要負責建立你的交換機。交換機可以存在多個,每個交換機在自己獨立的程序當中執行,是以增加多個交換機就是增加多個程序,可以充分利用伺服器上的CPU核以便達到更高的效率。例如,在一個8核的伺服器上,可以建立5個交換機來用5個核,另外3個核留下來做消息處理。類似的,在RabbitMQ的叢集當中,你可以用類似的思路來擴充交換機一邊擷取更高的吞吐量。
交換機如何判斷要把消息送到哪個隊列?你需要路由規則,即綁定(binding)。一個綁定就是一個類似這樣的規則:将交換機“desert(沙漠)”當中具有路由鍵“阿裡巴巴”的消息送到隊列“hideout(山洞)”裡面去。換句話說,一個綁定就是一個基于路由鍵将交換機和隊列連接配接起來的路由規則。例如,具有路由鍵“audit”的消息需要被送到兩個隊列,“log-forever”和“alert-the-big-dude”。要做到這個,就需要建立兩個綁定,每個都連接配接一個交換機和一個隊列,兩者都是由“audit”路由鍵觸發。在這種情況下,交換機會複制一份消息并且把它們分别發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。
交換機有多種類型。他們都是做路由的,但是它們接受不同類型的綁定。為什麼不建立一種交換機來處理所有類型的路由規則呢?因為每種規則用來做比對分子的CPU開銷是不同的。例如,一個“topic”類型的交換機試圖将消息的路由鍵與類似“dogs.*”的模式進行比對。比對這種末端的通配符比直接将路由鍵與“dogs”比較(“direct”類型的交換機)要消耗更多的CPU。如果你不需要“topic”類型的交換機帶來的靈活性,你可以通過使用“direct”類型的交換機擷取更高的處理效率。那麼有哪些類型,他們又是怎麼處理的呢?
Exchange

你花了大量的時間來建立隊列、交換機和綁定,然後,伺服器程式挂了。你的隊列、交換機和綁定怎麼樣了?還有,放在隊列裡面但是尚未處理的消息們呢?
如果你是用預設參數構造的這一切的話,那麼,他們都灰飛煙滅了。RabbitMQ重新開機之後會幹淨的像個新生兒。你必須重做所有的一切,亡羊補牢,如何避免将來再度發生此類杯具?
隊列和交換機有一個建立時候指定的标志durable。durable的唯一含義就是具有這個标志的隊列和交換機會在重新開機之後重建立立,它不表示說在隊列當中的消息會在重新開機後恢複。那麼如何才能做到不隻是隊列和交換機,還有消息都是持久的呢?
但是首先需要考慮的問題是:是否真的需要消息的持久化?如果需要重新開機後消息可以回複,那麼它需要被寫入磁盤。但即使是最簡單的磁盤操作也是要消耗時間的。是以需要衡量判斷。
當你将消息釋出到交換機的時候,可以指定一個标志“Delivery Mode”(投遞模式)。根據你使用的AMQP的庫不同,指定這個标志的方法可能不太一樣。簡單的說,就是将Delivery Mode設定成2,也就是持久的(persistent)即可。一般的AMQP庫都是将Delivery Mode設定成1,也就是非持久的。是以要持久化消息的步驟如下:
将交換機設成 durable。
将隊列設成 durable。
将消息的 Delivery Mode 設定成2 。
綁定(Bindings)怎麼辦?綁定無法在建立的時候設定成durable。沒問題,如果你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。類似的,如果删除了某個隊列或交換機(無論是不是durable),依賴它的綁定都會自動删除。
注意:
RabbitMQ 不允許你綁定一個非堅固(non-durable)的交換機和一個durable的隊列。反之亦然。要想成功必須隊列和交換機都是durable的。
一旦建立了隊列和交換機,就不能修改其标志了。例如,如果建立了一個non-durable的隊列,然後想把它改變成durable的,唯一的辦法就是删除這個隊列然後重制建立。是以,最好仔細檢查建立的标志。
在Windows上安裝Rabbit MQ 指南,最好的是這篇《Rabbit MQ Windows Installation guide》,其中還包括了使用.NET RabbitMQ.Client Nuget 包通路Rabbit MQ的示例代碼。
安裝Rabbit MQ
Rabbit MQ 是建立在強大的Erlang OTP平台上,是以安裝Rabbit MQ的前提是安裝Erlang。通過下面兩個連接配接下載下傳安裝3.2.3 版本:
下載下傳并安裝 Eralng OTP For Windows (vR16B03)
運作安裝 Rabbit MQ Server Windows Installer (v3.2.3)
預設安裝的Rabbit MQ 監聽端口是5672
激活Rabbit MQ's Management Plugin
使用Rabbit MQ 管理插件,可以更好的可視化方式檢視Rabbit MQ 伺服器執行個體的狀态,你可以在指令行中使用下面的指令激活:
<code>"C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management</code>
<code>要重新開機服務才能生效,可以執行</code>
<code><code>net stop RabbitMQ && net start RabbitMQ</code></code>
下面我們使用rabbitmqctl控制台指令(位于C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>)來建立使用者,密碼,綁定權限等。
Microsoft Windows [版本 6.3.9600]
(c) 2013 Microsoft Corporation。保留所有權利。
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin 的目錄
2014/11/01 15:04 <DIR> .
2014/11/01 15:04 <DIR> ..
2014/01/23 22:57 817 rabbitmq-echopid.bat
2014/01/23 22:57 1,900 rabbitmq-plugins.bat
2014/01/23 22:57 4,356 rabbitmq-server.bat
2014/01/23 22:57 7,123 rabbitmq-service.bat
2014/01/23 22:57 1,621 rabbitmqctl.bat
5 個檔案 15,817 位元組
2 個目錄 96,078,618,624 可用位元組
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
guest [administrator]
...done.
t list_vhosts
Listing vhosts ...
/
t add_user geffzhang zsy@2014
Creating user "geffzhang" ...
geffzhang []
t set_user_tags geffzhang administrator
Setting tags for user "geffzhang" to [administrator] ...
t set_permissions -p / geffzhang ".*" ".*" ".*"
Setting permissions for user "geffzhang" in vhost "/" ...
geffzhang [administrator]
使用浏覽器打開<code>http://localhost:15672</code> 通路Rabbit Mq的管理控制台,使用剛才建立的賬号登陸系統:
在.NET上使用Rabbit MQ
通過Nuget 擷取Rabbit MQ NET client bindings from NuGet:
<code>PM> Install-Package RabbitMQ.Client</code>
<code>我們最常見的一個場景是發送和接收Rabbit MQ 持久化消息:</code>
<code>第一步是聲明durable Exchange 和 Queue</code>
private readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory { HostName = "Geffzhang-NB", UserName="geffzhang", Password ="zsy@2014", VirtualHost ="/" };
const string ExchangeName = "test.exchange";
const string QueueName = "test.queue";
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable:true, autoDelete:false, arguments:null);
channel.QueueDeclare(QueueName, durable:true, exclusive:false, autoDelete:false,arguments:null);
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
}
下面對上面代碼進行說明:
1. 使用ConnectionFactory建立連接配接,雖然建立時指定了多個server address,但每個connection隻與一個實體的server進行連接配接。
2. 定義交換方式 ,建立了Direct Exchange和Durable Queue,并使用QueueName作為routing key ,可以把消息直接投遞到某個隊列。rabbitmq交換方式分為三種,分别是:
Direct Exchange – 處理路由鍵。需要将一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全比對。這是一個完整的比對。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則隻有被标記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,隻會轉發dog。
Fanout Exchange – 不處理路由鍵。你隻需要簡單的将隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網内的主機都獲得了一份複制的消息。Fanout交換機轉發消息是最快的。
Topic Exchange – 将路由鍵和某模式進行比對。此時隊列需要綁定要一個模式上。符号“#”比對一個或多個詞,符号“*”比對不多不少一個詞。是以“audit.#”能夠比對到“audit.irs.corporate”,但是“audit.*” 隻會比對到“audit.irs”。
運作上述代碼,可以在Rabbit MQ的管理控制台上看到test.exchange Exchange 綁定到 建立的隊列 test.queue
第二步就是釋出持久化消息到隊列
Exchange和Queue建立好以後,就可以發送消息到隊列了。RabbitMq 可以接受byte[]的資料,字元串采用utf-8編碼的位元組數組。確定消息可持久化的,需要設定PersistMode為true,參看下面的代碼:
var props = channel.CreateBasicProperties();
props.SetPersistent(true);
var msgBody = Encoding.UTF8.GetBytes("Hello, World!");
channel.BasicPublish(ExchangeName, routingKey:QueueName, basicProperties:props, body:msgBody);
第三步就是消費消息了,有幾種不同的方法從隊列中消費消息,最常見的是使用<code>BasicGet</code>:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
NoAck:true 告訴RabbitMQ立即從隊列中删除消息,另一個非常受歡迎的方式是從隊列中删除已經确認接收的消息,可以通過單獨調用BasicAck 進行确認:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
//process message ...
channel.BasicAck(msgResponse.DeliveryTag, multiple:false);
使用BasicAck方式來告之是否從隊列中移除該條消息,這一點很重要,因為在某些應用場景下,比如從隊列中擷取消息并用它來操作資料庫或日志檔案時,如果出現操作失敗時,則該條消息應該保留在隊列中,隻到操作成功時才從隊列中移除。
另一種方法是通過基于推送的事件訂閱。您可以使用内置的 QueueingBasicConsumer 提供簡化的程式設計模型,通過允許您在共享隊列上阻塞,直到收到一條消息,例如
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue(); //blocking
如何基于RabbitMQ實作優先級隊列
https://github.com/derekgreer/rabbitBus
https://github.com/evolvIQ/PushMQ
歡迎大家掃描下面二維碼成為我的客戶,為你服務和上雲