本文出自EasyNetQ官方文檔,内容為自己了解加翻譯。文檔位址:https://github.com/EasyNetQ/EasyNetQ/wiki/Quick-Start
EasyNetQ簡介
EasyNetQ是基于官方.NET元件RabbitMQ.Client 的又一層封裝,使用起來更加友善,開發者不用關心具體隊列聲明,路由聲明等細節,幾句簡單代碼即可發送消息到隊列,接收消息也很簡單,下面将簡單介紹EasyNetQ的使用方法。不知道什麼是RabbitMQ?您可以關閉網頁了。
安裝EasyNetQ
從NuGet上安裝即可,由于EasyNetQ是依賴RabbitMQ.Client是以,會同時安裝兩個dll。
PM> Install-Package EasyNetQ
連接配接RabbitMQ
使用EasyNetQ連接配接RabbitMQ,是在應用程式啟動時建立一個IBus對象,并且,在應用程式關閉時釋放該對象。RabbitMQ連接配接是基于IBus接口的,當IBus中的方法被調用,連接配接才會開啟。建立一個IBus對象的方法如下:
var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=mike;password=topsecret”);
根據上述代碼可以看出,連接配接字元串中是基于Key/Value形式的,每個Key中間用分号(;)斷開。其中host是必須要寫的,其他的值都是可以不用寫,會采用預設的配置。連接配接中可能用到的Key如下:
- host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到叢集配置的話,那麼可以用逗号将服務位址隔開,例如host=a.com,b.com,c.com
- virtualHost,虛拟主機,預設為'/'
- username,使用者登入名
- password,使用者登入密碼
- requestedHeartbeat,心跳設定,預設是10秒
- prefetchcount,預設是50
- pubisherConfirms,預設為false
- persistentMessages,消息持久化,預設為true
- product,産品名
- platform,平台
- timeout,預設為10秒
關閉連接配接,可以使用 bus.Dispose();
EasyNetQ日志(Logging)
EasyNetQ提供了一個日志接口 IEasyNetQLogger
public interface IEasyNetQLogger
{
void DebugWrite(string format, params object[] args);
void InfoWrite(string format, params object[] args);
void ErrorWrite(string format, params object[] args);
void ErrorWrite(Exception exception);
}
内部預設用的是NullLogger,即什麼也不做,不記錄日志。在測試的時候也可以用ConsoleLogger來顯示EasyNetQ運作中的各種資訊。不過一般在正式使用環境中,可以自定義日志并實作IEasyNetQLogger接口。然後在RabbitHutch.CreateBus的重載方法中注冊想用的日志類型。(日志中會記錄連接配接RabbitMQ的過程和隊列建立細節等資訊,對于不懂RabbitMQ的同學,可能這些日志沒有什麼意義)。代碼如下:
var logger = new MyLogger() // 繼承自 IEasyNetQLogger
var bus = RabbitHutch.CreateBus(“my connection”, x => x.Register<IEasyNetQLogger>(_ => logger));
消息釋出(Publish)
EasyNetQ支援最簡單的消息模式是釋出和訂閱。釋出消息後,任意消費者可以訂閱該消息,也可以多個消費者訂閱。并且不需要額外配置。首先,如上文中需要先建立一個IBus對象,然後,在建立一個可序列化的.NET對象。調用Publish方法即可。
var bus = RabbitHutch.CreateBus(...);
var message = new MyMessage { Text = "Hello Rabbit" };
bus.Publish<MyMessage>(message);
警告,Publish隻顧發送消息到隊列,但是不管有沒有消費端訂閱,是以,釋出之後,如果沒有消費者,該消息将不會被消費甚至丢失。
消息訂閱(Subscribe)
EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會建立一個用于接收消息的隊列,不過與消息釋出不同的是,消息訂閱增加了一個參數,subscribe_id.代碼如下:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
第一個參數是訂閱id,另外一個是delegate參數,用于處理接收到的消息。這裡要注意的是,subscribe_id參數很重要,假如開發者用同一個subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會以輪訓的方式給每個訂閱的隊列發送消息。接收到之後,其他隊列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那麼生成的每一個隊列都會收到該消息。
舉個例子:出庫發貨,我們有五個商品倉庫,每個倉庫的商品都是一樣的,假如來了一堆訂單,那麼我們需要五個倉庫共同工作,分别處理訂單。而同樣,總倉庫需要知道總出貨量,正常情況下,可以用每個倉庫的出貨量相加即可。不過如果我們在總倉庫也監聽商品訂單消息,那麼,每次來訂單,總倉庫也都會收到一份,那麼可以作相應的統計了。
需要注意的是,在收到消息處理消息時候,不要占用太多的時間,會影響消息的處理效率,是以,遇到占用長時間的處理方法,最好用異步處理。代碼如下:
bus.SubscribeAsync<MyMessage>("subscribe_async_test", message =>
new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
.ContinueWith(task =>
Console.WriteLine("Received: '{0}', Downloaded: '{1}'",
message.Text,
task.Result)));
取消訂閱,可以用如下方法:
var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
...
subscriptionResult.Dispose();
或者直接IBus.Dispose();
消息發送(Send)和接收(Receive)
與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊列名稱。
bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });
bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));
并且,也可以在同一個隊列上發送不同的消息類型,Receive方法可以這麼寫:
bus.Receive("my.queue", x => x
.Add<MyMessage>(message => deliveredMyMessage = message)
.Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));
如果消息到達隊列,但是沒有發現相應消息類型的處理時,EasyNetQ會發送一條消息到error隊列,并且,帶上一個異常資訊:No handler found for message type <message type>。與Subscribe類型,如果在同一個隊列,同一個消息類型,多次調用Receive方法時,消息會通過輪詢的形式發送給每個Receive端。
消息路由(Topic Based Routing)
Publish方法,可以加一個topic參數。
bus.Publish(message, "X.A");
消息訂閱方可以通過路由來過濾相應的消息。
* 比對一個字元
#比對0個或者多個字元
是以 X.A.2 會比對到 "#", "X.#", "*.A.*" 但不會比對 "X.B.*" 或者 "A". 當消息訂閱需要用到topic時候,需要調用Subscribe的重載方法
bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));
上述這種方式,會将消息輪詢發送給兩個訂閱者,如果隻需要一個訂閱者的話,可以這麼調用:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));
總結
以上就是EasyNetQ的一些基本用法了,是不是很簡單呢,就這麼輕松實作了消息隊列的使用。當然,要深入内部還是有很多東西的。比如依賴注入,自定義EasyNetQ元件,RPC實作等。而且,他的源碼也是比較有參考價值的,相對于之前自己寫的基于RabbitMQ的封裝,自己的簡直是不能看呀。希望本文能給讀完的你帶來幫助。
多學點,總不會吃虧的。