<a></a>
在程式設計與實際應用中,Socket資料包接收伺服器夠得上一個經典問題了:需要計算機與網絡程式設計知識(主要是Socket),與業務處理邏輯密切(如:包組成規則),同時還要兼顧系統運作的穩定、效率、安全與管理等。具體應用時,在滿足業務處理邏輯要求的基礎上,存在側重點:有些需要考慮并發與效率,有些需要強調穩定與可靠等等。雖然.NET 2.0 Framework上的IOCP(I/O完成端口)異步技術可以有效解決并發等問題,但完全的異步模式也缺乏一些控制上的靈活性,例如:Socket暫停操作等。
今年暑假,筆者修改了原Socket接收伺服器代碼,即EMTASS 1.1。最近,又按架構的可擴充性、可重用性等要求重新構思和設計了EMTASS,即EMTASS 2.0。下面的介紹共分六個部分:
<a href="http://blog.csdn.net/hulihui/article/details/3158613#chap1">總體思路與架構</a>
<a href="http://blog.csdn.net/hulihui/article/details/3158613#chap2">關鍵實作技術</a>
<a href="http://blog.csdn.net/hulihui/article/details/3158613#chap3">架構使用簡介</a>
<a href="http://blog.csdn.net/hulihui/article/details/3158613#chap4">一般測試結果</a>
<a href="http://blog.csdn.net/hulihui/article/details/3158613#chap5">總結與展望</a>
<a href="http://blog.csdn.net/hulihui/article/details/3158613#chap6">版本與源碼</a>
總體構思上,主要考慮多線程、異步Socket和可擴充性三個方面。
在Internet環境下的Socket應用中,用戶端和網絡容易出現異常,此時必須釋放異常退出的Socket資源。考慮到伺服器的高并發能力,一般采取包接收和處理分開的政策:将接收到的包添加到包隊列,然後處理隊列中的資料包。當然,偵聽遠端用戶端的連接配接請求可以用Socket的AcceptAsync()異步方法(IOCP,I/O完成端口由此開始)。考慮到暫停、關閉同步操作,仍然用一個線程。這樣,清理資源、處理資料包、偵停客戶連接配接請求就是組成了EMTASS架構的三個核心線程,它們由.NET線程池統一管理:
用戶端連接配接偵聽線程 StartServerListen():循環偵聽遠端用戶端的Socket連接配接請求。如果存在,通過适當規範性判斷後建立該Socket的用戶端會話TSessionBase對象(實際上是該類的派生類對象),同時調用該會話對象的Socket異步資料接收方法BeginReceive(),接收到的資料包存放在會話對象的包隊列中。當然,新增的TSessionBase對象将添加到會話隊列m_sessionTable(一個Dictionary<>泛型對象)中,該隊清單就是清理和處理線程的周遊對象;
資料包處理線程 CheckDatagramQueue():循環檢測TSessonBase隊列中的會話對象,調用該對象的相關方法完成資料包解析、判斷類型、資料存儲等任務;
會話表檢測線程 CheckSessionTable():循環檢查會話表m_sessionTable中的各個會話對象,分步驟清理已經逾時、無效或異常的會話對象,清理會話對象的緩沖區,釋放其Socket資源。
.NET Framework中的Socket具有完整的異步處理能力:偵聽後異步接收(AcceptAsync())、資料異步接收(BeginReceive())、資料異步發送(BeginSend())等。EMTASS架構采取了異步接收和發送方式,并封裝在TSessionBase類中。在EMTASS的版本1.0、1.1中,這些方法在主類TSocketServerBase中實作,顯然不符合類封裝原則。
可擴充性主要考慮不同的業務處理邏輯和應用場景,即:資料包格式、資料存儲方法、資料庫伺服器等。架構EMTASS的可擴充性展現在類的泛型與抽象設計、方法虛拟和保護等方面:
抽象類:會話基類TSessionBase、資料庫基類TDatabaseBase均是抽象類,分别提供了資料包分析與判斷、資料存儲的虛拟方法;
泛型類:主要基類TSocketServerBase有TSessionBase、TDatabaseBase兩個泛型限制參數,可以根據這兩個抽象類的派生類産生具體的伺服器類型;
方法抽象(abstract):TSessionBase的資料包分析方法AnalyzeDatagram()、TDatabaseBase的資料庫打開方法Open()均是抽象的,必須在派生類中根據業務處理邏輯和資料庫類型重寫;
方法保護(protected):與事件處理有關的方法、與業務處理邏輯相關的方法全部是protected方法,可以根據實際情況重寫。

(圖1 主要類層次關系)
按應用類别分,EMTASS主要有四組類:Socket伺服器類、Session會話類、Database資料庫類和枚舉類型。
Socket伺服器類
伺服器泛型類 TSocketServerBase:該類包括了一個伺服器Socket對象、一個TDatabaseBase派生類對象、一個會話TSessionBase類派生對象的清單(Dictionary<>泛型對象),封裝了TDatabaseBase、TSessionBase的全部事件,提供統一的對外公開接口和事件;
泛型參數:伺服器基類TSocketServerBase有兩個泛型參數:TSessionBase類和TDatabaseBase類,定制它們的派生類後可以确定泛型類的具體版本。
用戶端會話類
會話核心成員類 TSessionCoreInfo:是TSessionBase的基類,包括會話的核心字段:登入時間、最近會話時間、IP位址、用戶端名、對象狀态等,是會話清單清單和事件參數的基類之一。該類的成員字段全部是protected的,需要在派生類中賦予具體值;
抽象會話類 TSessionBase:封裝了用戶端Socket、資料接收緩沖區、資料包緩沖區、資料包隊列等資料結構,包括了與用戶端通信相關的全部方法:資料接收與發送、資料包處理等。
資料庫類
抽象資料庫基類 TDatabaseBase:封裝了資料庫打開與關閉、異常事件處理等方法,其中資料連接配接屬性DbConnection是虛屬性、資料庫打開方法Open()是抽象方法,需要在派生類中重寫;
基類TSqlServerBase:派生自TDatabaseBase,應用System.Data.SqlClient名稱空間中SqlServer相關類型重定義了資料庫連接配接屬性DbConnection、重寫了Open()方法;
基類TOleDbDatabaseBase:派生自TDatabaseBase,應用System.Data.OleDb名稱空間中OleDb資料通路相關類型重定義了基類的連接配接屬性DbConnection、重寫了Open()方法
枚舉類型
會話狀态類型 TSessioinState:取4個值:Valid、Invalid、Shutdown和Closed。為Valid時表示會話是有效的,為Invalid時表示會話将被清理,為Shutdown時表示會話Socket正在解除安裝,為Closed時表示會話已經關閉、資源已經清理;
會話斷開類型 TDisconnectType:取3個值:Normal、Timeout和Exception,分别表示正常連接配接、逾時斷開、異常斷開。其中,逾時表示最近兩次會話接收資料的時間超過約定的時限,防止某些會話長時間占有資源。
(圖2 事件參數類層次關系)
EMTASS架構的事件包括三類:第一,普通事件,如:伺服器啟動與停止;第二,異常事件,接收與發送資料異常、資料庫連接配接或資料存儲異常等;第三,與會話相關事件,如:增加會話對象、接收到一個合法資料包等。異常與會話結合即是會話異常事件。通過泛型委托EventHandler可以定義類事件,其中的事件參數類型如下:
異常事件參數類 TExceptionEventArgs:封裝了異常Exception對象的Message值;
會話事件參數類 TSessionEventArgs:封裝了一個TSessionCoreInfo對象;
會話異常參數類 TSessionExceptionEventArgs:派生自TSessionEventArgs,包括異常消息字段Message。
下面介紹主要類TSocketServerBase和輔助類TSessionBase、TDatabaseBase中的主要實作方法。
該類包括了全部的對外接口和事件,主要實作前面介紹過的三個線程。
.NET 提供了線程池方法 ThreadPool.QueueUserWorkItem() 自動将委托對象添加到系統線程池,見如下的實作代碼:
其中, 用戶端連接配接請求偵聽方法StartServerListen()、資料包隊列檢查方法CheckDatagramQueue()和會話表檢查方法CheckSessionTable()均使用循環處理方式,循環條件是m_serverClosed為false。隻有該類的Close()方法可以中斷這三個線程。在Close()方法中設定m_serverClosed為true終止線程的同時,還需要考慮線程退出的同步問題,此時使用手工事件信号對象ManualResetEvent。參考如下資料包隊列檢查線程方法的代碼:
上述代碼是不安全的,一般要需要try{}finally{}保證事件信号對象Reset()與Set()比對。但EMTASS中的三個線程方法均有自己的異常處理方式,不會抛出異常。下面是關閉伺服器方法Close()的主要代碼。在設定變量了m_serverCloed為true後,使用了三個事件信号等待,同步三個線程的正常終止。
建立會話對象後,三種情況需要終止會話:1)關閉伺服器;2)會話異常;3)會話逾時。第1種情況将強制終止會話,第2、3種情況需要清理線程終止會話并釋放其資源。為防止立即關閉Socket引發的異常,系統分3個步驟完成:1)标記該會話為Invalid,此時停止一切與該會話的處理操作;2)調用Shutdown()方法:Shutdown會話Socket,标記會話狀态為Shutdown;3)調用Close()方法:清除會話緩沖區和資料包隊列,釋放Socket資源,從會話表中删除該對象。具體操作可以參考TSocketServerBase類中的CheckSessionTable()方法。
EMTASS架構的所有事件,包括TSessionBase類和TDatabaseBase類的事件,都通過伺服器類TSocketServerBase對外釋出。在建立會話對象或資料庫對象時,直接傳遞其事件給TSocketServerBase的相同委托事件,見如下代碼舉例:
上述代碼中,将TSessionBase派生類對象session的兩個事件直接綁定(使用+=方法)到目前TSocketServerBase對象上。具體實作代碼可以參考TSocketServerBase的初始化方法Initiate()和添加會話對象方法AddSession()。
該抽象類包括用戶端Socket、資料接收緩沖區和資料包隊列等成員,封裝了所有與Socket通信的方法。該類還包括資料包處理方法:資料包解析保護方法ResolveSessionBuffer()和資料包分析虛拟方法AnalyzeDatagram()。
TSessionBase包括兩個資料接收緩沖區和一個資料包隊列:
m_receiveBuffer緩沖區:接收用戶端Socket資料的緩沖區,如果資料包比該緩沖區長,Socket将自動(異步)讀取幾次,每次用方法CopyToDatagramBuffer暫到資料包緩沖區m_datagramBuffer中;
m_datagramBuffer緩沖區:如果m_receiveBuffer接收了非完整的資料包,則使用該緩沖區暫存,直到獲得一個完整資料包。一般情況下,設定m_receiveBuffer大于資料包長度,則一次可以接收一個完整包,此時該緩沖區為空。此外,該緩沖區大小根據資料包的長度動态增長;
m_datagramQueue包隊列:位元組數組的隊列(Queue<>泛型),儲存了目前會話的資料包(位元組數組),等待處理線程分析與處理。在EMTASS 1.0與1.1中,該隊列結構封裝在TSocketServerBase内。這種設計增加了TSessionBase類與TSocketServerBase類的偶合程度。
該方法是protected的,可以根據資料包結構與具體業務邏輯重寫代碼。在TSessionBase類中實作的包組成規則是:開始字元是<結束字元是>。特别指出,Socket通信中有兩個必須考慮的著名問題:
資料包界限問題:資料包字元串(位元組數組)如何界限?在交通部的Socket通信協定中,用<>分别作為一個包的開始與結束字元;
資料包廂斷與重疊問題:由于網絡或裝置故障,一個資料包可能分兩次接收。另一種情況就是連續接收到多個資料包。第一種情況,需要先緩存接收到的包直到一個完整的資料包。第二種情況,則需要根據包界限符号分解一個個的包。
該抽象方法是TSessionBase類必須重寫的方法,也是EMTASS架構擴充的主要接口,應該完成如下基本任務:
判斷資料包的有效性與包類型;
分解包中的各字段資料;
校驗包及其資料有效性;
發送确認消息給用戶端(調用方法 SendDatagram());
存儲包資料到資料庫中;
如果資料包中存在用戶端名稱或編号,則填寫m_name字段。
該抽象類定義了3個資料庫異常處理事件:DatabaseOpenException、DatabaseCloseExeption和DatabaseException,以及4個public方法:Open()、Close()、Clear()和Store()。其中,Open()是抽象方法,在派生類中可以增加自己的代碼(見demo的實作部分),Close()方法關閉資料庫連接配接,Clear()方法在Close()中被調用——關閉資料庫前清理相關資源,虛方法Store()用于資料存儲。EMTASS架構給出了該基類的兩個派生類:TSqlServerBase和TOleDatabaseBase,可以滿足一般的資料庫應用需求。
EMTASS架構的使用包括如下步驟:
定制滿足需求的TSqlServerBase或TOleDatabaseBase派生類
增加成員字段,如:DbCommand、DbDataAdapter等;
重寫TDatabaseBase類的虛拟方法Store(),編寫儲存資料包到資料庫中的實作代碼;
在TSessionBase的AnalyzeDatagram()方法中直接或間接調用Store()方法。
定制滿足業務處理邏輯的TSessionBase派生類
重寫ResolveSessionBuffer()方法,按照資料包規則提取緩沖區中的資料封包,并存儲到包隊列中;
重寫AnalyzeDatagram()方法,按前面的要求增加功能。
定制滿足需求的TSocketServerBase派生類
定義TSocketSErverBase泛型類的派生類(該步可省略);
建立泛型類的派生類對象,在構造函數中給出資料庫連接配接字元串和TCP通信端口;
設定泛型類的派生類對象的最大資料包長度等參數;
實作泛型類的派生類對象的相關事件處理方法。
泛型類TSocketServerBase提供了EMTASS架構的所有對外接口(屬性、方法和事件),包括内聯TSessionBase對象和TDatabaseBase對象的對外屬性和事件。
有兩個重載版本,預設端口3130。考慮到可擴充性,必須給出資料庫連接配接串,見如下代碼:
構造函數中的方法Initiate()完成具體的初始化任務。
<code>ServerPort</code>:伺服器端口号,預設值為3130
<code>Closed</code>:伺服器已經關閉
<code>ListenPaused</code>:伺服器暫時停止用戶端連接配接請求
<code>LoopWaitTime</code>:Socket.Listen方法中的等待時間(ms),預設值為25ms
<code>MaxDatagramSize</code>:允許資料包的最大長度,預設值為1024K
<code>MaxListenQueueLength</code>:最大偵聽隊列長度,預設值為16
<code>MaxReceiveBufferSize</code>:允許資料包接收緩沖區的最大長度,預設值為16K
<code>MaxSameIPCount</code>:允許同位址IP的會話Socket個數,預設值為64
<code>MaxSessionTableLength</code>:允許最大會話表長度,預設值為1024
<code>MaxSessionTimeout</code>:允許最大的會話逾時間隔(s),預設值為120s
<code>ErrorDatagramCount</code>:錯誤資料包個數
<code>ReceivedDatagramCount</code>:接收資料包個數
<code>ServerExceptionCount</code>:伺服器異常次數
<code>SessionCount</code>:目前會話個數
<code>SessionExeptionCount</code>:會話異常個數
<code>SessionCoreInfoList</code>:目前會話表資訊清單
<code>Start()</code>:啟動伺服器
<code>Stop()</code>:關閉伺服器
<code>PauseListen()</code>:暫停偵聽連接配接請求
<code>ResumeListen()</code>:恢複偵聽連接配接請求
<code>Dispose()</code>:關閉伺服器并釋放系統資源
<code>CloseSession()</code>:關閉一個會話
<code>CloseAllSessions()</code>:關閉全部會話
<code>SendToSession()</code>:給一個會話發送消息
<code>SendToAllSessions()</code>:給所有會話發送消息
<code>DatabaseCloseException</code>:資料庫關閉異常
<code>DatabaseException</code>:資料庫異常
<code>DatabaseOpenException</code>:資料庫打開異常
<code>DatagramAccepted</code>:接受了一個完整資料包
<code>DatagramDelimiterError</code>:資料包界限符錯誤
<code>DatagramError</code>:資料包錯誤
<code>DatagramHandled</code>:處理了一個資料包
<code>DatagramOversizeError</code>:資料包超長錯誤
<code>ServerStarted</code>:伺服器啟動後
<code>ServerClosed</code>:伺服器關閉後
<code>ServerListenPaused</code>:伺服器暫停連接配接請求後
<code>ServerListenResumed</code>:伺服器恢複連接配接請求後
<code>ServerException</code>:伺服器異常
<code>SessionRejected</code>:連接配接請求被拒絕
<code>SessionConnected</code>:建立一個會話連接配接
<code>SessionDisConnected</code>:斷開一個會話連接配接
<code>SessionReceiveException</code>:會話接收資料異常
<code>SessionSendException</code>:會話發送資料異常
<code>SessionTimeout</code>:會話逾時
下載下傳包中包括EMTASS框源代碼和Demo。其中,VS2005的Demo解決方案檔案為EMTASS.sln,包含兩個項目:伺服器項目和用戶端項目。/bin/檔案夾下的編譯檔案可直接運作:先啟動伺服器,然後運作用戶端。
伺服器端包括兩個部分:第一,接收伺服器窗體程式;第二,Access資料庫。伺服器端窗體程式包含如下實作:
TSessionBase派生類TTestSession:重寫了OnDatagramDelimiterError()和OnDatagramOversizeError()事件處理方法,重寫了資料包分析方法AnalyzeDatagram(),增加了一個自定義方法Store();
TDatabaseBase派生類TAccessDatabase:增加了一個OleDbCommmand字段m_command,重寫了Open()方法和Store()方法。在重寫的Open()方法中,建立了m_command對象及其參數對象,給出了資料庫的Insert語句SQL代碼。重寫的Store()方法中,将TSessionBase的IP、SessionName和資料包長度儲存到Access資料庫中。方法Store()将被TTestSession()的AnalyzeDatagram()方法調用;
TSocketServerBase<>對象:給出資料庫連接配接字元串後,應用前面定義的兩個派生類建立泛型類對象,然後注冊該對象的事件實作方法。注冊的事件方法功能包括:顯示伺服器的如果計數情況,顯示伺服器運作狀态。
伺服器端的主要代碼如下:
下面是伺服器端Demo運作圖檔
建立一個TcpClient對象,模拟遠端用戶端與伺服器通信。下面是用戶端Demo運作圖檔:
機器配置:雙核CPU E2140,主頻1.6G,RAM是1G(含顯示卡記憶體)。
正确性測試
測試方法:用戶端的資料包含有長度串,在伺服器端檢測比較,以此判斷資料包傳輸的正确性;
測試情況:單機啟動伺服器,運作7個用戶端程式,其中3個是幹擾源——連續做連接配接/斷開操作,另4個連續發送資料包。運作約80分鐘檢查,伺服器平均每秒接收15個包,沒有發生異常,也沒有錯誤包,資料包隊列穩定在3以下。
速度測試
測試方法:單機運作伺服器,然後運作20個用戶端程式,用10-50ms的速度發送資料包;
測試情況:運作30分鐘檢查,伺服器平均每秒接收60個包,沒有出現資料包隊列顯著增長等情況。
穩定性測試:用戶端Demo中包含一個連接配接後馬上斷開的連續操作,在這種情況下伺服器端沒有發現異常。此外,筆者在30個用戶端會話情況下運作伺服器1個小時,沒有發生伺服器端異常,但有個别用戶端異常退出。
并發性測試:同時運作了30個用戶端Demo,沒有發生伺服器異常等現象,資料包隊列上限也不超過5。
測試結論: 在一台機器上做測試, EMTASS 2.0接收與處理正确,每秒可以處理60以上的資料包,運作穩定可靠,有較好的并發處理能力。測試中發現的主要問題如下:
CPU占用率很大:顯然是三個線程的循環操作所引起的,在EMTASS1.0、1.1中,使用Thread.Sleep(m_waitTime)等待一段時間。本架構的設計目的是專用Socket伺服器,為提高吞吐能力省略了這個操作。如果必要,在以後版本中添加該功能;
伺服器啟動後拒絕連接配接請求:特别是關閉後再啟動容易發生這種現象,多關閉啟動幾次後卻又恢複正常。筆者估計是伺服器m_serverSocket對象釋放資源不同步的原因;
用戶端連接配接請求被拒絕:有時用戶端發生錯誤後,再連接配接時被拒絕。有兩種可能,第一種是如前所講的伺服器對象的問題;第二種是用戶端Socket接收、發送和斷開時被阻塞,具體原因待分析。出現這種情況後,做多次連接配接/斷開操作還是可以連接配接伺服器。
顯然,這種測試環境和結果有待進一步驗證,但存在較大的改進空間。特别,資料包隊列最大值一般不超過5,表明伺服器接收到包後立即處理,并發性能比較好。
本文介紹的EMTASS 2.0 是筆者一段工作的總結,也是學習基于.NET的類設計、元件設計、模式設計等的一個小結。筆者的目标就是不斷修改和完善,設計與實作一個可靠與穩定的、有良好可擴充性的和易于使用的Socket資料包接收伺服器架構。由于最初的代碼和思路均來自他人的開源架構和設計構思,EMTASS也仿效一般開源做法:公布源碼和設計思路。
EMTASS 1.0, 2005年12月
EMTASS 1.1, 2008年09月
EMTASS 2.0, 2008年10月27日
重設計了類名及類關系,增加了架構的可擴充性,重構了大部分代碼
TSessionBase的資料包隊列取代TSocketServerBase的資料包隊列
TSessionBase中封裝了全部通信方法
EMTASS 2.1, 2008年11月9日
TSocketServerBase增加了如下屬性:
ReceiveBufferSize:接收緩沖區大小(預設16K)
SendBufferSize:發送緩沖區大小(預設16K)
CheckDatagramQueueTimeInterval:資料包處理線程Sleep時間間隔(預設:100ms)
CheckSessionTableTimeInterval:會話資源清理線程Sleep時間間隔(預設:100ms)
TSocketServerBase增加了若幹構造函數,包括最大任務數和緩沖區大小
TSocketServerBase使用了Mutex互斥類,防止同機器建立兩個伺服器
TSessionBase在異步接收/發送完成後,調用IAsyncResult.AsyncWaitHandle.Close()方法
TSessionBase接收資料時,使用BufferManger公共緩沖區ReceivevBuffer
TSessionBase發送資料時,如果據長度小于發送緩沖區,則使用SendBuffer,否則申請位元組數組發送
測試結果:10個幹擾用戶端(100ms連續連開)與15個資料用戶端(3個100K/100ms、3個100K/50ms、4個100K/20ms、2個1M/1s、2個1M/500ms、1個1M/10s),CPU占用率為70-90%,速度為40/s,無錯誤包,直接顯示的連接配接數為30-50之間
QQ:519841366
本頁版權歸作者和部落格園所有,歡迎轉載,但未經作者同意必須保留此段聲明,
且在文章頁面明顯位置給出原文連結,否則保留追究法律責任的權利