天天看點

分享一個分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構,附代碼下載下傳

一、分布式消息總線

     在很多MIS項目之中都有這樣的需求,需要一個及時、高效的的通知機制,即比如當使用者A完成了任務X,就需要立即告知使用者B任務X已經完成,在通常的情況下,開發人中都是在使用者B所使用的程式之中寫資料庫輪循代碼,這樣就會産品一個很嚴重的兩個問題,第一個問題是延遲,輪循機制要定時執行,必須會引起延遲,第二個問題是資料庫壓力過大,當進行高頻度的輪循會生産大量的資料庫查詢,并且如果有大量的使用者進行輪循,那資料庫的壓力就更大了。

     那麼在這個時間,就需要一套能支援釋出-訂閱模式的分布式消息總線,那這個問題就可以很好的解決了,比如采用一些成熟的消息總線進行實作,比如MSMQ或者采用比如開源的NServiceBus的釋出訂閱機制就可以實作處理這種需求,其系統結構就會變成如下所示:

分享一個分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構,附代碼下載下傳

    本分布式消息總線,目前廣泛的被應用于分布式緩存的更新通知,當在N百台客戶短在使用緩存的過程之中,某個操作修改了緩存的資料,必須會導緻其他終端緩存的失效,那麼使用基于Socket的分布式消息總線之後,我們可以做了修改了即可實時通知,做到緩存資料保持最新,再比如醫療應用之中的危急值管理,當發現檢驗、檢查危急值之後,需要及時通知病區啟動聲光報警系統等,提醒醫護從業人員及相關上司做出相應的措施,再比如應用于異構系統整合,當檢驗系統做出檢驗報告,通過消息總線進行釋出,HIS系統則即時會收到檢驗報告資料而實作系統的整合。

二、基于Socket的實作

     目前能夠實作釋出訂閱模式的開源産品非常之多,為什麼還要制造輪子呢,其主要原因有以下幾點

     1)像NServiceBus這種東西基于MSMQ,在大量的釋出者-訂閱者的情況下性能不佳。

     2)此類東西太過于龐大、不易使用和配置。

     3)學習成本過高。

     那為什麼要使用Socket技術進行實作呢,其主要原因是有以下幾點:

     1)使用高效的Socket通信技術,高效、支援更多的用戶端。

     2)使用簡單,不需要定義太多額外的東西,隻需要定義主題和消息即可使用。

     目前本釋出訂閱架構是基于AgileEAS.NET SOA中間件平台Socket架構實作的,有關于些Socket架構的技術細節請參考AgileEAS.NET SOA 中間件平台.Net Socket通信架構-介紹、AgileEAS.NET SOA 中間件平台.Net Socket通信架構-簡單例子-實作簡單的服務端用戶端消息應答、AgileEAS.NET SOA 中間件平台.Net Socket通信架構-完整應用例子-線上聊天室系統-下載下傳配置、AgileEAS.NET SOA 中間件平台.Net Socket通信架構-完整應用例子-線上聊天室系統-代碼解析文章進行了解和學習。

     目前本釋出訂閱架構并直接內建于AgileEAS.NET SOA Socket通信架構之中并且随其一并釋出,下面簡單介紹一下其API:

在本架構之中定義了一個消息總線接口IMessageBus:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5: using System.Collections;      
6:        
7: namespace EAS.Messages      
8: {      
9:     /// <summary>      
10:     /// 消息總線接口定義。      
11:     /// </summary>      
12:     public interface IMessageBus : IDisposable      
13:     {      
14:         /// <summary>      
15:         /// 注冊釋出者。      
16:         /// </summary>      
17:         /// <param name="publisher">釋出者。</param>      
18:         void AddPublisher(string publisher);      
19:        
20:         /// <summary>      
21:         /// 注冊釋出者。      
22:         /// </summary>      
23:         /// <param name="publisher">釋出者。</param>      
24:         /// <param name="topic">主題。</param>      
25:         void AddPublisher(string publisher, string topic);      
26:        
27:         /// <summary>      
28:         /// 釋出一條消息到總線。      
29:         /// </summary>      
30:         /// <param name="topic">主題。</param>      
31:         /// <param name="message">釋出的消息。</param>      
32:         void Publish(string topic, object message);      
33:        
34:         /// <summary>      
35:         /// 訂閱消息。      
36:         /// </summary>      
37:         /// <param name="subscriber">訂閱者。</param>      
38:         /// <param name="topic">主題。</param>      
39:         /// <param name="notifyHandler">訂閱通知。</param>      
40:         void Subscribe(object subscriber, string topic, MessageNotifyHandler notifyHandler);      
41:        
42:         /// <summary>      
43:         /// 訂閱消息。      
44:         /// </summary>      
45:         /// <param name="subscriber">訂閱者。</param>      
46:         /// <param name="friendName">訂閱者名稱,用于處理離線訂閱。</param>      
47:         /// <param name="topic">主題。</param>      
48:         /// <param name="notifyHandler">訂閱通知。</param>      
49:         void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);      
50:        
51:         /// <summary>      
52:         /// 訂閱消息。      
53:         /// </summary>      
54:         /// <param name="subscriber">訂閱者。</param>      
55:         /// <param name="friendName">訂閱者名稱,用于處理離線訂閱。</param>      
56:         /// <param name="topic">主題。</param>      
57:         /// <param name="notifyHandler">訂閱通知。</param>      
58:         /// <param name="changedHandler">釋出者狀态變化委托。</param>      
59:         void Subscribe(object subscriber, string friendName, string topic, MessageNotifyHandler notifyHandler,PublisherSstatusChangedHandler changedHandler);      
60:        
61:         /// <summary>      
62:         /// 退訂消息。      
63:         /// </summary>      
64:         /// <param name="subscriber">訂閱者。</param>      
65:         void Unsubscribe(object subscriber);      
66:        
67:         /// <summary>      
68:         /// 退訂消息。      
69:         /// </summary>      
70:         /// <param name="subscriber">訂閱者。</param>      
71:         /// <param name="topic">主題。</param>      
72:         void Unsubscribe(object subscriber, string topic);      
73:        
74:         /// <summary>      
75:         /// 退訂消息。      
76:         /// </summary>      
77:         /// <param name="subscriber">訂閱者。</param>      
78:         /// <param name="friendName">訂閱者名稱,用于處理離線訂閱。</param>      
79:         /// <param name="topic">主題。</param>      
80:         void Unsubscribe(object subscriber, string friendName, string topic);      
81:     }      
82: }      

    IMessageBus就基于Socket釋出訂閱消息總線的靈魂接口,也是基唯一的釋出者調用者功能入口,也就是說不管你是釋出者還是訂閱者都需要調用這個接口,如果你是釋出者請調用IMessageBus接口的Publish方法向消息總線釋出消息,如果是你訂閱者請通過IMessageBus的訂閱方法進行訂閱,當你訂閱了某個主題之後,有釋出者釋出該主題的消息,你即可以收到消息并調用訂閱回調函數進行處理。

三、實作一個簡單的例子

     現在我們開始一個簡單的應用消息總線的例子,本例子代碼解決方案由下圖4個項目組成:

分享一個分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構,附代碼下載下傳

     其中:Demo.Messages項目定義釋出者、訂閱者所使用的消息對象和消息主題。

                Demo.Publisher項目為釋出者代碼。

                Demo.Subscriber項目為訂閱者代碼。

                Demo.Server項目為服務端代碼。

     在Demo.Messages項目之中,我們定義了一個消息Message:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5: using System.Xml.Serialization;      
6:        
7: namespace Demo.Messages      
8: {      
9:     [Serializable]      
10:     public class Message      
11:     {      
12:         [XmlElement]      
13:         public Guid ID      
14:         {      
15:             get;      
16:             set;      
17:         }      
18:     }      
19: }      

     消息Message很簡單,隻有一個屬性ID,同時 還需要定義一個消息主題:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5:        
6: namespace Demo.Messages      
7: {      
8:     public class Topics      
9:     {      
10:         public static readonly string DEMO_TOPIC = "示範消息";      
11:     }      
12: }      

     我們定義了一個消息主題為“示範消息”。

     在Demo.Publisher項目之中,沒有太多額外的代碼,隻有在Program.cs寫了以下簡單的調用代碼:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5: using EAS.Messages;      
6:        
7: namespace Demo.Publisher      
8: {      
9:     class Program      
10:     {      
11:         static void Main(string[] args)      
12:         {      
13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();      
14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;      
15:             System.Console.WriteLine("Publisher");      
16:        
17:             while (System.Console.ReadLine()!="exit")      
18:             {      
19:                 var m = new Messages.Message { ID = Guid.NewGuid() };      
20:                 bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);      
21:                 System.Console.WriteLine(string.Format("Publish:{0}", m.ID));      
22:             }      
23:         }      
24:     }      
25: }      

     從IOC容器擷取一個消息總線IMessageBus對象,并調用Publish函數釋出消息”bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);“。

     當然了,使用了IOC容器,就離來開配置檔案了,其App.config檔案内容如下:

1: <?xml version="1.0" encoding="utf-8"?>      
2: <configuration>      
3:   <configSections>      
4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />      
5:   </configSections>      
6:   <eas>      
7:     <objects>      
8:       <!--消息總線-->      
9:       <object name="MessageBus" assembly="EAS.MicroKernel" type="EAS.Sockets.Bus.SocketBus" LifestyleType="Singleton">      
10:         <property name="Url" type="string" value="socket.tcp://127.0.0.1:6606/"/>      
11:       </object>      
12:     </objects>      
13:   </eas>      
14: </configuration>      

     在Demo.Subscriber項目之中,使用與Demo.Publisher一模一樣的配置檔案,其Program.cs代碼如下:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5: using EAS.Messages;      
6:        
7: namespace Demo.Subscriber      
8: {      
9:     class Program      
10:     {      
11:         static void Main(string[] args)      
12:         {      
13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();      
14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;      
15:             System.Console.WriteLine("Subscriber");      
16:        
17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);      
18:             System.Console.ReadLine();      
19:         }      
20:        
21:         static void MessageNotify(object m)      
22:         {      
23:             Demo.Messages.Message message = m as Demo.Messages.Message;      
24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));      
25:         }      
26:     }      
27: }      

     其中代碼bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);:完成把消息訂閱到MessageNotify通知函數之中。

     在Demo.Server項目之中,啟動伺服器并且開始接收訂閱和釋出:

1: using System;      
2: using System.Collections.Generic;      
3: using System.Linq;      
4: using System.Text;      
5: using EAS.Sockets;      
6:        
7: namespace Demo.Server      
8: {      
9:     class Program      
10:     {      
11:         static void Main(string[] args)      
12:         {      
13:             SocketServer socketServer = new SocketServer(128);      
14:             socketServer.Port = 6606;      
15:             socketServer.Logger = new EAS.Loggers.ConsoleLogger();       
16:             socketServer.Initialize();      
17:             System.Console.WriteLine("Server Starting...");      
18:             socketServer.StartServer();      
19:             System.Console.WriteLine("Server Startup!");      
20:             System.Console.ReadLine();      
21:         }      
22:     }      
23: }      

     到此為止,所有代碼均已完成,是不是很簡單,接下來,我們跑起來驗證一下效果。

四、驗證效果

     我們在編譯輸入目錄Publish下先啟動Demo.Server.exe,再啟動兩個Demo.Subscriber.exe,再啟動一個Demo.Publisher.exe,在Demo.Publisher.exe控制台按Enter鍵:

分享一個分布式消息總線,基于.NET Socket Tcp的釋出-訂閱架構,附代碼下載下傳

OK,搞定。

五、源代碼下載下傳

     本程式的源代碼已上傳到伺服器,請通過http://112.74.65.50/downloads/eas/Demo.Pub_Sub.rar進行下載下傳,如果在開發過程之中想要了解更多有關Socket通信架構以及更多AgileEAS.NET SOA中間件平台的技術資源,請通過AgileEAS.NET SOA 網站:http://www.smarteas.net的最新下載下傳欄目進行下載下傳。    

六、問題回報

     麻煩大家在通過視訊進行學習的時候能及時把問題回報給樓主,或者有什麼需要改進的一些建議都請向樓主直接回報,以下是聯系方式:

團隊網站:http://www.agilelab.cn

AgileEAS.NET網站:http://www.agileeas.net

官方部落格:http://eastjade.cnblogs.com

github:https://github.com/agilelab/eas

QQ群:113723486(AgileEAS SOA 平台)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平台交流)/上限1000人

郵件:[email protected],[email protected],

電話:18629261335。

作者:魏瓊東

出處:http://www.cnblogs.com/eastjade

關于作者:有13年的軟體從業經曆,專注于中小軟體企業軟體開發過程研究,通過在技術與管理幫助中小軟體企業實作技術層面開源節流的目的。熟悉需求分析、企業架構、項目管理。現主要從事基于AgileEAS.NET平台的技術咨詢工作,主要服務于醫療衛生、鐵路、電信、物流、物聯網、制造、零售等行業。如有問題或建議,請多多賜教!

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,如有問題,可以通過[email protected] 聯系我,也可以加入QQ群:113723486、199463175、116773358、116773358、212867943、147168308、59827496、193486983、15118502和大家共同讨論,非常感謝。

繼續閱讀