在之前的兩篇文章中,主要介紹了RabbitMQ環境配置,簡單示例的編寫。今天将會介紹如何使用WCF将RabbitMQ列隊以服務的方式進行釋出。
注:因為RabbitMQ的官方.net用戶端中包括了WCF的SAMPLE代碼示範,很适合初學者,是以我就偷了個懶,直接對照它的SAMPLE來說明了,算是借花獻佛吧,呵呵。
首先我們下載下傳相應源碼(基于.NET 3.0),本文主要對該源碼包中的代碼進行講解,連結如下:
Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples
當然官方還提供了基本.NET 2.0 版本的示例版本,但其中隻是一些簡單的示例,并不包括WCF部分,這裡隻發個連結,感興趣的朋友可自行研究。
Binary, compiled for .NET 2.0 (zip) - includes example code
下載下傳基于.NET 3.0的版本源碼之後,解壓其中的projects\examples\wcf目錄,可看到如下的項目:

幾個檔案夾分别對應如下應用場景:
OneWay: 單向通信(無傳回值)
TwoWay: 雙向通信(請求/響應)
Session:會話方式
Duplex: 雙向通信(可以指定一個Callback回調函數)
下面逐一進行介紹:
OneWay
在OneWayTest示例中,示範了插入日志資料,因為日志操作一般隻是單純的寫入操作,不考慮傳回值,是以使用OneWay方式。
下面是其WCF接口聲明和執行個體代碼,如下:
[ServiceContract]
public interface ILogServiceContract
{
[OperationContract(IsOneWay=true)]
void Log(LogData entry);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class LogService : ILogServiceContract
{
public int m_i;
public void Log(LogData entry)
{
Util.WriteLine(ConsoleColor.Magenta, " [SVC] {3} [{0,-6}] {1, 12}: {2}", entry.Level, entry.TimeStamp, entry.Message, m_i++);
}
}
其隻包含一個方法:Log(LogData entry) ---用于添加日志記錄,可以看出它與我們以往寫WCF代碼沒什麼兩樣。
不過這裡要說明一下,在類屬性InstanceContextMode枚舉類型中,使用了“Single”模式,而該枚舉提供了如下三種情況:
Single - 為所有用戶端調用配置設定一個服務執行個體。
PerCall – 為每個用戶端調用配置設定一個服務執行個體。
PerSession – 為每個用戶端會話配置設定一個服務執行個體。每個Session内多線程操作執行個體的話會有并發問題。
InstanceContextMode 的預設設定為 PerSession
這三個值通常是要與并發模式(ConcurrencyMode)搭配使用,以解決并發效率,共享資源等複雜場景下的問題的。下面是并發模式的說明:
ConcurrencyMode 控制一次允許多少個線程進入服務。ConcurrencyMode 可以設定為以下值之一:
Single - 一次可以有一個線程進入服務。
Reentrant - 一次可以有一個線程進入服務,但允許回調。
Multiple - 一次可以有多個線程進入服務。
ConcurrencyMode 的預設設定為 Single。
InstanceContextMode 和 ConcurrencyMode 設定會互相影響,是以為了提升并發效能,必須協調這兩項設定。
例如,将 InstanceContextMode 設定為 PerCall 時,會忽略 ConcurrencyMode 設定。這是因為,每個用戶端調用都将路由到新的服務執行個體,是以一次隻會有一個線程在服務執行個體中運作。對于PerCall的執行個體模型,每個用戶端請求都會與服務端的一個獨立的服務執行個體進行互動,就不會出現多個用戶端請求争用一個服務執行個體的情況,也就不會出現并發沖突,不會影響吞吐量的問題。但對于執行個體内部的共享變量(static)還是會可能出現沖突。
但對于目前Single設定,原因很多,可能包括:
1. 建立服務執行個體需要大量的處理工作。當多個用戶端通路服務時,僅允許建立一個服務執行個體可以降低所需處理量。
2. 可以降低垃圾回收成本,因為不必為每個調用建立和銷毀服務建立的對象。
3. 可以在多個用戶端之間共享服務執行個體。
4. 避免對static靜态屬性的通路沖突。
但如果使用Single,問題也就出來了---就是性能,因為如果 ConcurrencyMode也同時設定成Single時,目前示例中的(唯一)服務執行個體不會同時處理多個(單線程用戶端)請求。因為服務在處理請求時會對目前服務加鎖,如果再有其它請求需要該服務處理的時候,需要排隊等候。如果有大量用戶端通路,這可能會導緻較大的瓶頸。
當然如果考慮到多線程用戶端使用的情況,可能問題會更嚴重。
聊了這些,無非就是要結合具體應用場景來靈活搭配ConcurrencyMode,InstanceContextMode這兩個枚舉值。
下面言歸正傳,來看一下如何将該服務與RabbitMQ進行綁定,以實作以WCF方式通路RabbitMQ服務的效果。這裡暫且略過LogData資料結構資訊類,直接看一下如果綁定服務代碼(位于OneWayTest.cs):
private ServiceHost m_host;
public void StartService(Binding binding)
{
m_host = new ServiceHost(typeof(LogService), new Uri("soap.amqp:///"));
((RabbitMQBinding)binding).OneWayOnly = true;
m_host.AddServiceEndpoint(typeof(ILogServiceContract), binding, "LogService");
m_host.Open();
m_serviceStarted = true;
}
StartService方法的主體與我們平時啟動WCF服務的方法差不多,隻不過是将其中的URL協定部分換成了“soap.amqp”形式,而其中的傳入參數binding則是RabbitMQBinding類型,該類型是rabbitmq用戶端類庫提供的用于對應Binding類的RabbitMQBinding實作。下面就是其類實始化代碼:
return new RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings["manual-test-broker-uri"],RabbitMQ.Client.Protocols.FromConfiguration("manual-test-broker-protocol"));
其包括兩個參數,一個是rabbitmq服務位址,一個是所用的協定,其對應示例app.config檔案中的如下結點:
<add key="manual-test-broker-uri" value="amqp://10.0.4.79:5672/"/><!--本系列第一篇中的環境設定-->
<add key="manual-test-broker-protocol" value="AMQP_0_8"/>
這樣,我們就完成了初始化服務執行個體工作。
接着來構造用戶端代碼,如下:
private ChannelFactory<ILogServiceContract> m_factory;
private ILogServiceContract m_client;
public ILogServiceContract GetClient(Binding binding)
{
((RabbitMQBinding)binding).OneWayOnly = true;
m_factory = new ChannelFactory<ILogServiceContract>(binding, "soap.amqp:///LogService");
m_factory.Open();
return m_factory.CreateChannel();
}
與平時寫的代碼相似,但傳入參數就是上面提到的那個RabbitMQBinding執行個體,這樣通過下面代碼通路WCF中的LOG方法:
m_client = GetClient(Program.GetBinding());
m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));
m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));
....
到這裡,我們可以看出,它的實作還是很簡單的。我們隻要把10.0.4.79:5672上的rabbitmq的環境跑起來,就可以看出最終的效果了。
之後我将C#的服務端(startservice)與用戶端(getclient)分開布署到不同IP的主機上,也實作了示例中的結果。
TwoWay
下面介紹一下 TwoWay雙向通信示例,首先是WCF接口聲明和實作:
[ServiceContract]
public interface ICalculator
{
[OperationContract]
int Add(int x, int y);
[OperationContract]
int Subtract(int x, int y);
}
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*為每個用戶端調用配置設定一個服務執行個體*/
public sealed class Calculator : ICalculator
{
public int Add(int x, int y)
{
return x + y;
}
public int Subtract(int x, int y)
{
return x - y;
}
}
因為其服務的啟動startservice和用戶端執行個體構造與oneway方法類似,為了節約篇幅,這時就略過了,下面是其最終調用代碼(位于TwoWayTest.cs):
public void Run()
{
StartService(Program.GetBinding());
ICalculator calc = GetClient(Program.GetBinding());
int result = 0, x = 3, y = 4;
Util.WriteLine(ConsoleColor.Magenta, " {0} + {1} = {2}", x, y, result = calc.Add(x, y));
if (result != x + y)
throw new Exception("Test Failed");
......
}
與普通的WCF TWOWAY 傳回調用方式相同,就不多說了。
Session
下面是基于Session會話方式的代碼,WCF接口聲明和實作:
[ServiceContract(SessionMode= SessionMode.Required)]
public interface ICart
{
[OperationContract]
void Add(CartItem item);
[OperationContract]
double GetTotal();
Guid Id { [OperationContract]get; }
}
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
public class Cart : ICart
{
public Cart()
{
Items = new List<CartItem>();
m_id = Guid.NewGuid();
}
private Guid m_id;
private List<CartItem> m_items;
private List<CartItem> Items {
get { return m_items; }
set { m_items = value; }
}
public void Add(CartItem item)
{
Items.Add(item);
}
public double GetTotal()
{
double total = 0;
foreach (CartItem i in Items)
total += i.Price;
return total;
}
public Guid Id { get { return m_id; } }
}
該接口實作一個購物車功能,可以添加商品并計算總價,考慮到并發場景,這裡将其執行個體為PerSession枚舉類型,即為每個用戶端會話配置設定一個服務執行個體。這樣就可以在使用者點選購買一件商品裡,為其購物車商品清單List<CartItem>添加一條資訊,而不會與其它使用者的購物車商品清單相沖突。
其最終的調用方法如下:
public void Run()
{
StartService(Program.GetBinding());
ICart cart = GetClient(Program.GetBinding());
AddToCart(cart, "Beans", 0.49);//添加商品到購物車
AddToCart(cart, "Bread", 0.89);
AddToCart(cart, "Toaster", 4.99);
double total = cart.GetTotal();//計算總價
if (total != (0.49 + 0.89 + 4.99))
throw new Exception("Incorrect Total");
......
}
Duplex
最後,再介紹一下如何基于Duplex雙向通信模式進行開發,DuplexTest這是個“PIZZA訂單”的場景,使用者下單之後,等待服務端将PIZZA加工完畢,然後服務端用callback方法通知用戶端PIZZA已做好,相應WCF接口聲明和實作如下:
[ServiceContract(CallbackContract=typeof(IPizzaCallback))] /*綁定回調接口*/
public interface IPizzaService
{
[OperationContract(IsOneWay=true)]
void PlaceOrder(Order order);
}
[ServiceContract]
public interface IPizzaCallback
{
[OperationContract(IsOneWay=true)]
void OrderReady(Guid id); /*用于通知用戶端*/
}
public class PizzaService : IPizzaService
{
public void PlaceOrder(Order order)
{
foreach (Pizza p in order.Items)
{
Util.WriteLine(ConsoleColor.Magenta, " [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);
}
Util.WriteLine(ConsoleColor.Magenta, " [SVC] Order {0} is Ready!", order.Id);
Callback.OrderReady(order.Id);
}
IPizzaCallback Callback
{
get { return OperationContext.Current.GetCallbackChannel<IPizzaCallback>(); } //目前上下文中調用用戶端綁定的回調方法
}
}
這裡要說明的是IPizzaCallback接口的OrderReady方法被綁定了IsOneWay=true屬性,主要是因為如果使用“請求-響應”模式,用戶端必須等服務端“響應”完成上一次“請求”後才能發出下一步“請求”。是以雖然用戶端可以使用多線程方式來調用服務,但最後的執行結果仍然表現出順序處理(效率低)。要想使服務端能夠并行處理用戶端請求的話,那我們就不能使用“請求-響應”的調用模式,是以這裡使用One-Way的方式來調用服務。
下面是用戶端回調接口實作:
public class PizzaClient : DuplexClientBase<IPizzaService>, IPizzaService
{
public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)
: base(context, binding, remoteAddress) { }
public void PlaceOrder(Order order)
{
Channel.PlaceOrder(order);
}
}
最終用戶端執行個體化(startservice)略過,因與之前示例類似。
public IPizzaService GetClient(Binding binding)
{
PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));
client.Open();
return client;
}
上面的方法中将目前用戶端執行個體this(實作了IServiceTest<IPizzaService>, IPizzaCallback接口)注冊到上下文中,目的是為了将其方法的回傳調用傳遞到服務端(還記得服務端的這行代碼嗎?=>Callback.OrderReady(order.Id))
public void OrderReady(Guid id)
{
Util.WriteLine(ConsoleColor.Magenta, " [CLI] Order {0} has been delivered.",id);
mre.Set();
}
這樣,服務端完成pizza時,就可以調用用戶端的OrderReady方法來實作通知功能了。
下面就是一個整個的下單流程實作:
public void Run()
{
......
StartService(Program.GetBinding());
IPizzaService client = GetClient(Program.GetBinding());
Order lunch = new Order();
lunch.Items = new List<Pizza>();
lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));
client.PlaceOrder(lunch);
......
}
好了,今天的主要内容就先到這裡了,在接下來的文章中,将會介紹一個rabbitmq的實際應用場景,也是我們Discuz!NT企業版中的一個功能:記錄系統運作的錯誤日志。
敬請觀注!
原文連結:http://www.cnblogs.com/daizhj/archive/2010/10/22/1858284.html
Tags:Rabbitmq,NET
BLOG: http://daizhj.cnblogs.com/
作者:daizhj,代震軍
相關WCF參考文檔:
WCF開始使用6--并發1
WCF執行個體上下文模式與并發模式對性能的影響