天天看點

消息與.NET Remoting分布式處理架構

消息與.Net Remoting的分布式處理架構

  分布式處理在大型企業應用系統中,最大的優勢是将負載分布。通過多台伺服器處理多個任務,以優化整個系統的處理能力和運作效率。分布式處理的技術核心是完成服務與服務之間、服務端與用戶端之間的通信。在.Net 1.1中,可以利用Web Service或者.Net Remoting來實作服務程序之間的通信。本文将介紹一種基于消息的分布式處理架構,利用了.Net Remoting技術,并參考了CORBA Naming Service的處理方式,且定義了一套消息體制,來實作分布式處理。 

   一、消息的定義

      要實作程序間的通信,則通信内容的載體——消息,就必須在服務兩端具有統一的消息标準定義。從通信的角度來看,消息可以分為兩類:Request Messge和Reply Message。為簡便起見,這兩類消息可以采用同樣的結構。

      消息的主體包括ID,Name和Body,我們可以定義如下的接口方法,來獲得消息主體的相關屬性:

    public interface IMessage:ICloneable

    ...{

        IMessageItemSequence GetMessageBody();

        string GetMessageID();

        string GetMessageName();

        void SetMessageBody(IMessageItemSequence aMessageBody);

        void SetMessageID(string aID);

        void SetMessageName(string aName);

    }

  消息主體類Message實作了IMessage接口。在該類中,消息體Body為IMessageItemSequence類型。這個類型用于Get和Set消息的内容:Value和Item: 

    public interface IMessageItemSequence:ICloneable

    ...{        

        IMessageItem GetItem(string aName);

        void SetItem(string aName,IMessageItem aMessageItem);

        string GetValue(string aName);        

        void SetValue(string aName,string aValue);

  Value為string類型,并利用HashTable來存儲Key和Value的鍵值對。而Item則為IMessageItem類型,同樣的在IMessageItemSequence的實作類中,利用HashTable存儲了Key和Item的鍵值對。

      IMessageItem支援了消息體的嵌套。它包含了兩部分:SubValue和SubItem。實作的方式和IMessageItemSequence相似。定義這樣的嵌套結構,使得消息的擴充成為可能。一般的結構如下:

      IMessage——Name

                     ——ID

                     ——Body(IMessageItemSequence)

                            ——Value

                            ——Item(IMessageItem)

                                   ——SubValue

                                   ——SubItem(IMessageItem)

                                          ——……

  各個消息對象之間的關系如下:

  在實作服務程序通信之前,我們必須定義好各個服務或各個業務的消息格式。通過消息體的方法在服務的一端設定消息的值,然後發送,并在服務的另一端獲得這些值。例如發送消息端定義如下的消息體:

  IMessageFactory factory = new MessageFactory();

   IMessageItemSequence body = factory.CreateMessageItemSequence();

   body.SetValue("name1","value1");

   body.SetValue("name2","value2");

   IMessageItem item = factory.CreateMessageItem();

   item.SetSubValue("subname1","subvalue1");

   item.SetSubValue("subname2","subvalue2");

   IMessageItem subItem1 = factory.CreateMessageItem();

    subItem1.SetSubValue("subsubname11","subsubvalue11");

    subItem1.SetSubValue("subsubname12","subsubvalue12");

    IMessageItem subItem2 = factory.CreateMessageItem();

    subItem1.SetSubValue("subsubname21","subsubvalue21");

    subItem1.SetSubValue("subsubname22","subsubvalue22");

    item.SetSubItem("subitem1",subItem1);

    item.SetSubItem("subitem2",subItem2);

    body.SetItem("item",item);

    //Send Request Message

    MyServiceClient service = new MyServiceClient("Client");

   IMessageItemSequence reply = service.SendRequest("TestService","Test1",body);

  在接收消息端就可以通過獲得body的消息體内容,進行相關業務的處理。

  二、.Net Remoting服務

  在.Net中要實作程序間的通信,主要是應用Remoting技術。根據前面對消息的定義可知,實際上服務的實作,可以認為是對消息的處理。是以,我們可以對服務進行抽象,定義接口IService:

    public interface IService

    {

         IMessage Execute(IMessage aMessage);

  Execute()方法接受一條Request Message,對其進行處理後,傳回一條Reply Message。在整個分布式處理架構中,可以認為所有的服務均實作該接口。但受到Remoting技術的限制,如果要實作服務,則該服務類必須繼承自MarshalByRefObject,同時必須在服務端被Marshal。随着服務類的增多,必然要在服務兩端都要對這些服務的資訊進行管理,這加大了系統實作的難度與管理的開銷。如果我們從另外一個角度來分析服務的性質,基于消息處理而言,所有服務均是對Request Message的處理。我們完全可以定義一個Request服務負責此消息的處理。

  然而,Request服務處理消息的方式雖然一緻,但畢竟服務實作的業務,即對消息處理的具體實作,卻是不相同的。對我們要實作的服務,可以分為兩大類:業務服務與Request服務。實作的過程為:首先,具體的業務服務向Request服務發出Request請求,Request服務偵聽到該請求,然後交由其偵聽的服務來具體處理。

  業務服務均具有發出Request請求的能力,且這些服務均被Request服務所偵聽,是以我們可以為業務服務抽象出接口IListenService:

    public interface IListenService

         IMessage OnRequest(IMessage aMessage);   

  Request服務實作了IService接口,并包含IListenService類型對象的委派,以執行OnRequest()方法:

    public class RequestListener:MarshalByRefObject,IService

         public RequestListener(IListenService listenService)

         {

             m_ListenService = listenService;

         }

         private IListenService m_ListenService;

         #region IService Members

         public IMessage Execute(IMessage aMessage)

             return this.m_ListenService.OnRequest(aMessage);

         }       

         #endregion

         public override object InitializeLifetimeService()

             return null;

  在RequestListener服務中,繼承了MarshalByRefObject類,同時實作了IService接口。通過該類的構造函數,接收IListService對象。

  由于Request消息均由Request服務即RequestListener處理,是以,業務服務的類均應包含一個RequestListener的委派,唯一的差別是其服務名不相同。業務服務類實作IListenService接口,但不需要繼承MarshalByRefObject,因為被Marshal的是該業務服務内部的RequestListener對象,而非業務服務本身:

    public abstract class Service:IListenService

         public Service(string serviceName)

             m_ServiceName = serviceName;   

             m_RequestListener = new RequestListener(this);  

         #region IListenService Members

         public IMessage OnRequest(IMessage aMessage)

             //……

         }  

         private string m_ServiceName;

         private RequestListener m_RequestListener;      

  Service類是一個抽象類,所有的業務服務均繼承自該類。最後的服務架構如下:

  我們還需要在Service類中定義發送Request消息的行為,通過它,才能使業務服務被RequestListener所偵聽。

   public IMessageItemSequence SendRequest(string aServiceName,string                                        aMessageName,IMessageItemSequence aMessageBody)

IMessage message = m_Factory.CreateMessage();

             message.SetMessageName(aMessageName);

             message.SetMessageID("");

             message.SetMessageBody(aMessageBody);

             IService service = FindService(aServiceName);

             IMessageItemSequence replyBody = m_Factory.CreateMessageItemSequence();

             if (service != null)

             {

                  IMessage replyMessage = service.Execute(message);

                  replyBody = replyMessage.GetMessageBody();           

             }

             else

             {           

                  replyBody.SetValue("result","Failure");           

             return replyBody;

  注意SendRequest()方法的定義,其參數包括服務名,消息名和被發送的消息主體。而在實作中最關鍵的一點是FindService()方法。我們要查找的服務正是與之對應的RequestListener服務。不過,在此之前,我們還需要先将服務Marshal:

         public void Initialize()

         {                                           

             RemotingServices.Marshal(this.m_RequestListener,this.m_ServiceName +  ".RequestListener");

  我們Marshal的對象,是業務服務中的Request服務對象m_RequestListener,這個對象在Service的構造函數中被執行個體化:

  m_RequestListener = new RequestListener(this); 

  注意,在執行個體化的時候是将this作為IListenService對象傳遞給RequestListener。是以,此時被Marshal的服務對象,保留了業務服務本身即Service的指引。可以看出,在Service和RequestListener之間,采用了“雙重委派”的機制。

  通過調用Initialize()方法,初始化了一個服務對象,其類型為RequestListener(或IService),其服務名為:Service的服務名 + ".RequestListener"。而該服務正是我們在SendRequest()方法中要查找的Service:

       IService service = FindService(aServiceName);

  下面我們來看看FindService()方法的實作:

         protected IService FindService(string aServiceName)

             lock (this.m_Services)

                  IService service = (IService)m_Services[aServiceName];

                  if (service != null)

                  {

                      return service;

                  }

                  else

                      IService tmpService = GetService(aServiceName);

                      AddService(aServiceName,tmpService);

                      return tmpService;

  可以看到,這個服務是被添加到m_Service對象中,該對象為SortedList類型,服務名為Key,IService對象為Value。如果沒有找到,則通過私有方法GetService()來獲得:

         private IService GetService(string aServiceName)

             IService service = (IService)Activator.GetObject(typeof(RequestListener),

                  "tcp://localhost:9090/" + aServiceName + ".RequestListener");

             return service;

  在這裡,Channel、IP、Port應該從配置檔案中擷取,為簡便起見,這裡直接賦為常量。

  再分析SendRequest方法,在找到對應的服務後,執行了IService的Execute()方法。此時的IService為RequestListener,而從前面對RequestListener的定義可知,Execute()方法執行的其實是其偵聽的業務服務的OnRequest()方法。

  我們可以定義一個具體的業務服務類,來分析整個消息傳遞的過程。該類繼承于Service抽象類:

    public class MyService:Service

         public MyService(string aServiceName):base(aServiceName)

         {}           

  假設把程序分為服務端和用戶端,那麼對消息處理的步驟如下:

  1、 在用戶端調用MyService的SendRequest()方法發送Request消息;

2、 查找被Marshal的服務,即RequestListener對象,此時該對象應包含對應的業務服務對象MyService;

3、 在服務端調用RequestListener的Execute()方法。該方法則調用業務服務MyService的OnRequest()方法。

  在這些步驟中,除了第一步在用戶端執行外,其他的步驟均是在服務端進行。

  三、業務服務對于消息的處理

  前面實作的服務架構,已經較為完整地實作了分布式的服務處理。但目前的實作,并未展現對消息的處理。我認為,對消息的處理,等價與具體的業務處理。這些業務邏輯必然是在服務端完成。每個服務可能會處理單個業務,也可能會處理多個業務。并且,服務與服務之間仍然存在通信,某個服務在處理業務時,可能需要另一個服務的業務行為。也就是說,每一種類的消息,處理的方式均有所不同,而這些消息的唯一辨別,則是在SendRequest()方法已經有所展現的aMessageName。

  雖然,處理的消息不同,所需要的服務不同,但是根據我們對消息的定義,我們仍然可以将這些消息處理機制抽象為一個統一的格式;在.Net中,展現這種機制的莫過于委托delegate。我們可以定義這樣的一個委托:

  public delegate void RequestHandler(string aMessageName,IMessageItemSequence aMessageBody,ref IMessageItemSequence aReplyMessageBody);

  在RequestHandler委托中,它代表了這樣一族方法:接收三個入參,aMessageName,aMessageBody,aReplyMessageBody,傳回值為void。其中,aMessageName代表了消息名,它是消息的唯一辨別;aMessageBody是待處理消息的主體,業務所需要的所有資料都存儲在aMessageBody對象中。aReplyMessageBody是一個引用對象,它存儲了消息處理後的傳回結果,通常情況下,我們可以用<"result","Success">或<"result", "Failure">來代表處理的結果是成功還是失敗。

  這些委托均在服務初始化時被添加到服務類的SortedList對象中,鍵值為aMessageName。是以我們可以在抽象類中定義如下方法:     

         protected abstract void AddRequestHandlers();

         protected void AddRequestHandler(string aMessageName,RequestHandler handler)

             lock (this.m_EventHandlers)

                  if (!this.m_EventHandlers.Contains(aMessageName))

                      this.m_EventHandlers.Add(aMessageName,handler);

         protected RequestHandler FindRequestHandler(string aMessageName)

                  RequestHandler handler = (RequestHandler)m_EventHandlers[aMessageName];

                  return handler;

         } 

  AddRequestHandler()用于添加委托對象與aMessageName的鍵值對,而FindRequestHandler()方法用于查找該委托對象。而抽象方法AddRequestHandlers()則留給Service的子類實作,簡單的實作如MyService的AddRequestHandlers()方法:

public class MyService:Service

         {}

         protected override void AddRequestHandlers()

             this.AddRequestHandler("Test1",new RequestHandler(Test1));

             this.AddRequestHandler("Test2",new RequestHandler(Test2));

         private void Test1(string aMessageName,IMessageItemSequence aMessageBody,ref  IMessageItemSequence aReplyMessageBody)

             Console.WriteLine("MessageName:{0}\n",aMessageName);

             Console.WriteLine("MessageBody:{0}\n",aMessageBody);

             aReplyMessageBody.SetValue("result","Success");

         private void Test2(string aMessageName,IMessageItemSequence aMessageBody,ref   IMessageItemSequence aReplyMessageBody)

             Console.WriteLine("Test2" + aMessageBody.ToString());

  Test1和Test2方法均為比對RequestHandler委托簽名的方法,然後在AddRequestHandlers()方法中,通過調用AddRequestHandler()方法将這些方法與MessageName對應起來,添加到m_EventHandlers中。

  需要注意的是,本文為了簡要的說明這種處理方式,是以簡化了Test1和Test2方法的實作。而在實際開發中,它們才是實作具體業務的重要方法。而利用這種方式,則解除了服務之間依賴的耦合度,我們随時可以為服務添加新的業務邏輯,也可以友善的增加服務。

  通過這樣的設計,Service的OnRequest()方法的最終實作如下所示:

             string messageName = aMessage.GetMessageName();

             string messageID = aMessage.GetMessageID();

             IMessage message = m_Factory.CreateMessage();

             IMessageItemSequence replyMessage = m_Factory.CreateMessageItemSequence();

             RequestHandler handler = FindRequestHandler(messageName);

             handler(messageName,aMessage.GetMessageBody(),ref replyMessage);

             message.SetMessageName(messageName);

             message.SetMessageID(messageID);

             message.SetMessageBody(replyMessage);

             return message;

  利用這種方式,我們可以非常友善的實作服務間通信,以及用戶端與服務端間的通信。例如,我們分别在服務端定義MyService(如前所示)和TestService:

    public class TestService:Service

         public TestService(string aServiceName):base(aServiceName)

             this.AddRequestHandler("Test1",new RequestHandler(Test1));          

         {            

             aReplyMessageBody = SendRequest("MyService",aMessageName,aMessageBody);

             aReplyMessageBody.SetValue("result2","Success");

  注意在TestService中的Test1方法,它并未直接處理消息aMessageBody,而是通過調用SendRequest()方法,将其傳遞到MyService中。

  對于用戶端而言,情況比較特殊。根據前面的分析,我們知道除了發送消息的操作是在用戶端完成外,其他的具體執行都在服務端實作。是以諸如MyService和TestService等服務類,隻需要部署在服務端即可。而用戶端則隻需要定義一個實作Service的空類即可:

    public class MyServiceClient:Service

        public MyServiceClient(string aServiceName):base(aServiceName)

  MyServiceClient類即為用戶端定義的服務類,在AddRequestHandlers()方法中并不需要實作任何代碼。如果我們在Service抽象類中,将AddRequestHandlers()方法定義為virtual而非abstract方法,則這段代碼在用戶端服務中也可以省去。另外,用戶端服務類中的aServiceName可以任意指派,它與服務端的服務名并無實際聯系。至于用戶端具體會調用哪個服務,則由SendRequest()方法中的aServiceName決定:

           IMessageFactory factory = new MessageFactory();

           IMessageItemSequence body = factory.CreateMessageItemSequence();

           //……

           MyServiceClient service = new MyServiceClient("Client");

           IMessageItemSequence reply = service.SendRequest("TestService","Test1",body);

  對于service.SendRequest()的執行而言,會先調用TestService的Test1方法;然後再通過該方法向MyService發送,最終調用MyService的Test1方法。

  我們還需要另外定義一個類,負責添加服務,并初始化這些服務:

    public class Server

         public Server()

             m_Services = new ArrayList();

         private ArrayList m_Services;     

         public void AddService(IListenService service)

             this.m_Services.Add(service);

         {  

             IDictionary tcpProp = new Hashtable();

             tcpProp["name"] = "tcp9090";

             tcpProp["port"] = 9090;

             TcpChannel channel = new TcpChannel(tcpProp,new                                             BinaryClientFormatterSinkProvider(),new BinaryServerFormatterSinkProvider());            

             ChannelServices.RegisterChannel(channel);

             foreach (Service service in m_Services)

                  service.Initialize();

             }            

  同理,這裡的Channel,IP和Port均應通過配置檔案讀取。最終的類圖如下所示:

       在服務端,可以調用Server類來初始化這些服務:

      static void Main(string[] args)

         {   

             MyService service = new MyService("MyService");

             TestService service1 = new TestService("TestService");

             Server server = new Server();

             server.AddServ

ice(service);

             server.AddService(service1);

             server.Initialize();

             Console.ReadLine();

  四、結論

  利用這個基于消息與.Net Remoting技術的分布式架構,可以将企業的業務邏輯轉換為對消息的定義和處理。要增加和修改業務,就展現在對消息的修改上。服務間的通信機制則完全交給整個架構來處理。如果我們将每一個委托所實作的業務(或者消息)了解為Contract,則該結構已經具備了SOA的雛形。當然,該架構僅僅處理了消息的傳遞,而忽略了對底層事件的處理(類似于Corba的Event Service),這個功能我想留待後面實作。

  唯一遺憾的是,我缺乏驗證這個架構穩定性和效率的環境。應該說,這個架構是我們在企業項目解決方案中的一個實踐。但是解決方案則是利用了CORBA中間件,在Unix環境下實作并運作。本架構僅僅是借鑒了核心的實作思想和設計理念,進而完成的在.Net平台下的移植。由于Unix與Windows Server的差別,其實際的優勢還有待驗證。

本文轉自斯克迪亞部落格園部落格,原文連結:http://www.cnblogs.com/sgsoft/archive/2007/05/07/738399.html,如需轉載請自行聯系原作者

繼續閱讀