天天看點

ESFramework介紹之(23)―― AgileTcp

    Tcp元件主要控制着系統與終端使用者的所有消息的進出,ITcp接口描述了這個元件的外貌,告訴外部如何使用Tcp元件、如何與Tcp元件互動。而從實作的角度來看,我們必須理清Tcp元件的職責:

(1) 管理所有已經建立的Tcp連接配接

(2) 管理與每個連接配接相對應接收緩沖區

(3) 管理所有的工作者線程

(4) 處理長度大于接收緩沖區的消息

    我們來看看如何滿足這些職責。

    由于每個連接配接都對應着一個接收緩沖區,是以可以将它們封裝在一起形成ContextKey(連接配接上下文):

ESFramework介紹之(23)―― AgileTcp
ESFramework介紹之(23)―― AgileTcp

ContextKey

    public class ContextKey

    {        

        private byte[]  buffer ;          //封裝接收緩沖區

        private ISafeNetworkStream netStream = null ;            

        private volatile bool      isDataManaging = false ;

        public ContextKey(ISafeNetworkStream net_Stream ,int buffSize)

        {

            this.netStream = net_Stream ;            

            this.buffer    = new byte[buffSize] ;            

        }

        #region NetStream  

        public ISafeNetworkStream NetStream

            get

            {

                return this.netStream ;

            }

        }            

        public byte[] Buffer

                return this.buffer ;

            }            

        }        

        public bool IsDataManaging

                return this.isDataManaging ;

            set

                this.isDataManaging = value ;

        private bool firstMessageExist = false ;

        public  bool FirstMessageExist 

                return this.firstMessageExist ;

                this.firstMessageExist = value ;

        #endregion            

    }    

    IsDataManaging屬性表明工作線程是否正在處理本連接配接對應的緩沖區中的資料,FirstMessageExist屬性用于标志接收到的第一條消息,因為系統可能需要對接收到的第一條消息做一些特殊的處理。

    任何時刻,可能都有成千上萬個連接配接存活着;任何時刻,都可能有新的連接配接建立、或已有的連接配接被釋放。所有這些ContextKey對象需要被管理起來,這就是上下文管理器IContextKeyManager:

    public interface IContextKeyManager

    {

        void InsertContextKey(ContextKey context_key) ;

        void DisposeAllContextKey() ;        

        void RemoveContextKey(int streamHashCode) ;

        ISafeNetworkStream GetNetStream(int streamHashCode) ;

        int            ConnectionCount {get ;}

        ICollection ContextKeyList{get ;}

        event CbSimpleInt StreamCountChanged ;        

    說到上下文管理器,先要講講如何标志每個不同的上下文了?使用連接配接字,連接配接字是Tcp連接配接的Hashcode,它們與連接配接一一對應,并且不會重複。是以在源碼中你經常會看到名為“streamHashCode”的參數和變量。由于Tcp連接配接與streamHashCode一一對應,是以GetNetStream方法的實作就非常簡單。不知道你是否記得,RoundedMessage類中有個ConnectID字段,它就是連接配接字,與streamHashCode意義一樣。根據此字段,你可以清楚的知道這個RoundedMessage來自于哪個連接配接。

    關于工作者線程,很幸運的是,我們可以直接使用.NET提供的背景線程池,而不必要再去手動管理,這可以省卻一些麻煩。當然你也可以使用ThreadPool類,甚至你可以從頭開始實作自己的線程池元件,這也是不困難的。

    我經常被問到,接收緩沖區應該開辟多大?這取決于你的應用,但是有一點是錯不了的――緩沖區的大小至少要大于消息頭Header的大小,否則麻煩就多了。根據我的經驗,一般緩沖區的大小至少應該能容納所有接收消息中的60%-80%。對于大于緩沖區大小的消息,ESFramework采用的政策是使用緩沖區池IBufferPool。

    public interface IBufferPool

        byte[] RentBuffer(int minSize) ;

        void   GivebackBuffer(byte[] buffer) ;

    }

    通過上面的介紹我們已經知道如何滿足Tcp元件的職責,現在我們來看看更細的實作政策:

(1) 使用Checker線程。

    使用Checker線程是AgileTcp元件的差別于模拟完成端口的Tcp元件實作和異步Tcp元件的主要特色。當AgileTcp啟動時,Checker線程也随之啟動,這個線程的主要工作就是檢查已經存在的每個連接配接上是否有資料要接收(還記得Select網絡模型),這可以通過NetworkStream.DataAvailable屬性知道。如果發現某個連接配接上有待接收的資料,就将其放到工作者線程中去處理,并設定前面提到的ContextKey.IsDataManaging屬性,然後再判斷下個連接配接,如此循環下去。

        private void TaskChecker()

            while(! this.stop)

                foreach(ContextKey key in this.contextKeyManager.ContextKeyList)

                {

                    if(this.stop)

                    {

                        break ;

                    }

                    if((! key.IsDataManaging) && key.NetStream.DataAvailable)

                    {                        

                        key.IsDataManaging = true ;    

                        CbContextKey cb = new CbContextKey(this.DataManaging) ;

                        cb.BeginInvoke(key ,null ,null ) ;

                    }                    

                }

                System.Threading.Thread.Sleep(50) ;

(2) 将消息頭的解析置于Tcp元件之中

    将消息頭解析置于Tcp元件之中這個方案我層考慮了非常久,原因是,這會破壞Tcp元件的單純性,使得Tcp元件與協定(Contract)有所關聯。最終采用這個政策的第一個理由是清晰,第二個理由是效率。清晰在于簡化了ContextKey結構,避免了使用消息分裂器這樣複雜的算法元件(如果大家看過我以前關于通信方案的文章,一定會得到這樣的答案)。效率在于,當在此解析了Header之後,後面所有的處理器都可以使用這個Header對象了,而不用在自己去解析。這也是NetMessage類中有個Header字段的原因。

(3) 針對于某個連接配接,隻有當上一個消息處理完并将回複發送後(如果有回複的話),才會去接收下一個消息。

    這個政策會使很多事情變得簡單,而且不會影響任何有用的特性。由于不會在處理消息的時候去接收下一個消息,是以可以直接處理接收緩沖區中的資料,而不需要将資料從接收緩沖區拷貝到另外的地方去處理。這又對效率提高有所幫助。

    綜上所述,我們可以總結工作者線程要做的事情:首先,從連接配接上接收MessageHeaderSize個位元組,解析成Header,然後在接收Header. MessageBodyLength個位元組,即是Body,接着構造成RoundedMessage對象交給消息配置設定器去處理,最後将得到的處理結果發送出去。代碼如下所示:

ESFramework介紹之(23)―― AgileTcp
ESFramework介紹之(23)―― AgileTcp

DataManaging

        private void DataManaging(ContextKey key)

        {    

            int streamHashCode = key.NetStream.GetHashCode() ;    

            int headerLen = this.contractHelper.MessageHeaderLength ;

            while((key.NetStream.DataAvailable) && (! this.stop))

                byte[] rentBuff = null ;//每次分派的消息中,最多有一個rentBuff

                try

                    #region 構造 RoundedMessage

                    NetHelper.RecieveData(key.NetStream ,key.Buffer ,0 ,headerLen) ;

                    IMessageHeader header = this.contractHelper.ParseMessageHeader(key.Buffer ,0) ;    

                    if(! this.contractHelper.ValidateMessageToken(header))

                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.MessageTokenInvalid) ;

                        return ;

                    RoundedMessage requestMsg = new RoundedMessage() ;

                    requestMsg.ConnectID      = streamHashCode ;

                    requestMsg.Header         = header ;

                    if(! key.FirstMessageExist)

                        requestMsg.IsFirstMessage = true ;

                        key.FirstMessageExist     = true ;

                    if((headerLen + header.MessageBodyLength) > this.maxMessageSize)

                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.MessageSizeOverflow) ;

                    if(header.MessageBodyLength >0 )

                        if((header.MessageBodyLength + headerLen) <= this.recieveBuffSize)

                        {

                            NetHelper.RecieveData(key.NetStream ,key.Buffer ,0 ,header.MessageBodyLength) ;

                            requestMsg.Body = key.Buffer ;                            

                        }

                        else

                        {                        

                            rentBuff = this.bufferPool.RentBuffer(header.MessageBodyLength) ;                        

                            NetHelper.RecieveData(key.NetStream ,rentBuff ,0 ,header.MessageBodyLength) ;

                            requestMsg.Body = rentBuff ;                            

                    #endregion                    

                    bool closeConnection = false ;

                    NetMessage resMsg = this.tcpStreamDispatcher.DealRequestData(requestMsg ,ref closeConnection) ;

                    if(rentBuff != null)

                        this.bufferPool.GivebackBuffer(rentBuff) ;

                    if(closeConnection)

                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.OtherCause) ;

                    if((resMsg != null) &&(! this.stop))

                    {                    

                        byte[] bRes = resMsg.ToStream() ;

                        key.NetStream.Write(bRes ,0 ,bRes.Length) ;

                        if(this.ServiceCommitted != null)

                        {                                

                            this.ServiceCommitted(streamHashCode ,resMsg) ;

                catch(Exception ee)

                    if(ee is System.IO.IOException) //正在讀寫流的時候,連接配接斷開

                        this.DisposeOneConnection(streamHashCode ,DisconnectedCause.NetworkError) ;

                    else

                        this.esbLogger.Log(ee.Message ,"ESFramework.Network.Tcp.AgileTcp" ,ErrorLevel.Standard) ;

                    ee = ee ;                    

            key.IsDataManaging = false ;

    AgileTcp元件的主要原理差不多就這些了,這種實作有個缺點,不知大家發現沒有。那就是當用戶端主動斷開連接配接或掉線時,AgileTcp元件可能感受不到(除非對應的連接配接上正在發送或接收資料,此時會抛出異常),因為當連接配接斷開時,key.NetStream.DataAvailable不會抛出異常,而是仍然傳回false。這是個問題,幸好有補救的辦法,一是要求用戶端下線的時候給伺服器發送Logoff消息,二是使用定時掉線檢查器(IUserOnLineChecker)。當伺服器檢查或發現某使用者下線時,即可調用ITcpClientsController.DisposeOneConnection方法來釋放對應的連接配接和Context。(你應該還記得ITcp接口是從ITcpClientsController繼承的)。關于這個問題,你有更好的解決辦法嗎?

    感謝關注!