天天看點

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

在企業應用系統領域,會面對不同系統之間的通信、內建與整合,尤其當面臨異構系統時,這種分布式的調用與通信變得越發重要。其次,系統中一般會有很多對實時性要求不高的但是執行起來比較較耗時的地方,比如發送短信,郵件提醒,更新文章閱讀計數,記錄使用者記錄檔等等,如果實時處理的話,在使用者通路量比較大的情況下,對系統壓力比較大。

面對這些問題,我們一般會将這些請求,放在消息隊列中處理;異構系統之間使用消息進行通訊。消息傳遞相較檔案傳遞與遠端過程調用(RPC)而言,似乎更勝一籌,因為它具有更好的平台無關性,并能夠很好地支援并發與異步調用。是以如果系統中出現了如下情況:

對操作的實時性要求不高,而需要執行的任務極為耗時;

存在異構系統間的整合;

一般的可以考慮引入消息隊列。對于第一種情況,常常會選擇消息隊列來處理執行時間較長的任務。引入的消息隊列就成了消息處理的緩沖區。消息隊列引入的異步通信機制,使得發送方和接收方都不用等待對方傳回成功消息,就可以繼續執行下面的代碼,進而提高了資料處理的能力。尤其是當通路量和資料流量較大的情況下,就可以結合消息隊列與背景任務,通過避開高峰期對大資料進行處理,就可以有效降低資料庫處理資料的負荷。

本文簡單介紹在RabbitMQ這一消息代理工具,以及在.NET中如何使用RabbitMQ.

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

現在就可以對RabbitMQ Server進行配置了。

首先,切換到RabbitMQ Server的安裝目錄:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

在sbin下面有很多batch檔案,用來控制RabbitMQ Server,當然您也可以直接在安裝開始菜單中來執行相應的操作:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

最簡單的方式是使RabbitMQ以Windows Service的方式在背景運作,是以我們需要以管理者權限打開cmd,然後切換到sbin目錄下,執行這三條指令即可:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

現在RabbitMQ的服務端已經啟動起來了。

下面可以使用sbin目錄下面的rabbitmqctl.bat這個腳本來檢視和控制服務端狀态的,在cmd中直接運作rabbitmqctl status。如果看到以下結果:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

顯示node沒有連接配接上,需要到C:\Windows目錄下,将.erlang.cookie檔案,拷貝到使用者目錄下 C:\Users\{使用者名},這是Erlang的Cookie檔案,允許與Erlang進行互動,現在重複運作剛才的指令就會得到如下資訊:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

RabbitMQ Server上面也有使用者概念,安裝好之後,使用rabbitmqctl list_users指令,可以看到上面目前的使用者:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

可以看到,現在隻有一個角色為administrator的名為guest的使用者,這個是RabbitMQ預設為我們建立的,他有RabbitMQ的所有權限,一般的,我們需要建立一個我們自己的使用者,設定密碼,并授予權限,并将其設定為管理者,可以使用下面的指令來執行這一操作:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

上面的一條指令添加了一個名為yy的使用者,并設定了密碼hello!,下面的指令為使用者yy分别授予對所有消息隊列的配置、讀和寫的權限。

現在我們可以将預設的guest使用者删掉,使用下面的指令即可:

如果要修改密碼,可以使用下面的指令:

在使用RabitMQ之前,需要對下面的幾個基本概念說明一下:

RabbitMQ是一個消息代理。他從消息生産者(producers)那裡接收消息,然後把消息送給消息消費者(consumer)在發送和接受之間,他能夠根據設定的規則進行路由,緩存和持久化。

一般提到RabbitMQ和消息,都用到一些專有名詞。

生産(Producing)意思就是發送。發送消息的程式就是一個生産者(producer)。我們一般用"P"來表示:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

隊列(queue)就是郵箱的名稱。消息通過你的應用程式和RabbitMQ進行傳輸,它們隻能存儲在隊列(queue)中。 隊列(queue)容量沒有限制,你要存儲多少消息都可以——基本上是一個無限的緩沖區。多個生産者(producers)能夠把消息發送給同一個隊列,同樣,多個消費者(consumers)也能從同一個隊列(queue)中擷取資料。隊列可以畫成這樣(圖上是隊列的名稱):

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

消費(Consuming)和擷取消息是一樣的意思。一個消費者(consumer)就是一個等待擷取消息的程式。我們把它畫作"C":

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

通常,消息生産者,消息消費者和消息代理不在同一台機器上。

為了展示RabbitMQ的基本使用,我們發送一個HelloWorld消息,然後接收并處理。

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

首先建立一個控制台程式,用來将消息發送到RabbitMQ的消息隊列中,代碼如下:

首先,需要建立一個ConnectionFactory,設定目标,由于是在本機,是以設定為localhost,如果RabbitMQ不在本機,隻需要設定目标機器的IP位址或者機器名稱即可,然後設定前面建立的使用者名yy和密碼hello!。

緊接着要建立一個Channel,如果要發送消息,需要建立一個隊列,然後将消息釋出到這個隊列中。在建立隊列的時候,隻有RabbitMQ上該隊列不存在,才會去建立。消息是以二進制數組的形式傳輸的,是以如果消息是實體對象的話,需要序列化和然後轉化為二進制數組。

現在用戶端發送代碼已經寫好了,運作之後,消息會釋出到RabbitMQ的消息隊列中,現在需要編寫服務端的代碼連接配接到RabbitMQ上去擷取這些消息。

同樣,建立一個名為Receive的服務端控制台應用程式,服務端代碼如下:

和發送一樣,首先需要定義連接配接,然後聲明消息隊列。要接收消息,需要定義一個Consume,然後從消息隊列中不斷Dequeue消息,然後處理。

現在發送端和接收端的代碼都寫好了,運作發送端,發送消息:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

現在,名為hello的消息隊列中,發送了一條消息。這條消息存儲到了RabbitMQ的伺服器上了。使用rabbitmqctl 的list_queues可以檢視所有的消息隊列,以及裡面的消息個數,可以看到,目前Rabbitmq上隻有一個消息隊列,裡面隻有一條消息:

現在運作接收端程式,如下:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

可以看到,已經接受到了用戶端發送的Hello World,現在再來看RabitMQ上的消息隊列資訊:

可以看到,hello這個隊列中的消息隊列個數為0,這表示,當接收端,接收到消息之後,RabbitMQ上就把這個消息删掉了。

前面的例子展示了如何往一個指定的消息隊列中發送和收取消息。現在我們建立一個工作隊列(work queue)來将一些耗時的任務分發給多個工作者(workers):

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

工作隊列(work queues, 又稱任務隊列Task Queues)的主要思想是為了避免立即執行并等待一些占用大量資源、時間的操作完成。而是把任務(Task)當作消息發送到隊列中,稍後處理。一個運作在背景的工作者(worker)程序就會取出任務然後處理。當運作多個工作者(workers)時,任務會在它們之間共享。

這個在網絡應用中非常有用,它可以在短暫的HTTP請求中處理一些複雜的任務。在一些實時性要求不太高的地方,我們可以處理完主要操作之後,以消息的方式來處理其他的不緊要的操作,比如寫日志等等。

準備

在第一部分,發送了一個包含“Hello World!”的字元串消息。現在發送一些字元串,把這些字元串當作複雜的任務。這裡使用time.sleep()函數來模拟耗時的任務。在字元串中加上點号(.)來表示任務的複雜程度,一個點(.)将會耗時1秒鐘。比如"Hello..."就會耗時3秒鐘。

對之前示例的send.cs做些簡單的調整,以便可以發送随意的消息。這個程式會按照計劃發送任務到我們的工作隊列中。

加粗部分是經過修改過了的。

接着我們修改接收端,讓他根據消息中的逗點的個數來Sleep對應的秒數:

輪詢分發

使用工作隊列的一個好處就是它能夠并行的處理隊列。如果堆積了很多任務,我們隻需要添加更多的工作者(workers)就可以了,擴充很簡單。

現在,我們先啟動兩個接收端,等待接受消息,然後啟動一個發送端開始發送消息。

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

在cmd條件下,發送了5條消息,每條消息後面的逗點表示該消息需要執行的時長,來模拟耗時的操作。

然後可以看到,兩個接收端依次接收到了發出的消息:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

預設,RabbitMQ會将每個消息按照順序依次分發給下一個消費者。是以每個消費者接收到的消息個數大緻是平均的。 這種消息分發的方式稱之為輪詢(round-robin)。

當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否運作到一半就挂掉。在目前的代碼中,當RabbitMQ将消息發送給消費者(consumers)之後,馬上就會将該消息從隊列中移除。此時,如果把處理這個消息的工作者(worker)停掉,正在處理的這條消息就會丢失。同時,所有發送到這個工作者的還沒有處理的消息都會丢失。

我們不想丢失任何任務消息。如果一個工作者(worker)挂掉了,我們希望該消息會重新發送給其他的工作者(worker)。

為了防止消息丢失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然後RabbitMQ才會釋放并删除這條消息。

如果消費者(consumer)挂掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然後重新發送給其他消費者(consumer)。這樣,即使工作者(workers)偶爾的挂掉,也不會丢失消息。

消息是沒有逾時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送消息。這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。

消息響應預設是開啟的。在之前的例子中使用了no_ack=True辨別把它關閉。是時候移除這個辨別了,當工作者(worker)完成了任務,就發送一個響應。

現在,可以保證,即使正在處理消息的工作者被停掉,這些消息也不會丢失,所有沒有被應答的消息會被重新發送給其他工作者.

一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,但是後果很嚴重. 當用戶端退出時,待處理的消息就會被重新分發,但是RabitMQ會消耗越來越多的記憶體,因為這些沒有被應答的消息不能夠被釋放。調試這種case,可以使用rabbitmqct列印messages_unacknoledged字段。

前面已經搞定了即使消費者down掉,任務也不會丢失,但是,如果RabbitMQ Server停掉了,那麼這些消息還是會丢失。

當RabbitMQ Server 關閉或者崩潰,那麼裡面存儲的隊列和消息預設是不會儲存下來的。如果要讓RabbitMQ儲存住消息,需要在兩個地方同時設定:需要保證隊列和消息都是持久化的。

首先,要保證RabbitMQ不會丢失隊列,是以要做如下設定:

雖然在文法上是正确的,但是在目前階段是不正确的,因為我們之前已經定義了一個非持久化的hello隊列。RabbitMQ不允許我們使用不同的參數重新定義一個已經存在的同名隊列,如果這樣做就會報錯。現在,定義另外一個不同名稱的隊列:

queueDeclare 這個改動需要在發送端和接收端同時設定。

現在保證了task_queue這個消息隊列即使在RabbitMQ Server重新開機之後,隊列也不會丢失。 然後需要保證消息也是持久化的, 這可以通過設定IBasicProperties.SetPersistent 為true來實作:

需要注意的是,将消息設定為持久化并不能完全保證消息不丢失。雖然他告訴RabbitMQ将消息儲存到磁盤上,但是在RabbitMQ接收到消息和将其儲存到磁盤上這之間仍然有一個小的時間視窗。 RabbitMQ 可能隻是将消息儲存到了緩存中,并沒有将其寫入到磁盤上。持久化是不能夠一定保證的,但是對于一個簡單任務隊列來說已經足夠。如果需要消息隊列持久化的強保證,可以使用publisher confirms

你可能會注意到,消息的分發可能并沒有如我們想要的那樣公平配置設定。比如,對于兩個工作者。當奇數個消息的任務比較重,但是偶數個消息任務比較輕時,奇數個工作者始終處理忙碌狀态,而偶數個工作者始終處理空閑狀态。但是RabbitMQ并不知道這些,他仍然會平均依次的分發消息。

為了改變這一狀态,我們可以使用basicQos方法,設定perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工作者發送多于1個的消息,或者換句話說。在一個工作者還在處理消息,并且沒有響應消息之前,不要給他分發新的消息。相反,将這條新的消息發送給下一個不那麼忙碌的工作者。

現在将所有這些放在一起:

發送端代碼如下:

接收端代碼如下:

RabbitMQ還有一個管理界面,通過該界面可以檢視RabbitMQ Server 目前的狀态,該界面是以插件形式提供的,并且在安裝RabbitMQ的時候已經自帶了該插件。需要做的是在RabbitMQ控制台界面中啟用該插件,指令如下:

.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻
.NET 環境中使用RabbitMQ一 環境搭建二 開始使用三 管理界面四 總結五 參考文獻

在該界面上可以看到目前RabbitMQServer的所有狀态。

本文簡單介紹了消息隊列的相關概念,并介紹了RabbitMQ消息代理的基本原理以及在Windows 上如何安裝RabbitMQ和在.NET中如何使用RabbitMQ。消息隊列在建構分布式系統和提高系統的可擴充性和響應性方面有着很重要的作用,希望本文對您了解消息隊列以及如何使用RabbitMQ有所幫助。

<a href="http://www.infoq.com/cn/articles/message-based-distributed-architecture" target="_blank">http://www.infoq.com/cn/articles/message-based-distributed-architecture</a>

<a href="http://www.rabbitmq.com/getstarted.html" target="_blank">http://www.rabbitmq.com/getstarted.html</a>

<a href="http://www.codethinked.com/using-rabbitmq-with-c-and-net" target="_blank">http://www.codethinked.com/using-rabbitmq-with-c-and-net</a>

<a href="http://www.infoq.com/cn/articles/AMQP-RabbitMQ" target="_blank">http://www.infoq.com/cn/articles/AMQP-RabbitMQ</a>

<a href="http://www.infoq.com/cn/articles/ebay-scalability-best-practices" target="_blank">http://www.infoq.com/cn/articles/ebay-scalability-best-practices</a>