天天看點

NET下RabbitMQ實踐[WCF釋出篇]

     在之前的兩篇文章中,主要介紹了RabbitMQ環境配置,簡單示例的編寫。今天将會介紹如何使用WCF将RabbitMQ列隊以服務的方式進行釋出。

     注:因為RabbitMQ的官方.net用戶端中包括了WCF的SAMPLE代碼示範,很适合初學者,是以我就偷了個懶,直接對照它的SAMPLE來說明了,算是借花獻佛吧,呵呵。

     首先我們下載下傳相應源碼(基于.NET 3.0),本文主要對該源碼包中的代碼進行講解,連結如下:

    Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples

    當然官方還提供了基本.NET 2.0 版本的示例版本,但其中隻是一些簡單的示例,并不包括WCF部分,這裡隻發個連結,感興趣的朋友可自行研究。

    Binary, compiled for .NET 2.0 (zip) - includes example code

    下載下傳基于.NET 3.0的版本源碼之後,解壓其中的projects\examples\wcf目錄,可看到如下的項目:

NET下RabbitMQ實踐[WCF釋出篇]

    幾個檔案夾分别對應如下應用場景:

    OneWay: 單向通信(無傳回值)

    TwoWay: 雙向通信(請求/響應)

    Session:會話方式

    Duplex: 雙向通信(可以指定一個Callback回調函數)

    下面逐一進行介紹:

   OneWay  

    在OneWayTest示例中,示範了插入日志資料,因為日志操作一般隻是單純的寫入操作,不考慮傳回值,是以使用OneWay方式。

   下面是其WCF接口聲明和執行個體代碼,如下:  

    [ServiceContract]

    public interface ILogServiceContract

    {

        [OperationContract(IsOneWay=true)]

        void Log(LogData entry);

    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]

    public class LogService : ILogServiceContract

    {

        public int m_i;

        public void Log(LogData entry)

        {

            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] {3} [{0,-6}] {1, 12}: {2}", entry.Level, entry.TimeStamp, entry.Message, m_i++);

        }

    }

    其隻包含一個方法:Log(LogData entry) ---用于添加日志記錄,可以看出它與我們以往寫WCF代碼沒什麼兩樣。

    不過這裡要說明一下,在類屬性InstanceContextMode枚舉類型中,使用了“Single”模式,而該枚舉提供了如下三種情況:

    Single - 為所有用戶端調用配置設定一個服務執行個體。

    PerCall – 為每個用戶端調用配置設定一個服務執行個體。

    PerSession – 為每個用戶端會話配置設定一個服務執行個體。每個Session内多線程操作執行個體的話會有并發問題。  

    InstanceContextMode 的預設設定為 PerSession

    這三個值通常是要與并發模式(ConcurrencyMode)搭配使用,以解決并發效率,共享資源等複雜場景下的問題的。下面是并發模式的說明:

    ConcurrencyMode 控制一次允許多少個線程進入服務。ConcurrencyMode 可以設定為以下值之一:

    Single - 一次可以有一個線程進入服務。

    Reentrant - 一次可以有一個線程進入服務,但允許回調。

    Multiple - 一次可以有多個線程進入服務。    

    ConcurrencyMode 的預設設定為 Single。

    InstanceContextMode 和 ConcurrencyMode 設定會互相影響,是以為了提升并發效能,必須協調這兩項設定。

    例如,将 InstanceContextMode 設定為 PerCall 時,會忽略 ConcurrencyMode 設定。這是因為,每個用戶端調用都将路由到新的服務執行個體,是以一次隻會有一個線程在服務執行個體中運作。對于PerCall的執行個體模型,每個用戶端請求都會與服務端的一個獨立的服務執行個體進行互動,就不會出現多個用戶端請求争用一個服務執行個體的情況,也就不會出現并發沖突,不會影響吞吐量的問題。但對于執行個體内部的共享變量(static)還是會可能出現沖突。

    但對于目前Single設定,原因很多,可能包括:

    1. 建立服務執行個體需要大量的處理工作。當多個用戶端通路服務時,僅允許建立一個服務執行個體可以降低所需處理量。

    2. 可以降低垃圾回收成本,因為不必為每個調用建立和銷毀服務建立的對象。

    3. 可以在多個用戶端之間共享服務執行個體。

    4. 避免對static靜态屬性的通路沖突。

    但如果使用Single,問題也就出來了---就是性能,因為如果 ConcurrencyMode也同時設定成Single時,目前示例中的(唯一)服務執行個體不會同時處理多個(單線程用戶端)請求。因為服務在處理請求時會對目前服務加鎖,如果再有其它請求需要該服務處理的時候,需要排隊等候。如果有大量用戶端通路,這可能會導緻較大的瓶頸。

    當然如果考慮到多線程用戶端使用的情況,可能問題會更嚴重。

    聊了這些,無非就是要結合具體應用場景來靈活搭配ConcurrencyMode,InstanceContextMode這兩個枚舉值。

    下面言歸正傳,來看一下如何将該服務與RabbitMQ進行綁定,以實作以WCF方式通路RabbitMQ服務的效果。這裡暫且略過LogData資料結構資訊類,直接看一下如果綁定服務代碼(位于OneWayTest.cs):

private ServiceHost m_host;

public void StartService(Binding binding)

{

    m_host = new ServiceHost(typeof(LogService), new Uri("soap.amqp:///"));

    ((RabbitMQBinding)binding).OneWayOnly = true;    

    m_host.AddServiceEndpoint(typeof(ILogServiceContract), binding, "LogService");

    m_host.Open();

    m_serviceStarted = true;  

}

    StartService方法的主體與我們平時啟動WCF服務的方法差不多,隻不過是将其中的URL協定部分換成了“soap.amqp”形式,而其中的傳入參數binding則是RabbitMQBinding類型,該類型是rabbitmq用戶端類庫提供的用于對應Binding類的RabbitMQBinding實作。下面就是其類實始化代碼:

     return new RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings["manual-test-broker-uri"],RabbitMQ.Client.Protocols.FromConfiguration("manual-test-broker-protocol"));

    其包括兩個參數,一個是rabbitmq服務位址,一個是所用的協定,其對應示例app.config檔案中的如下結點:   

<add key="manual-test-broker-uri" value="amqp://10.0.4.79:5672/"/><!--本系列第一篇中的環境設定-->

<add key="manual-test-broker-protocol" value="AMQP_0_8"/>

    這樣,我們就完成了初始化服務執行個體工作。

    接着來構造用戶端代碼,如下:

private ChannelFactory<ILogServiceContract> m_factory;

private ILogServiceContract m_client;                          

public ILogServiceContract GetClient(Binding binding)

{

    ((RabbitMQBinding)binding).OneWayOnly = true;

    m_factory = new ChannelFactory<ILogServiceContract>(binding, "soap.amqp:///LogService");

    m_factory.Open();

    return m_factory.CreateChannel();

}

    與平時寫的代碼相似,但傳入參數就是上面提到的那個RabbitMQBinding執行個體,這樣通過下面代碼通路WCF中的LOG方法:

    m_client = GetClient(Program.GetBinding());

    m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));

    m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));

    ....

    到這裡,我們可以看出,它的實作還是很簡單的。我們隻要把10.0.4.79:5672上的rabbitmq的環境跑起來,就可以看出最終的效果了。

    之後我将C#的服務端(startservice)與用戶端(getclient)分開布署到不同IP的主機上,也實作了示例中的結果。

    TwoWay   

    下面介紹一下 TwoWay雙向通信示例,首先是WCF接口聲明和實作:   

    [ServiceContract]

    public interface ICalculator

    {

        [OperationContract]

        int Add(int x, int y);

        [OperationContract]

        int Subtract(int x, int y);

    }

   [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*為每個用戶端調用配置設定一個服務執行個體*/

    public sealed class Calculator : ICalculator

    {

        public int Add(int x, int y)

        {

            return x + y;

        }

        public int Subtract(int x, int y)

        {

            return x - y;

        }

    }

      因為其服務的啟動startservice和用戶端執行個體構造與oneway方法類似,為了節約篇幅,這時就略過了,下面是其最終調用代碼(位于TwoWayTest.cs):   

  public void Run()

 {

     StartService(Program.GetBinding());

     ICalculator calc = GetClient(Program.GetBinding());

     int result = 0, x = 3, y = 4;

     Util.WriteLine(ConsoleColor.Magenta, "  {0} + {1} = {2}", x, y, result = calc.Add(x, y));

     if (result != x + y)

         throw new Exception("Test Failed");

    ......

 }   

    與普通的WCF TWOWAY 傳回調用方式相同,就不多說了。

    Session   

    下面是基于Session會話方式的代碼,WCF接口聲明和實作:

    [ServiceContract(SessionMode= SessionMode.Required)]

    public interface ICart

    {

        [OperationContract]

        void Add(CartItem item);

        [OperationContract]

        double GetTotal();

        Guid Id { [OperationContract]get; }

    }

    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]

    public class Cart : ICart

    {

        public Cart()

        {

            Items = new List<CartItem>();

            m_id = Guid.NewGuid();

        }

        private Guid m_id;

        private List<CartItem> m_items;

        private List<CartItem> Items {

            get { return m_items; }

            set { m_items = value; }

        }

        public void Add(CartItem item)

        {

            Items.Add(item);

        }

        public double GetTotal()

        {

            double total = 0;

            foreach (CartItem i in Items)

                total += i.Price;

            return total;

        }

        public Guid Id { get { return m_id; } }

    }

    該接口實作一個購物車功能,可以添加商品并計算總價,考慮到并發場景,這裡将其執行個體為PerSession枚舉類型,即為每個用戶端會話配置設定一個服務執行個體。這樣就可以在使用者點選購買一件商品裡,為其購物車商品清單List<CartItem>添加一條資訊,而不會與其它使用者的購物車商品清單相沖突。

    其最終的調用方法如下:

public void Run()

{

    StartService(Program.GetBinding());

    ICart cart = GetClient(Program.GetBinding());

    AddToCart(cart, "Beans", 0.49);//添加商品到購物車

    AddToCart(cart, "Bread", 0.89);

    AddToCart(cart, "Toaster", 4.99);

    double total = cart.GetTotal();//計算總價

    if (total != (0.49 + 0.89 + 4.99))

        throw new Exception("Incorrect Total");

    ......

}  

    Duplex   

    最後,再介紹一下如何基于Duplex雙向通信模式進行開發,DuplexTest這是個“PIZZA訂單”的場景,使用者下單之後,等待服務端将PIZZA加工完畢,然後服務端用callback方法通知用戶端PIZZA已做好,相應WCF接口聲明和實作如下:

   [ServiceContract(CallbackContract=typeof(IPizzaCallback))] /*綁定回調接口*/

    public interface IPizzaService

    {

        [OperationContract(IsOneWay=true)]

        void PlaceOrder(Order order);

    }

    [ServiceContract]

    public interface IPizzaCallback

    {

        [OperationContract(IsOneWay=true)]

        void OrderReady(Guid id); /*用于通知用戶端*/

    }    

    public class PizzaService : IPizzaService

    {

        public void PlaceOrder(Order order)

        {

            foreach (Pizza p in order.Items)

            {

                Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);

            }

            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Order {0} is Ready!", order.Id);

            Callback.OrderReady(order.Id);

        }

        IPizzaCallback Callback

        {

            get { return OperationContext.Current.GetCallbackChannel<IPizzaCallback>(); } //目前上下文中調用用戶端綁定的回調方法

        }

    }

    這裡要說明的是IPizzaCallback接口的OrderReady方法被綁定了IsOneWay=true屬性,主要是因為如果使用“請求-響應”模式,用戶端必須等服務端“響應”完成上一次“請求”後才能發出下一步“請求”。是以雖然用戶端可以使用多線程方式來調用服務,但最後的執行結果仍然表現出順序處理(效率低)。要想使服務端能夠并行處理用戶端請求的話,那我們就不能使用“請求-響應”的調用模式,是以這裡使用One-Way的方式來調用服務。

    下面是用戶端回調接口實作:   

    public class PizzaClient : DuplexClientBase<IPizzaService>, IPizzaService

    {

        public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)

            : base(context, binding, remoteAddress) { }

        public void PlaceOrder(Order order)

        {

            Channel.PlaceOrder(order);

        }

    }

    最終用戶端執行個體化(startservice)略過,因與之前示例類似。

    public IPizzaService GetClient(Binding binding)

    {

        PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));

        client.Open();

        return client;

    }

     上面的方法中将目前用戶端執行個體this(實作了IServiceTest<IPizzaService>, IPizzaCallback接口)注冊到上下文中,目的是為了将其方法的回傳調用傳遞到服務端(還記得服務端的這行代碼嗎?=>Callback.OrderReady(order.Id))

public void OrderReady(Guid id)

{

    Util.WriteLine(ConsoleColor.Magenta, "  [CLI] Order {0} has been delivered.",id);

    mre.Set();

}

    這樣,服務端完成pizza時,就可以調用用戶端的OrderReady方法來實作通知功能了。

    下面就是一個整個的下單流程實作:

public void Run()

{

       ......

       StartService(Program.GetBinding());

       IPizzaService client = GetClient(Program.GetBinding());

       Order lunch = new Order();

       lunch.Items = new List<Pizza>();

       lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));

       client.PlaceOrder(lunch);

       ......

}

    好了,今天的主要内容就先到這裡了,在接下來的文章中,将會介紹一個rabbitmq的實際應用場景,也是我們Discuz!NT企業版中的一個功能:記錄系統運作的錯誤日志。

    敬請觀注!        

    原文連結:http://www.cnblogs.com/daizhj/archive/2010/10/22/1858284.html   

    Tags:Rabbitmq,NET

    BLOG: http://daizhj.cnblogs.com/

    作者:daizhj,代震軍    

    相關WCF參考文檔:       

    WCF開始使用6--并發1       

    WCF執行個體上下文模式與并發模式對性能的影響