天天看點

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

一、分布式消息總線以及基于Socket的實作

     在前面的分享一個分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構,附代碼下載下傳一文之中給大家分享和介紹了一個極其簡單也非常容易上的基于.NET Socket Tcp 技術實作的分布消息總線,也是一個簡單的釋出訂閱架構:

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

    并且以案例的形式為大家示範了如何使用這個分布式消息總線架構釋出訂閱架構模式的應用程式,在得到各位同仁的回報的同時,大家也非常想了解訂閱者離線的情況,即支援離線構釋出訂閱架構。

二、離線架構

     不同于訂閱者、釋出者都同時線上的情況,支援訂閱者離線,架構将有所變化,如下圖所示:

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

     也會比原先的結構将更加複雜,其中需要處理以下兩個關鍵點:

     1)訂閱者的持久化存儲。

     2)訂閱者離線之後其所訂閱消息的持久存儲。

三、解決方案

     為解決消息總線的離線支援機制,我們在Socket 架構之中增加了一個接口ISubscribeStorager:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5:        
6: namespace EAS.Messages      
7: {         
8:     /// <summary>      
9:     /// 消息訂閱存儲接口。      
10:     /// </summary>      
11:     public interface ISubscribeStorager      
12:     {      
13:         /// <summary>      
14:         /// 持久化訂閱。      
15:         /// </summary>      
16:         /// <param name="subscriber">訂閱者。</param>      
17:         /// <param name="topic">消息主題。</param>      
18:         void Subscribe(string subscriber, string topic);      
19:        
20:         /// <summary>      
21:         /// 持久化退訂。      
22:         /// </summary>      
23:         /// <param name="subscriber">訂閱者。</param>      
24:         /// <param name="topic">消息主題。</param>      
25:         void Unsubscribe(string subscriber, string topic);      
26:        
27:         /// <summary>      
28:         /// 裝載訂閱資訊。      
29:         /// </summary>      
30:         /// <returns>系統之中的訂閱清單。</returns>      
31:         List<SubscribeItem> LoadSubscribes();      
32:        
33:         /// <summary>      
34:         /// 寫入消息。      
35:         /// </summary>      
36:         /// <param name="subscriber">訂閱者。</param>      
37:         /// <param name="message">消息對象。</param>      
38:         void Write(string subscriber, QueueMessage message);      
39:        
40:         /// <summary>      
41:         /// 讀消息。      
42:         /// </summary>      
43:         /// <param name="subscriber">訂閱者。</param>      
44:         /// <param name="message">消息對象。</param>      
45:         /// <returns>成功讀取傳回true,否則傳回false。</returns>      
46:         bool Read(string subscriber, out QueueMessage message);      
47:     }      
48: }      

     ISubscribeStorager共提供持久化訂閱持久化消息存儲共五個函數,其中:

     LoadSubscribes:服務端初始化時讀取所有的離線訂閱關系,即那個訂閱都訂閱那那個主題。

     Subscribe:持久化訂閱者,當訂閱才上線訂閱消息時,持久化訂閱關系,供離線檢測之用。

     Unsubscribe:持久化取消訂閱,當訂閱者退訂消息時,從持久化訂閱關系之中删除。

     Write:當訂閱者離線時,把訂閱消息寫入持久化存儲。

     Read:當離線訂閱者上線時,從持久存儲之中讀取一條消息向其發送。

     ISubscribeStorager:可以選擇自己實作這個接口,以建立滿足自己規則的離線存儲機制,當然在AgileEAS.NET SOA 中間件之中提供了兩種離線存儲機制,存儲于資料庫和存儲于MSMQ,下面向大家介紹一下這兩種内置實作。

四、兩種内置離線存儲機制

     在AgileEAS.NET SOA 中間件平台之中提供了兩個ISubscribeStorager的實作,基于資料庫的離線訂閱存儲實作EAS.Messages.DbSubscribeStorager和基于MSMQ的離線訂閱存儲實作EAS.Messages.MsmqSubscribeStorager。

     EAS.Messages.DbSubscribeStorager:存儲訂閱關系在messageSubscribe.Config檔案之中,消息存儲在關系資料庫SOA_SUBSCRIBEEVENTS表之中,使用前必須要建立相應的表結構,以下是SQL Server的DDL腳本:

1: CREATE TABLE [SOA_SUBSCRIBEEVENT](      
2:     [GUID] [varchar](36) NOT NULL,      
3:     [SUBSCRIBER] [nvarchar](128) NOT NULL,      
4:     [TOPIC] [nvarchar](128) NOT NULL,      
5:     [BODY] [image] NULL,      
6:     [FCTIME] [datetime] NOT NULL,      
7:  CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED       
8: (      
9:     [GUID] ASC      
10: )      
11: )       

      目前理論上支援SQLServer 、Mysql、ORACLE、Sqlite四種資料庫結構,具體建表腳本請自行參考相應資料書寫,也可以使用AgileEAS.NET SOA中間件所提供的資料庫初始化工具建立。

      EAS.Messages.MsmqSubscribeStorager:存儲訂閱關系在messageSubscribe.Config檔案之中,消息存儲Msmq消息隊列之中,使用之前請確定機器上安裝了MSMQ消息對列。

五、關于自定義實作ISubscribeStorager

     有興趣的朋友可以自定義實作接口ISubscribeStorager,這樣就可以按自己的規則進行存儲,比如把離線消息存儲到mongodb、Redis、或者直接存儲在檔案之中,或者其他更多的實作規則,在此就不一一介紹,如有相關興趣,請聯系作者,如确有必要需要給在家介紹一下如何實作,将會另開一文本介紹如何自定義實作ISubscribeStorager接口。

六、改進線上例子支援離線

     還是跟上次一樣,以案例為在家展示一下怎麼進行離線消息,就不重新開始例子,對原有例子做一些改進,改進後例子如下:

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

     其中在原有項目的基礎上增加了:Demo.Subscriber1和Demo.Subscriber2項目,其項目配置代碼、配置檔案基本上同Demo.Subscriber一樣,其中唯一的差别在于,Demo.Subscriber1和Demo.Subscriber2向伺服器送出訂閱的時候都增加一個另friendName參數,其使用IMessageBus接口的以下訂閱函數:

1: /// <summary>      
2: /// 訂閱消息。      
3: /// </summary>      
4: /// <param name="subscriber">訂閱者。</param>      
5: /// <param name="friendName">訂閱者名稱,用于處理離線訂閱。</param>      
6: /// <param name="topic">主題。</param>      
7: /// <param name="notifyHandler">訂閱通知。</param>      
8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);      

                Demo.Publisher項目為釋出者代碼。

                Demo.Subscriber項目為訂閱者代碼。

                Demo.Server項目為服務端代碼。

     Demo.Subscriber1項目之中,其Program.cs代碼如下:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Windows.Forms;      
5: using EAS.Messages;      
6:        
7: namespace Demo.Subscriber1      
8: {      
9:     class Program      
10:     {      
11:         static void Main(string[] args)      
12:         {      
13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();      
14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;      
15:             System.Console.WriteLine("Subscriber1");      
16:        
17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);      
18:             System.Console.ReadLine();      
19:         }      
20:        
21:         static void MessageNotify(object m)      
22:         {      
23:             Demo.Messages.Message message = m as Demo.Messages.Message;      
24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));      
25:         }      
26:     }      
27: }      

     其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在訂閱消息的時候給了一個friendName為Subscriber1,Demo.Subscriber2與Demo.Subscriber1項目的唯一的差别就是此處為Subscriber2.

     我們使用内置的EAS.Messages.DbSubscribeStorager,則不需要修改服務端的代碼,隻需要修改服務端的配置檔案如下:

1: <?xml version="1.0" encoding="utf-8"?>      
2: <configuration>      
3:   <configSections>      
4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />      
5:   </configSections>      
6:   <startup useLegacyV2RuntimeActivationPolicy="true">      
7:     <supportedRuntime version="v4.0"/>      
8:   </startup>      
9:   <eas>      
10:     <objects>      
11:       <!--資料庫連接配接-->      
12:       <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread">      
13:         <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" />      
14:       </object>      
15:       <!--資料通路器-->      
16:       <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread">      
17:         <property name="DbProvider" type="object" value="DbProvider"/>      
18:         <property name="Language" type="object" value="TSqlLanguage"/>      
19:       </object>      
20:       <!--ORM通路器-->      
21:       <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread">      
22:         <property name="DataAccessor" type="object" value="DataAccessor"/>      
23:       </object>      
24:       <!--查詢語言-->      
25:       <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/>      
26:       <!--消息持久化-->      
27:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/>      
28:       <!--日志管理-->      
29:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">      
30:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />      
31:       </object>      
32:     </objects>      
33:   </eas>      
34: </configuration>      

     在配置檔案的IOC配置之中我們配置了消息存儲對象以及其所依賴的資料庫通路對象、Linq查詢語言表達式,另外需要說明的是,我們需要把配置檔案之中所涉及的EAS.Data.dll、EAS.SOA.BootStrap.dll複制到編譯輸出Publish,這兩個檔案可以從AgileEAS.NET SOA 中間件平台釋出包之中尋找,本案例的下載下傳壓碎包之中會包括這兩個檔案。

     有關于基于Msmq的配置,隻需要修改配置檔案如下:

1: <?xml version="1.0" encoding="utf-8"?>      
2: <configuration>      
3:   <configSections>      
4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />      
5:   </configSections>      
6:   <startup useLegacyV2RuntimeActivationPolicy="true">      
7:     <supportedRuntime version="v4.0"/>      
8:   </startup>      
9:   <eas>      
10:     <objects>      
11:       <!--消息持久化-->      
12:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/>      
13:       <!--日志管理-->      
14:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">      
15:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />      
16:       </object>      
17:     </objects>      
18:   </eas>      
19: </configuration>      

     到此為止,所有代碼均已完成,是不是很簡單,接下來,我們跑起來驗證一下效果。

七、驗證效果

     我們在編譯輸入目錄Publish下先啟動Demo.Server.exe,再各啟動Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再啟動一個Demo.Publisher.exe,在Demo.Publisher.exe控制台按Enter鍵:

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

目前程式三個訂閱者都是線上的,Demo.Publisher釋出了三條消息,三個訂閱者都收到了三條消息,那麼我們關閉Demo.Subscriber2之後再由Demo.Publisher釋出兩條消息:

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

然後我們再啟動Demo.Subscriber2,看是否還能收到其離線之後由Demo.Publisher釋出的兩條消息:

分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構之離線支援,附代碼下載下傳

OK,到此為止。

八、源代碼下載下傳

     本程式的源代碼已上傳到伺服器,請通過http://112.74.65.50/downloads/eas/Demo.Pub_Sub_Offline.rar進行下載下傳,如果在開發過程之中想要了解更多有關Socket通信架構以及更多AgileEAS.NET SOA中間件平台的技術資源,請通過AgileEAS.NET SOA 網站:http://www.smarteas.net的最新下載下傳欄目進行下載下傳。   

九、問題回報

     麻煩大家在通過視訊進行學習的時候能及時把問題回報給樓主,或者有什麼需要改進的一些建議都請向樓主直接回報,以下是聯系方式:

團隊網站:http://www.agilelab.cn

AgileEAS.NET網站:http://www.agileeas.net

官方部落格:http://eastjade.cnblogs.com

github:https://github.com/agilelab/eas

樓主QQ:47920381

QQ群:113723486(AgileEAS SOA 平台)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平台交流)/上限1000人

郵件:[email protected],[email protected],

電話:18629261335。

作者:魏瓊東

出處:http://www.cnblogs.com/eastjade

關于作者:有13年的軟體從業經曆,專注于中小軟體企業軟體開發過程研究,通過在技術與管理幫助中小軟體企業實作技術層面開源節流的目的。熟悉需求分析、企業架構、項目管理。現主要從事基于AgileEAS.NET平台的技術咨詢工作,主要服務于醫療衛生、鐵路、電信、物流、物聯網、制造、零售等行業。如有問題或建議,請多多賜教!

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,如有問題,可以通過[email protected] 聯系我,也可以加入QQ群:113723486、199463175、116773358、116773358、212867943、147168308、59827496、193486983、15118502和大家共同讨論,非常感謝。

繼續閱讀