天天看點

.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX

BeetleX

beetleX是基于dotnet core實作的輕量級高性能的TCP通訊元件,使用友善、性能高效和安全可靠是元件設計的出發點!開發人員可以在Beetlx元件的支援下快帶地建構高性能的TCP通訊服務程式,在安全通訊方面隻需要簡單地設定一下SSL資訊即可實作可靠安全的SSL服務。

https://gitee.com/ikende/BeetleX#%E9%AB%98%E6%95%88%E7%B2%BE%E7%AE%80%E7%9A%84http%E6%9C%8D%E5%8A%A1%E6%89%A9%E5%B1%95-fasthttpapi-%E5%9C%A8api%E6%9C%8D%E5%8A%A1%E5%9C%BA%E6%99%AF%E6%95%88%E7%8E%87%E8%BF%9C%E9%AB%98%E4%BA%8Easpnet-core-kestrel 高效精簡的HTTP服務擴充  FastHttpApi  在API服務場景效率遠高于asp.net core Kestrel

https://gitee.com/ikende/BeetleX#%E4%BD%BF%E7%94%A8%E6%96%B9%E4%BE%BF%E6%80%A7 使用友善性

beetleX網絡流讀寫是基于Stream标準來建構,僅僅基于Stream的基礎讀寫對于應用者來說還是過于繁瑣;元件為了更友善進行網絡資料處理在Stream的基礎之上擴充了一系列的讀寫規則:ReadLine、ReadInt、WriteLine、WriteInt等一系列簡便方法,在這些方法的支援下使用者就可以更輕松地處理資料;為了在網絡通訊中更好的相容其他平台協定以上方法都相容Big-Endian和Little-Endian不同方式。為了更好地利用現有序列化元件,元件通過IPacket接口規範消息擴充,通過實作不同的Packet解釋器,即可以實作基于Protobuf,json和Msgpack等方式的對象資料傳輸。

https://gitee.com/ikende/BeetleX#%E9%AB%98%E6%80%A7%E8%83%BD%E7%89%B9%E6%80%A7 高性能特性

beetleX的高性能是建立在内部一個資料流處理對象PipeStream,它是建構在Stream标準之上;它和.NET内置的NetworkStream最大的差别是PipeStream的讀寫基于SocketAsyncEventArgs實作,這正是在編寫高性能網絡資料處理所提倡的模式。PipeStream不僅在網絡資料處理模式上有着性能的優勢,在記憶體讀寫上和MemoryStream也有着很大的差別;由于PipeStream的記憶體塊是以一個基于連結清單的SocketAsyncEventArgs Buffer 組成,是以PipeStream在寫入大資料的情況并不存在記憶體擴容和複制的問題;因為PipeStream基礎記憶體是SocketAsyncEventArgs Buffer,是以在資料和網絡緩存讀寫并不存在記憶體塊複制過程。如果在應用中中使用PipeStream相應的BinaryReader和IBinaryWriter讀寫規範,那大部分資料處理基本不存在記憶體複制過程,進而讓資料處理性能更高效。

.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX

以下是PipeStream的結構:

https://gitee.com/ikende/BeetleX#%E6%80%A7%E8%83%BD 性能

beetleX的性能到底怎樣呢,以下簡單和DotNetty進行一個網絡資料交換的性能測試,分别是1K,5K和10K連接配接數下資料請求并發測試

https://gitee.com/ikende/BeetleX#dotnetty%E6%B5%8B%E8%AF%95%E4%BB%A3%E7%A0%81 DotNetty測試代碼

public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            var buffer = message as IByteBuffer;
            context.WriteAsync(message);
        }           

https://gitee.com/ikende/BeetleX#beetlex-%E6%B5%8B%E8%AF%95%E4%BB%A3%E7%A0%81 Beetlex 測試代碼

public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
        {
            server.Send(e.Stream.ToPipeStream().GetReadBuffers(), e.Session);
            base.SessionReceive(server, e);
        }           

https://gitee.com/ikende/BeetleX#%E6%B5%8B%E8%AF%95%E7%BB%93%E6%9E%9C 測試結果

https://gitee.com/ikende/BeetleX#1k-connections 1K connections

.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX
.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX

https://gitee.com/ikende/BeetleX#5k-connections 5K connections

.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX
.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX

https://gitee.com/ikende/BeetleX#10k-connections 10K connections

.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX
.NET Core 實作的 TCP 通訊元件 BeetleXBeetleX

https://gitee.com/ikende/BeetleX#%E6%9E%84%E5%BB%BAtcp-server 建構TCP Server

class Program : ServerHandlerBase
    {
        private static IServer server;

        public static void Main(string[] args)
        {
            NetConfig config = new NetConfig();
            //ssl
            //config.SSL = true;
            //config.CertificateFile = @"c:\ssltest.pfx";
            //config.CertificatePassword = "123456";
            server = SocketFactory.CreateTcpServer<Program>(config);
            server.Open();
            Console.Write(server);
            Console.Read();
        }
        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
        {
            string name = e.Stream.ToPipeStream().ReadLine();
            Console.WriteLine(name);
            e.Session.Stream.ToPipeStream().WriteLine("hello " + name);
            e.Session.Stream.Flush();
            base.SessionReceive(server, e);
        }
    }           

https://gitee.com/ikende/BeetleX#%E6%9E%84%E5%BB%BAtcp-client 建構TCP Client

class Program
    {
        static void Main(string[] args)
        {
            TcpClient client = SocketFactory.CreateClient<TcpClient>("127.0.0.1", 9090);
            //ssl
            //TcpClient client = SocketFactory.CreateSslClient<TcpClient>("127.0.0.1", 9090, "localhost");
            while (true)
            {
                Console.Write("Enter Name:");
                var line = Console.ReadLine();
                client.Stream.ToPipeStream().WriteLine(line);
                client.Stream.Flush();
                var reader = client.Read();
                line = reader.ToPipeStream().ReadLine();
                Console.WriteLine(line);
            }
            Console.WriteLine("Hello World!");
        }
    }           

https://gitee.com/ikende/BeetleX#%E5%BC%82%E6%AD%A5client 異步Client

class Program
    {
        static void Main(string[] args)
        {
            AsyncTcpClient client = SocketFactory.CreateClient<AsyncTcpClient>("127.0.0.1", 9090);
            //SSL
            //AsyncTcpClient client = SocketFactory.CreateSslClient<AsyncTcpClient>("127.0.0.1", 9090, "serviceName");
            client.ClientError = (o, e) =>
            {
                Console.WriteLine("client error {0}@{1}", e.Message, e.Error);
            };
            client.Receive = (o, e) =>
            {
                Console.WriteLine(e.Stream.ToPipeStream().ReadLine());
            };
            var pipestream = client.Stream.ToPipeStream();
            pipestream.WriteLine("hello henry");
            client.Stream.Flush();
            Console.Read();
        }
    }           

實作一個Protobuf對象解釋器

public class Packet : FixedHeaderPacket
    {
        public Packet()
        {
            TypeHeader = new TypeHandler();
        }

        private PacketDecodeCompletedEventArgs mCompletedEventArgs = new PacketDecodeCompletedEventArgs();

        public void Register(params Assembly[] assemblies)
        {
            TypeHeader.Register(assemblies);
        }

        public IMessageTypeHeader TypeHeader { get; set; }

        public override IPacket Clone()
        {
            Packet result = new Packet();
            result.TypeHeader = TypeHeader;
            return result;
        }

        protected override object OnReader(ISession session, PipeStream reader)
        {
            Type type = TypeHeader.ReadType(reader);
            int bodySize = reader.ReadInt32();
            return reader.Stream.Deserialize(bodySize, type);
        }

        protected override void OnWrite(ISession session, object data, PipeStream writer)
        {
            TypeHeader.WriteType(data, writer);
            MemoryBlockCollection bodysize = writer.Allocate(4);
            int bodyStartlegnth = (int)writer.CacheLength;
            ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(writer.Stream, data);
            bodysize.Full((int)writer.CacheLength - bodyStartlegnth);
        }
    }           

本文來自雲栖社群合作夥伴“開源中國”

本文作者:局長

原文連結