C#: Implementation Approach for a High-Performance Asynchronous Socket Wrapper Library

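Preface

Sockets are one of the most common ways for software to communicate. C# offers many ways to implement socket communication, and the most efficient of them is asynchronous communication.

Asynchronous communication is actually handled by Windows I/O completion ports (IOCP). For how completion ports work, see the many articles available online.

What I want to stress here is that asynchronous communication built on completion ports is the most efficient way to communicate on Windows, bar none!

Asynchronous communication is much harder to handle than synchronous communication, and there are many pitfalls along the way. Without experience it is hard to get right.

I gathered a large amount of material and built a wrapper around asynchronous sockets. The library has now been running stably and efficiently for several months.

Looking across the material online, I have yet to find a wrapper library I am satisfied with. Many articles mix data transfer and protocol handling together, which makes the code very hard to follow and impossible to extend.

In writing this library I avoided those flaws: the processing logic is layered and modular, and the result is both easy to use and fast.

To give a first impression of the throughput, here are the test screenshots.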

[Figure: test screenshots]

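The client and the server were both run on the same machine. The maximum number of connections reached 64422, at which point the machine ran out of sockets!

Host configuration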

[Figure: host configuration screenshot]

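The 100 Mbit link is essentially saturated and CPU usage is 40%. My machine sits at roughly 20% CPU when idle, so the program itself accounts for about 20%.

The library scales: even with 100,000 connections, CPU usage stays essentially the same as long as the same amount of data is sent and received.

Library structure diagram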

[Figure: library structure diagram]

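Goals

  1. Usable both as a server (listening) and as a client (actively connecting).
  2. Adaptable to any network protocol. Data is sent and received either as a byte stream or as complete packets; the protocol content itself is not processed.
  3. High usability. The complex low-level handling is encapsulated behind a friendly public interface.
  4. High performance. Processing is optimized as far as possible. A single machine can support tens of thousands of connections, with throughput of several hundred Mbit/s.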

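Implementation approach

The network processing logic can be divided into the following parts:

  1. Listening: can listen on multiple ports. It is responsible for producing sockets, which are handed on for subsequent processing. The listening module does one narrow job; if necessary, it can be optimized further.
  2. Active connection: can connect to the peer asynchronously or synchronously. Once connected, the socket is handled in exactly the same way as a socket obtained by listening.
  3. Socket send/receive: each socket has its own send/receive instance, which deals only with byte streams. Both directions are optimized: on send, queued data is coalesced into larger writes to improve throughput; on receive, each posted request reads up to 1 KB.
  4. Packet assembly: packets usually carry a length indicator. For example, if the first two bytes of the header give the length, that value is enough to assemble a complete packet.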
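The packet-assembly step in item 4 can be sketched roughly as follows. This is a minimal illustration, not the library's own code: the class name `LengthPrefixedFramer` is hypothetical, and it assumes the two-byte length is big-endian and counts the whole packet including the header.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the library): assembles packets whose
// first two bytes hold the total packet length, big-endian, header included.
public sealed class LengthPrefixedFramer
{
    private readonly List<byte> _pending = new List<byte>();
    private readonly int _maxLen;

    public LengthPrefixedFramer(int maxLen)
    {
        _maxLen = maxLen;
    }

    // Feed one received chunk; returns every packet completed by this chunk.
    public List<byte[]> Feed(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var packets = new List<byte[]>();

        while (_pending.Count >= 2)
        {
            int totalLen = (_pending[0] << 8) | _pending[1];
            if (totalLen < 2 || totalLen > _maxLen)
                throw new InvalidOperationException("Invalid packet length: " + totalLen);
            if (_pending.Count < totalLen)
                break; // incomplete packet, wait for more bytes

            packets.Add(_pending.GetRange(0, totalLen).ToArray());
            _pending.RemoveRange(0, totalLen);
        }
        return packets;
    }
}
```

Because TCP delivers a byte stream, one received chunk may contain part of a packet, exactly one packet, or several; the loop handles all three cases. Rejecting out-of-range lengths early keeps a corrupt header from stalling the connection.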

NetListener: listening

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
 
namespace IocpCore
{
    class NetListener
    {
        private Socket listenSocket;
        public ListenParam _listenParam { get; set; }
        public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;
 
        bool start;
 
        NetServer _netServer;
        public NetListener(NetServer netServer)
        {
            _netServer = netServer;
        }
 
        public int _acceptAsyncCount = 0;
        public bool StartListen()
        {
            try
            {
                start = true;
                IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
                listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(listenPoint);
                listenSocket.Listen(200);
 
                Thread thread1 = new Thread(new ThreadStart(NetProcess));
                thread1.Start();
                
                StartAccept();
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("**Listen error! {0}", ex.Message));
                return false;
            }
        }
 
        AutoResetEvent _acceptEvent = new AutoResetEvent(false);
        private void NetProcess()
        {
            while (start)
            {
                DealNewAccept();
                _acceptEvent.WaitOne(1000 * 10);
            }
        }
 
        private void DealNewAccept()
        {
            try
            {
                if(_acceptAsyncCount <= 10)
                {
                    StartAccept();
                }
 
                while (true)
                {
                    AsyncSocketClient client = _newSocketClientList.GetObj();
                    if (client == null)
                        break;
 
                    DealNewAccept(client);
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("DealNewAccept exception {0}***{1}", ex.Message, ex.StackTrace));
            }
        }
 
        private void DealNewAccept(AsyncSocketClient client)
        {
            client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
            OnAcceptSocket?.Invoke(_listenParam, client);
        }
 
        private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                Interlocked.Decrement(ref _acceptAsyncCount);
                _acceptEvent.Set();
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                ProcessAccept(acceptEventArgs);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
            }
        }
 
        public bool StartAccept()
        {
            SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
            acceptEventArgs.Completed += AcceptEventArg_Completed;
 
            bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
            Interlocked.Increment(ref _acceptAsyncCount);
 
            if (!willRaiseEvent)
            {
                Interlocked.Decrement(ref _acceptAsyncCount);
                _acceptEvent.Set();
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                ProcessAccept(acceptEventArgs);
            }
            return true;
        }
 
        ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
        private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                using (acceptEventArgs)
                {
                    if (acceptEventArgs.AcceptSocket != null)
                    {
                        AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
                        client.CreateClientInfo(this);
 
                        _newSocketClientList.PutObj(client);
                        _acceptEvent.Set();
                    }
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
            }
        }
    }
}      
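The listener relies on `ObjectPool<T>` through `PutObj` and `GetObj`, but the article does not show that class. The following is only a guess at its shape, assuming it is a thread-safe FIFO queue whose `GetObj` returns null when empty, which is how `DealNewAccept` uses it.

```csharp
using System.Collections.Generic;

// Assumed shape only: the article does not include ObjectPool<T>.
public class ObjectPool<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    // Adds an item; returns false for null so GetObj can use null as "empty".
    public bool PutObj(T obj)
    {
        if (obj == null)
            return false;
        lock (_sync)
        {
            _items.Enqueue(obj);
        }
        return true;
    }

    // Removes and returns the oldest item, or null when the pool is empty.
    public T GetObj()
    {
        lock (_sync)
        {
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }
}
```

With a queue like this, the I/O completion callbacks only enqueue the accepted client and signal an event, while the dedicated `NetProcess` thread drains the queue, so user callbacks never run on a completion-port thread.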

NetConnectManage: connection handling

using System;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
    class NetConnectManage
    {
        public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            try
            {
                Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
                socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                socketEventArgs.Completed += SocketConnect_Completed;

                SocketClientInfo clientInfo = new SocketClientInfo();
                socketEventArgs.UserToken = clientInfo;
                clientInfo.PeerIp = peerIp;
                clientInfo.PeerPort = peerPort;
                clientInfo.Tag = tag;

                bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
                if (!willRaiseEvent)
                {
                    ProcessConnect(socketEventArgs);
                    socketEventArgs.Completed -= SocketConnect_Completed;
                    socketEventArgs.Dispose();
                }
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log("ConnectAsyn",ex);
                return false;
            }
        }

        private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
        {
            ProcessConnect(socketEventArgs);
            socketEventArgs.Completed -= SocketConnect_Completed;
            socketEventArgs.Dispose();
        }

        private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
        {
            SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
            if (socketEventArgs.SocketError == SocketError.Success)
            {
                DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
            }
            else
            {
                SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
                socketParam.ClientInfo = clientInfo;
                OnSocketConnectEvent?.Invoke(socketParam, null);
            }
        }


        void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
        {
            clientInfo.SetClientInfo(socket);

            AsyncSocketClient client = new AsyncSocketClient(socket);
            client.SetClientInfo(clientInfo);

            // Raise the connect event
            SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
            socketParam.ClientInfo = clientInfo;
            OnSocketConnectEvent?.Invoke(socketParam, client);
        }

        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            socket = null;
            try
            {
                Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

                SocketClientInfo clientInfo = new SocketClientInfo();
                clientInfo.PeerIp = peerIp;
                clientInfo.PeerPort = peerPort;
                clientInfo.Tag = tag;

                EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                socketTmp.Connect(remoteEP);
                if (!socketTmp.Connected)
                    return false;

                DealConnectSocket(socketTmp, clientInfo);
                socket = socketTmp;
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Failed to connect to peer ({0}:{1})!", peerIp, peerPort), ex);
                return false;
            }
        }
    }
}      

AsyncSocketClient: socket send/receive handling

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
    public class AsyncSocketClient
    {
        public static int IocpReadLen = 1024;

        public readonly Socket ConnectSocket;

        protected SocketAsyncEventArgs m_receiveEventArgs;
        public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
        protected byte[] m_asyncReceiveBuffer;

        protected SocketAsyncEventArgs m_sendEventArgs;
        public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
        protected byte[] m_asyncSendBuffer;

        public event Action<AsyncSocketClient, byte[]> OnReadData;
        public event Action<AsyncSocketClient, int> OnSendData;
        public event Action<AsyncSocketClient> OnSocketClose;

        static object releaseLock = new object();
        public static int createCount = 0;
        public static int releaseCount = 0;

        ~AsyncSocketClient()
        {
            lock (releaseLock)
            {
                releaseCount++;
            }
        }

        public AsyncSocketClient(Socket socket)
        {
            lock (releaseLock)
            {
                createCount++;
            }

            ConnectSocket = socket;

            m_receiveEventArgs = new SocketAsyncEventArgs();
            m_asyncReceiveBuffer = new byte[IocpReadLen];
            m_receiveEventArgs.AcceptSocket = ConnectSocket;
            m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

            m_sendEventArgs = new SocketAsyncEventArgs();
            m_asyncSendBuffer = new byte[IocpReadLen * 2];
            m_sendEventArgs.AcceptSocket = ConnectSocket;
            m_sendEventArgs.Completed += SendEventArgs_Completed;
        }

        SocketClientInfo _clientInfo;

        public SocketClientInfo ClientInfo
        {
            get
            {
                return _clientInfo;
            }
        }

        internal void CreateClientInfo(NetListener netListener)
        {
            _clientInfo = new SocketClientInfo();
            try
            {
                _clientInfo.Tag = netListener._listenParam._tag;
                IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
                Debug.Assert(netListener._listenParam._port == ip.Port);

                _clientInfo.LocalIp = ip.Address.ToString();
                _clientInfo.LocalPort = netListener._listenParam._port;

                ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
                _clientInfo.PeerIp = ip.Address.ToString();
                _clientInfo.PeerPort = ip.Port;
            }
            catch (Exception ex)
            {
                NetLogger.Log("CreateClientInfo", ex);
            }
        }
        internal void SetClientInfo(SocketClientInfo clientInfo)
        {
            _clientInfo = clientInfo;
        }

        #region read process
        bool _inReadPending = false;
        public EN_SocketReadResult ReadNextData()
        {
            lock (this)
            {
                if (_socketError)
                    return EN_SocketReadResult.ReadError;
                if (_inReadPending)
                    return EN_SocketReadResult.InAsyn;
                if(!ConnectSocket.Connected)
                {
                    OnReadError();
                    return EN_SocketReadResult.ReadError;
                }

                try
                {
                    m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
                    _inReadPending = true;
                    bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); // post the receive request
                    if (!willRaiseEvent)
                    {
                        _inReadPending = false;
                        ProcessReceive();
                        if (_socketError)
                        {
                            OnReadError();
                            return EN_SocketReadResult.ReadError;
                        }
                        return EN_SocketReadResult.HaveRead;
                    }
                    else
                    {
                        return EN_SocketReadResult.InAsyn;
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log("ReadNextData", ex);
                    _inReadPending = false;
                    OnReadError();
                    return EN_SocketReadResult.ReadError;
                }
            }
        }

        private void ProcessReceive()
        {
            if (ReceiveEventArgs.BytesTransferred > 0
                && ReceiveEventArgs.SocketError == SocketError.Success)
            {
                int offset = ReceiveEventArgs.Offset;
                int count = ReceiveEventArgs.BytesTransferred;

                byte[] readData = new byte[count];
                Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

                _inReadPending = false;
                if (!_socketError)
                    OnReadData?.Invoke(this, readData);
            }
            else
            {
                _inReadPending = false;
                OnReadError();
            }
        }

        private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            lock (this)
            {
                _inReadPending = false;
                ProcessReceive();
                if (_socketError)
                {
                    OnReadError();
                }
            }
        }

        bool _socketError = false;
        private void OnReadError()
        {
            lock (this)
            {
                if (_socketError == false)
                {
                    _socketError = true;
                    OnSocketClose?.Invoke(this);
                }
                CloseClient();
            }
        }
        #endregion

        #region send process
        int _sendBufferByteCount = 102400;
        public int SendBufferByteCount
        {
            get
            {
                return _sendBufferByteCount;
            }
            set
            {
                if (value < 1024)
                {
                    _sendBufferByteCount = 1024;
                }
                else
                {
                    _sendBufferByteCount = value;
                }
            }
        }

        SendBufferPool _sendDataPool = new SendBufferPool();
        internal EN_SendDataResult PutSendData(byte[] data)
        {
            // Omitted here (302)
        }

        private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
        {
            lock (this)
            {
                try
                {
                    _inSendPending = false;
                    ProcessSend(m_sendEventArgs);

                    int sendCount = 0;
                    if (sendEventArgs.SocketError == SocketError.Success)
                    {
                        sendCount = sendEventArgs.BytesTransferred;
                    }
                    OnSendData?.Invoke(this, sendCount);

                    if (_socketError)
                    {
                        OnSendError();
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log("SendEventArgs_Completed", ex);
                }
            }
        }

        private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
        {
            if (sendEventArgs.SocketError == SocketError.Success)
            {
                return true;
            }
            else
            {
                OnSendError();
                return false;
            }
        }

        private int GetSendData()
        {
            int dataLen = 0;
            while (true)
            {
                byte[] data = _sendDataPool.GetObj();
                if (data == null)
                    return dataLen;
                Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
                dataLen += data.Length;
                if (dataLen > IocpReadLen)
                    break;
            }
            return dataLen;
        }
        private void OnSendError()
        {
            lock (this)
            {
                if (_socketError == false)
                {
                    _socketError = true;
                    OnSocketClose?.Invoke(this);
                }
                CloseClient();
            }
        }
        #endregion

        internal void CloseSocket()
        {
            try
            {
                ConnectSocket.Close();
            }
            catch (Exception ex)
            {
                NetLogger.Log("CloseSocket", ex);
            }
        }

        static object socketCloseLock = new object();
        public static int closeSendCount = 0;
        public static int closeReadCount = 0;

        bool _disposeSend = false;
        void CloseSend()
        {
            if (!_disposeSend && !_inSendPending)
            {
                lock (socketCloseLock)
                    closeSendCount++;

                _disposeSend = true;
                m_sendEventArgs.SetBuffer(null, 0, 0);
                m_sendEventArgs.Completed -= SendEventArgs_Completed;
                m_sendEventArgs.Dispose();
            }
        }

        bool _disposeRead = false;
        void CloseRead()
        {
            if (!_disposeRead && !_inReadPending)
            {
                lock (socketCloseLock)
                    closeReadCount++;

                _disposeRead = true;
                m_receiveEventArgs.SetBuffer(null, 0, 0);
                m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
                m_receiveEventArgs.Dispose();
            }
        }
        private void CloseClient()
        {
            try
            {
                CloseSend();
                CloseRead();
                ConnectSocket.Close();
            }
            catch (Exception ex)
            {
                NetLogger.Log("CloseClient", ex);
            }
        }

        // Splits data into chunks of at most maxLen bytes (the send buffer size)
        private List<byte[]> SplitData(byte[] data, int maxLen)
        {
            List<byte[]> items = new List<byte[]>();

            int start = 0;
            while (true)
            {
                int itemLen = Math.Min(maxLen, data.Length - start);
                if (itemLen == 0)
                    break;
                byte[] item = new byte[itemLen];
                Array.Copy(data, start, item, 0, itemLen);
                items.Add(item);

                start += itemLen;
            }
            return items;
        }
    }

    public enum EN_SocketReadResult
    {
        InAsyn,
        HaveRead,
        ReadError
    }

    public enum EN_SocketSendResult
    {
        InAsyn,
        HaveSend,
        NoSendData,
        SendError
    }

    class SendBufferPool
    {
        ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

        public Int64 _bufferByteCount = 0;
        public bool PutObj(byte[] obj)
        {
            if (_bufferPool.PutObj(obj))
            {
                lock (this)
                {
                    _bufferByteCount += obj.Length;
                }
                return true;
            }
            else
            {
                return false;
            }
        }

        public byte[] GetObj()
        {
            byte[] result = _bufferPool.GetObj();
            if (result != null)
            {
                lock (this)
                {
                    _bufferByteCount -= result.Length;
                }
            }
            return result;
        }
    }
}      
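The send-side coalescing done by `GetSendData` can also be written with an explicit bounds check. The listing dequeues a chunk first and stops once more than `IocpReadLen` bytes have been copied, which stays within the 2 × `IocpReadLen` buffer only if no chunk is longer than `IocpReadLen`. The sketch below is a variant under different assumptions, not the author's code: `SendCoalescer` is a hypothetical name, and it peeks at the next chunk and leaves it queued when it would not fit.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating send coalescing with a bounds check.
public static class SendCoalescer
{
    // Copies queued chunks into sendBuffer until the next chunk would not fit.
    // Returns the number of bytes ready to send.
    public static int Fill(Queue<byte[]> pending, byte[] sendBuffer)
    {
        int used = 0;
        while (pending.Count > 0)
        {
            byte[] next = pending.Peek();
            if (next.Length > sendBuffer.Length)
                throw new ArgumentException("Chunk larger than the send buffer; split it first.");
            if (used + next.Length > sendBuffer.Length)
                break; // leave it queued for the next send

            Buffer.BlockCopy(next, 0, sendBuffer, used, next.Length);
            used += next.Length;
            pending.Dequeue();
        }
        return used;
    }
}
```

Batching several small writes into one `SendAsync` call reduces the number of completion-port round trips per byte sent, which is where most of the gain comes from when messages are small.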

NetServer: aggregates the other classes

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
    public class NetServer
    {
        public Action<SocketEventParam> OnSocketPacketEvent;

        // Send buffer size per connection
        public int SendBufferBytePerClient { get; set; } = 1024 * 100;

        bool _serverStart = false;
        List<NetListener> _listListener = new List<NetListener>();

        // Assembles the received byte stream into complete packets
        ClientPacketManage _clientPacketManage;

        public Int64 SendByteCount { get; set; }
        public Int64 ReadByteCount { get; set; }

        List<ListenParam> _listListenPort = new List<ListenParam>();
        public void AddListenPort(int port, object tag)
        {
            _listListenPort.Add(new ListenParam(port, tag));
        }
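        /// <summary>
        /// Starts listening on every port added through AddListenPort.
        /// </summary>
        /// <param name="listenFault">Ports on which listening failed</param>
        /// <returns>true if every port started listening</returns>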
        public bool StartListen(out List<int> listenFault)
        {
            _serverStart = true;

            _clientPacketManage = new ClientPacketManage(this);
            _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

            _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

            _listListener.Clear();
            Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
            thread1.Start();

            Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
            thread2.Start();

            Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
            thread3.Start();

            listenFault = new List<int>();
            foreach (ListenParam param in _listListenPort)
            {
                NetListener listener = new NetListener(this);
                listener._listenParam = param;
                listener.OnAcceptSocket += Listener_OnAcceptSocket;
                if (!listener.StartListen())
                {
                    listenFault.Add(param._port);
                }
                else
                {
                    _listListener.Add(listener);
                    NetLogger.Log(string.Format("Listening started! Port: {0}", param._port));
                }
            }

            return listenFault.Count == 0;
        }

        public void PutClientPacket(SocketEventParam param)
        {
            OnSocketPacketEvent?.Invoke(param);
        }

        // Minimum and maximum packet length
        int _packetMinLen;
        int _packetMaxLen;
        public int PacketMinLen
        {
            get { return _packetMinLen; }
        }
        public int PacketMaxLen
        {
            get { return _packetMaxLen; }
        }

        /// <summary>
        /// Sets the minimum and maximum packet length.
        /// When minLen is 0, incoming data is treated as a raw byte stream.
        /// </summary>
        /// <param name="minLen"></param>
        /// <param name="maxLen"></param>
        public void SetPacketParam(int minLen, int maxLen)
        {
            Debug.Assert(minLen >= 0);
            Debug.Assert(maxLen > minLen);
            _packetMinLen = minLen;
            _packetMaxLen = maxLen;
        }

        // Callback that returns the total length of a packet
        public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
        public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

        ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
        private void NetPacketProcess()
        {
            while (_serverStart)
            {
                try
                {
                    DealEventPool();
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("DealEventPool exception {0}***{1}", ex.Message, ex.StackTrace));
                }
                _socketEventPool.WaitOne(1000);
            }
        }

        Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
        public int ClientCount
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Count;
                }
            }
        }
        public List<Socket> ClientList
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Keys.ToList();
                }
            }
        }

        private void DealEventPool()
        {
            while (true)
            {
                SocketEventParam param = _socketEventPool.GetObj();
                if (param == null)
                    return;

                if (param.SocketEvent == EN_SocketEvent.close)
                {
                    lock (_clientGroup)
                    {
                        _clientGroup.Remove(param.Socket);
                    }
                }

                if (_packetMinLen == 0) // byte-stream mode
                {
                    OnSocketPacketEvent?.Invoke(param);
                }
                else
                {
                    // Packet mode: assemble a complete packet
                    _clientPacketManage.PutSocketParam(param);
                }
            }
        }

        private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
        {
            try
            {
                if (param.Socket == null || client == null) // connection failed
                {
                    // Nothing to register; the failure event is still queued below.
                }
                else
                {
                    lock (_clientGroup)
                    {
                        bool remove = _clientGroup.Remove(client.ConnectSocket);
                        Debug.Assert(!remove);
                        _clientGroup.Add(client.ConnectSocket, client);
                    }

                    client.OnSocketClose += Client_OnSocketClose;
                    client.OnReadData += Client_OnReadData;
                    client.OnSendData += Client_OnSendData;

                    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
                }
                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("SocketConnectEvent exception {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
        {
            try
            {
                lock (_clientGroup)
                {
                    if (!_clientGroup.ContainsKey(socket))
                    {
                        Debug.Assert(false);
                        return;
                    }

                    NetLogger.Log(string.Format("Packet length error! Packet length: {0}", packetLen));
                    AsyncSocketClient client = _clientGroup[socket];
                    client.CloseSocket();
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("OnRcvPacketLenError exception {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        #region listen port
        private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
        {
            try
            {
                lock (_clientGroup)
                {
                    bool remove = _clientGroup.Remove(client.ConnectSocket);
                    Debug.Assert(!remove);
                    _clientGroup.Add(client.ConnectSocket, client);
                }

                client.OnSocketClose += Client_OnSocketClose;
                client.OnReadData += Client_OnReadData;
                client.OnSendData += Client_OnSendData;

                _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

                SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;

                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Listener_OnAcceptSocket exception {0}***{1}", ex.Message, ex.StackTrace));
            }
        }


        ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
        private void NetSendProcess()
        {
            while (_serverStart) // stop with the server, like NetPacketProcess
            {
                DealSendEvent();
                _listSendEvent.WaitOne(1000);
            }
        }

        ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();
        private void NetReadProcess()
        {
            while (_serverStart) // stop with the server, like NetPacketProcess
            {
                DealReadEvent();
                _listReadEvent.WaitOne(1000);
            }
        }

        
        private void DealSendEvent()
        {
            while (true)
            {
                SocketEventDeal item = _listSendEvent.GetObj();
                if (item == null)
                    break;
                switch (item.SocketEvent)
                {
                    case EN_SocketDealEvent.send:
                        {
                            while (true)
                            {
                                EN_SocketSendResult result = item.Client.SendNextData();
                                if (result == EN_SocketSendResult.HaveSend)
                                    continue;
                                else
                                    break;
                            }
                        }
                        break;
                    case EN_SocketDealEvent.read:
                        {
                            Debug.Assert(false);
                        }
                        break;                   
                }
            }
        }

        private void DealReadEvent()
        {
            while (true)
            {
                SocketEventDeal item = _listReadEvent.GetObj();
                if (item == null)
                    break;
                switch (item.SocketEvent)
                {
                    case EN_SocketDealEvent.read:
                        {
                            while (true)
                            {
                                EN_SocketReadResult result = item.Client.ReadNextData();
                                if (result == EN_SocketReadResult.HaveRead)
                                    continue;
                                else
                                    break;
                            }
                        }
                        break;
                    case EN_SocketDealEvent.send:
                        {
                            Debug.Assert(false);
                        }
                        break;
                }
            }
        }

        private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
        {
            // Queue the next read for this client
            _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

            try
            {
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                param.Data = readData;
                _socketEventPool.PutObj(param);

                lock (this)
                {
                    ReadByteCount += readData.Length;
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Client_OnReadData exception {0}***{1}", ex.Message, ex.StackTrace));
            }
        }
#endregion

        private void Client_OnSendData(AsyncSocketClient client, int sendCount)
        {
            // Queue the next send for this client
            _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
            lock (this)
            {
                SendByteCount += sendCount;
            }
        }

        private void Client_OnSocketClose(AsyncSocketClient client)
        {
            try
            {
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Client_OnSocketClose exception {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        /// <summary>
        /// Puts data into the connection's send buffer.
        /// </summary>
        /// <param name="socket"></param>
        /// <param name="data"></param>
        /// <returns></returns>
        public EN_SendDataResult SendData(Socket socket, byte[] data)
        {
            if (socket == null)
                return EN_SendDataResult.no_client;
            lock (_clientGroup)
            {
                if (!_clientGroup.ContainsKey(socket))
                    return EN_SendDataResult.no_client;
                AsyncSocketClient client = _clientGroup[socket];
                EN_SendDataResult result = client.PutSendData(data);
                if (result == EN_SendDataResult.ok)
                {
                    // Wake the send thread so the buffered data goes out
                    _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));                  
                }
                return result;
            }
        }

        /// <summary>
        /// Sets the send buffer size for a single connection.
        /// </summary>
        /// <param name="socket"></param>
        /// <param name="byteCount"></param>
        /// <returns></returns>
        public bool SetClientSendBuffer(Socket socket, int byteCount)
        {
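            // Dictionary.ContainsKey throws on a null key, so reject it up front
            if (socket == null)
                return false;
            lock (_clientGroup)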
            {
                if (!_clientGroup.ContainsKey(socket))
                    return false;
                AsyncSocketClient client = _clientGroup[socket];
                client.SendBufferByteCount = byteCount;
                return true;
            }
        }


        #region connect process
        NetConnectManage _netConnectManage = new NetConnectManage();
        /// <summary>
        /// Connects to a remote peer asynchronously.
        /// </summary>
        /// <param name="peerIp"></param>
        /// <param name="peerPort"></param>
        /// <param name="tag"></param>
        /// <returns></returns>
        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
        }

        /// <summary>
        /// Connects to a remote peer synchronously.
        /// </summary>
        /// <param name="peerIp"></param>
        /// <param name="peerPort"></param>
        /// <param name="tag"></param>
        /// <param name="socket"></param>
        /// <returns></returns>
        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
        }
        #endregion
    }

    enum EN_SocketDealEvent
    {
        read,
        send,
    }
    class SocketEventDeal
    {
        public AsyncSocketClient Client { get; set; }
        public EN_SocketDealEvent SocketEvent { get; set; }
        public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
        {
            Client = client;
            SocketEvent = socketEvent;
        }
    }
}      
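
The read and send loops above both drain a queue until `GetObj()` returns null and then block in `WaitOne(1000)`. The source of `ObjectPoolWithEvent<T>` is not part of this listing, so the following is only a minimal sketch of a queue with that shape, assuming a lock-protected `Queue<T>` paired with an `AutoResetEvent`. The class name `SignaledQueue<T>` is hypothetical and does not come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a queue that wakes a consumer thread on PutObj.
class SignaledQueue<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    // Producer side: enqueue the item, then wake one waiting consumer.
    public void PutObj(T item)
    {
        lock (_items)
        {
            _items.Enqueue(item);
        }
        _signal.Set();
    }

    // Consumer side: returns null when the queue is empty.
    public T GetObj()
    {
        lock (_items)
        {
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }

    // Blocks until an item has been put or the timeout elapses.
    public bool WaitOne(int millisecondsTimeout)
    {
        return _signal.WaitOne(millisecondsTimeout);
    }
}
```

With a queue like this, the timeout in `WaitOne(1000)` acts only as a safety net: a consumer that drains until `GetObj()` returns null and then waits will normally be woken by the next `PutObj` call rather than by the timer.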

Using the library

The library is simple to use. The wrapper class below is an example:

using IocpCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Windows;

namespace WarningClient
{
    public class SocketServer
    {
        public Action<SocketEventParam> OnSocketEvent;

        public Int64 SendByteCount
        {
            get
            {
                if (_netServer == null)
                    return 0;
                return _netServer.SendByteCount;
            }
        }
        public Int64 ReadByteCount
        {
            get
            {
                if (_netServer == null)
                    return 0;
                return _netServer.ReadByteCount;
            }
        }

        NetServer _netServer;
        EN_PacketType _packetType = EN_PacketType.byteStream;
        public void SetPacktType(EN_PacketType packetType)
        {
            _packetType = packetType;
            if (_netServer == null)
                return;
            if (packetType == EN_PacketType.byteStream)
            {
                _netServer.SetPacketParam(0, 1024);
            }
            else
            {
                _netServer.SetPacketParam(9, 1024);
            }
        }

        public bool Init(List<int> listenPort)
        {
            NetLogger.OnLogEvent += NetLogger_OnLogEvent;
            _netServer = new NetServer();
            SetPacktType(_packetType);
            _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
            _netServer.OnSocketPacketEvent += SocketPacketDeal;

            foreach (int n in listenPort)
            {
                _netServer.AddListenPort(n, n);
            }

            List<int> listenFault;
            bool start = _netServer.StartListen(out listenFault);
            return start;
        }

        int GetPacketTotalLen(byte[] data, int offset)
        {
            if (MainWindow._packetType == EN_PacketType.znss)
                return GetPacketZnss(data, offset);
            else
                return GetPacketAnzhiyuan(data, offset);
        }

        int GetPacketAnzhiyuan(byte[] data, int offset)
        {
            int n = data[offset + 5] + 6;
            return n;
        }

        int GetPacketZnss(byte[] data, int offset)
        {
            // Index relative to offset: the packet may not start at the beginning of the buffer
            int packetLen = data[offset + 4] + 5;
            return packetLen;
        }


        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            return _netServer.ConnectAsyn(peerIp, peerPort, tag);
        }

        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            return _netServer.Connect(peerIp, peerPort, tag, out socket);
        }

        private void NetLogger_OnLogEvent(string message)
        {
            AppLog.Log(message);
        }

        Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

        public int ClientCount
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Count;
                }
            }
        }
        public List<Socket> ClientList
        {
            get
            {
                if (_netServer != null)
                    return _netServer.ClientList;
                return new List<Socket>();
            }
        }
        void AddClient(SocketEventParam socketParam)
        {
            lock (_clientGroup)
            {
                _clientGroup.Remove(socketParam.Socket);
                _clientGroup.Add(socketParam.Socket, socketParam);
            }
        }

        void RemoveClient(SocketEventParam socketParam)
        {
            lock (_clientGroup)
            {
                _clientGroup.Remove(socketParam.Socket);
            }
        }

        ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

        public ObjectPool<SocketEventParam> ReadDataPool
        {
            get
            {
                return _readDataPool;
            }
        }

        private void SocketPacketDeal(SocketEventParam socketParam)
        {
            OnSocketEvent?.Invoke(socketParam);
            if (socketParam.SocketEvent == EN_SocketEvent.read)
            {
                if (MainWindow._isShowReadPacket)
                    _readDataPool.PutObj(socketParam);
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.accept)
            {
                AddClient(socketParam);
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                AppLog.Log(string.Format("Client connected. Local port: {0}, peer: {1}",
                    socketParam.ClientInfo.LocalPort, peerIp));
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.connect)
            {
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                if (socketParam.Socket != null)
                {
                    AddClient(socketParam);

                    AppLog.Log(string.Format("Connected to peer. Local port: {0}, peer: {1}",
                       socketParam.ClientInfo.LocalPort, peerIp));
                }
                else
                {
                    AppLog.Log(string.Format("Failed to connect to peer. Local port: {0}, peer: {1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.close)
            {
                MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
                RemoveClient(socketParam);
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                AppLog.Log(string.Format("Client disconnected. Local port: {0}, peer: {1}",
                    socketParam.ClientInfo.LocalPort, peerIp));
            }
        }

        public EN_SendDataResult SendData(Socket socket, byte[] data)
        {
            if(socket == null)
            {
                MessageBox.Show("Not connected yet!");
                return EN_SendDataResult.no_client;
            }
            return _netServer.SendData(socket, data);
        }

        internal void SendToAll(byte[] data)
        {
            lock (_clientGroup)
            {
                foreach (Socket socket in _clientGroup.Keys)
                {
                    SendData(socket, data);
                }
            }
        }
    }
}
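
The `GetPacketTotalLen` callback in the example tells the library how long a complete packet is, given the receive buffer and the offset at which the packet starts. As a sketch of the same idea for the framing mentioned earlier in the article (the first two bytes of the header carry the length), the helper below reads a 16-bit length relative to `offset`. The class name `LengthPrefixFraming`, the big-endian byte order, the interpretation of the length as body length only, and the convention of returning 0 when the header is incomplete are all assumptions made for illustration; the article does not show how the library treats an incomplete header.

```csharp
using System;

// Hypothetical helper; not part of IocpCore.
static class LengthPrefixFraming
{
    // Assumed layout: 2-byte big-endian body length, followed by the body.
    private const int HeaderSize = 2;

    // Returns header length + body length, or 0 if the header
    // is not fully contained in the buffer starting at offset.
    public static int GetPacketTotalLen(byte[] data, int offset)
    {
        if (data == null || offset < 0 || offset > data.Length - HeaderSize)
            return 0;

        // Always index relative to offset, never from the start of the array.
        int bodyLen = (data[offset] << 8) | data[offset + 1];
        return HeaderSize + bodyLen;
    }
}
```

The offset matters because several packets can sit back to back in one receive buffer. Indexing from the start of the array would read the first packet's header every time.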