一、概要
在上一篇文章講到Dotnetty的基本認識,本文這次會講解dotnetty非常核心的子產品是屬于比較硬核的幹貨了,然後繼續往下講解如何根據自己的需求或者自己的喜好去配置Dotnetty而不是生搬硬套官網的示例源碼。如果看了本文有收獲的話麻煩關注一下文章尾部的公衆号和技術讨論群。各位的支援是對我莫大的幫助。
二、簡介
主要講解一下幾個知識點:
- EventLoopGroup & EventLoop
- Bootstrap
- Channel
- ChannelPipeline & ChannelHandler
- ChannelHandlerContext
- ChannelHandler
三、詳細内容
1.EventLoopGroup & EventLoop
- 高性能RPC架構的3個要素:IO模型、資料協定、線程模型
- EventLoop好比一個線程,1個EventLoop可以服務多個Channel,1個Channel隻有一個EventLoop可以建立多個 EventLoop 來優化資源利用,也就是EventLoopGroup。
- EventLoopGroup 負責配置設定 EventLoop 到新建立的 Channel,裡面包含多個EventLoop
- EventLoopGroup →多個 EventLoop ,EventLoop →維護一個 Selector。

2.伺服器啟動引導類:ServerBootstrap
Group :設定線程組模型,Reactor線程模型對比EventLoopGroup
- 單線程
- 多線程
- 主從線程
Channel:設定channel通道類型NioServerSocketChannel、OioServerSocketChannel
Option: 作用于每個建立立的channel,設定TCP連接配接中的一些參數,如下:
- ChannelOption.SO_BACKLOG: 存放已完成三次握手的請求的等待隊列的最大長度;
- ChannelOption.TCP_NODELAY: 為了解決Nagle的算法問題,預設是false, 要求高實時性,有資料時馬上發送,就将該選項設定為true關閉Nagle算法;如果要減少發送次數,就設定為false,會累積一定大小後再發送。
- ChildOption: 作用于被accept之後的連接配接
- ChildHandler: 用于對每個通道裡面的資料處理
3.連接配接通道類:Channel
Channel: 用戶端和服務端建立的一個連接配接通道(可以了解為一個channel就是一個socket連接配接) ChannelHandler: 負責Channel的邏輯處理 ChannelPipeline: 負責管理ChannelHandler的有序容器
關系: 一個Channel包含一個ChannelPipeline,所有ChannelHandler都會順序加入到ChannelPipeline中 建立 Channel時會自動建立一個ChannelPipeline,每個Channel都有一個管理它的pipeline,這關聯是永久 性的Channel當狀态出現變化,就會觸發對應的事件。
生命周期:
- ChannelRegistered: channel注冊到一個EventLoop
- ChannelActive: 變為活躍狀态(連接配接到了遠端主機),可以接受和發送資料
- ChannelInactive: channel處于非活躍狀态,沒有連接配接到遠端主機
- ChannelUnregistered: channel已經建立,但是未注冊到一個EventLoop裡面,也就是沒有和Selector綁定
4.頻道的内部實作 ChannelHandler & ChannelPipeline
- ChannelInboundHandler:(入站) 處理輸入資料和Channel狀态類型改變,擴充卡。
- ChannelInboundHandlerAdapter(擴充卡設計模式) 常用的:SimpleChannelInboundHandler
- ChannelOutboundHandler:(出站) 處理輸出資料,擴充卡 ChannelOutboundHandlerAdapter
- ChannelPipeline: 好比廠裡的流水線一樣,可以在上面添加多個ChannelHanler,也可看成是一串
- ChannelHandler 執行個體,攔截穿過 Channel 的輸入輸出 event, ChannelPipeline 實作了攔截器的一種進階形 式,使得使用者可以對事件的處理以及ChannelHanler之間互動獲得完全的控制權。
5.頻道的内部實作 ChannelHandler & ChannelPipeline
ChannelHandlerContext是連接配接ChannelHandler和ChannelPipeline的橋梁,ChannelHandlerContext部分方法和Channel及ChannelPipeline重合。
- 好比調用write方法Channel、ChannelPipeline、ChannelHandlerContext 都可以調用此方法,前兩者都會在整個管道流裡 傳播,而ChannelHandlerContext就隻會在後續的Handler裡面傳播。
- AbstractChannelHandlerContext類雙向連結清單結構,next/prev分别是後繼節點,和前驅節點。
- DefaultChannelHandlerContext 是實作類,但是大部分都是父類那邊完成,這個隻是簡單的實作一些方法 主要就是判斷Handler的類型。
- ChannelInboundHandler之間的傳遞,主要通過調用ctx裡面的FireXXX()方法來實作下個handler的調用。
6.Handler執行順序
一般的項目中,inboundHandler和outboundHandler有多個,在Pipeline中的執行順序?
InboundHandler順序執行,OutboundHandler逆序執行
- InboundHandler之間傳遞資料,通過context.fireChannelRead(message)
- InboundHandler通過context.write(message),則會傳遞到outboundHandler
- 使用context.write(msg)傳遞消息,Inbound需要放在結尾,在Outbound之後,不然outboundhandler會不執行; 但是使用channel.write(msg)、pipline.write(msg)情況會不一緻,都會執行。
- OutBound和Inbound誰先執行,針對用戶端和服務端而言,用戶端是發起請求再接受資料,先outbound再 inbound,服務端則相反。
四、實戰環節
以上概念性的東西介紹完了之後開始編寫本章實戰代碼(完整的案例代碼将在qq群檔案共享裡上傳,文章末尾有QQ群二維碼和聯系方式)。接下來我們先看一下項目結構。
Handlers - 主要存放所有處理相關類。
Initializer - 存放初始化tcp服務的相關内容。
appsetting.json - 主要存放的内容為,服務端的相關配置例如:ip位址、端口号等。
dotnetty - 安全證書
Program - 啟動類
項目結構介紹完畢之後,我大緻将這個demo分為5個部分來實作具體根據自己需求去設計搭建結構都是可以的,這裡的内容僅供參考。
- 第一步,配置建構引導類
1 //主要工作組,設定為2個線程
2 private static readonly IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(4);
3 //子工作組,預設為核心數*2的線程數
4 private static readonly IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();
5
6 static async Task RunAsync() {
7 /*
8 *初始化服務端引導對象。
9 *聲明一個服務端Bootstrap,每個Netty服務端程式,都由ServerBootstrap控制,
10 *通過鍊式的方式組裝需要的參數
11 */
12 ServerBootstrap bootstrap = new ServerBootstrap();
13 //添加工作組
14 bootstrap.Group(bossGroup, workerGroup);
15 //初始化工作頻道
16 bootstrap.Channel<TcpServerSocketChannel>();
17 bootstrap
18 //存放已完成三次握手的請求的等待隊列的最大長度;
19 .Option(ChannelOption.SoBacklog, 1024)
20 //ByteBuf的配置設定器(重用緩沖區)大小
21 .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
22 //接收字元的長度
23 .Option(ChannelOption.RcvbufAllocator, new FixedRecvByteBufAllocator(1024 * 8))
24 //保持長連接配接
25 .ChildOption(ChannelOption.SoKeepalive, true)
26 //取消延遲發送
27 .ChildOption(ChannelOption.TcpNodelay, true)
28 //端口複用
29 .ChildOption(ChannelOption.SoReuseport, true)
30 //初始化日志攔截器,可以不加
31 .Handler(new LoggingHandler("SRV-LSTN"))
32 //自定義初始化Tcp服務
33 .ChildHandler(new EchoServerInitializer());
34
35 //綁定服務端,端口号。IP位址預設讀取項目配置檔案。
36 await bootstrap.BindAsync(ServerSettings.Port);
37 }
- 第二步,初始化Channel相關處理類
1 /// <summary>
2 /// 初始化
3 /// </summary>
4 public class EchoServerInitializer : ChannelInitializer<ISocketChannel>
5 {
6 /// <summary>
7 /// No interaction time.300s
8 /// </summary>
9 public const int AllTimeOut = 60 * 5;
10
11 /// <summary>
12 /// Read Time Out.60s
13 /// </summary>
14 public const int ReadTimeOut = 60;
15
16 /// <summary>
17 /// Recive Time Out.60s
18 /// </summary>
19 public const int WriterTimeOut = 60;
20
21 protected override void InitChannel(ISocketChannel channel)
22 {
23 /*
24 * 工作線程連接配接器是設定了一個頻道,服務端主線程所有接收到的資訊都會通過這個管道一層層往下傳輸
25 * 同時所有出棧的消息 也要這個頻道的所有處理器進行一步步處理
26 */
27 IChannelPipeline pipeline = channel.Pipeline;
28 //初始化Dotnetty日志攔截器
29 pipeline.AddLast(new LoggingHandler("SRV-CONN"));
30 //心跳逾時時間配置
31 pipeline.AddLast(new IdleStateHandler(
32 ReadTimeOut,
33 WriterTimeOut,
34 AllTimeOut));
35 //消息内容編碼邏輯處理類
36 pipeline.AddLast("encoder", new EncoderHandler());
37 //解碼邏輯處理類
38 pipeline.AddLast("decoder", new DecoderHandler());
39 //心跳邏輯處理
40 pipeline.AddLast(new HeartBeatHandler());
41 //每個頻道請求消息處理類
42 pipeline.AddLast(new ServerHandler());
43 }
44 }
- 第三步,配置、實作心跳處理機制
1 public class HeartBeatHandler : ChannelHandlerAdapter
2 {
3 /// <summary>
4 /// 每個頻道都有自己的心跳管理,如果頻道長時間不操作踢掉線的邏輯可以寫在這裡
5 /// </summary>
6 /// <param name="context"></param>
7 /// <param name="evt"></param>
8 public override void UserEventTriggered(IChannelHandlerContext context, object evt)
9 {
10 var eventState = evt as IdleStateEvent;
11 if (eventState != null)
12 {
13 String type = string.Empty;
14 if (eventState.State == IdleState.ReaderIdle)
15 {
16 type = "read idle";//沒有任何接受
17 }
18 else if (eventState.State == IdleState.WriterIdle)
19 {
20 type = "write idle";//沒有任何寫入
21 }
22 else if (eventState.State == IdleState.AllIdle)
23 {
24 type = "all idle";
25 context.CloseAsync();//5分鐘内無任何互動則斷開該用戶端連接配接
26 }
27 }
28 else
29 {
30 base.UserEventTriggered(context, evt);
31 }
32 }
33 }
- 第四步,編碼、解碼
1 /// <summary>
2 /// 解碼
3 /// </summary>
4 public class DecoderHandler : ByteToMessageDecoder
5 {
6 protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
7 {
8 throw new NotImplementedException();
9 }
10 }
11
12
13
14 public class EncoderHandler : MessageToByteEncoder<byte[]>
15 {
16 /// <summary>
17 /// 編碼
18 /// </summary>
19 /// <param name="context"></param>
20 /// <param name="message"></param>
21 /// <param name="output"></param>
22 protected override void Encode(IChannelHandlerContext context, byte[] message, IByteBuffer output)
23 {
24 throw new NotImplementedException();
25 }
26 }
- 第五步,Channel邏輯處理實作
1 public class ServerHandler : ChannelHandlerAdapter
2 {
3 /*
4 * Channel的生命周期
5 * 1.ChannelRegistered 先注冊
6 * 2.ChannelActive 再被激活
7 * 3.ChannelRead 用戶端與服務端建立連接配接之後的會話(資料互動)
8 * 4.ChannelReadComplete 讀取用戶端發送的消息完成之後
9 * error. ExceptionCaught 如果在會話過程當中出現dotnetty架構内部異常都會通過Caught方法傳回給開發者
10 * 5.ChannelInactive 使目前頻道處于未激活狀态
11 * 6.ChannelUnregistered 取消注冊
12 */
13
14 /// <summary>
15 /// 頻道注冊
16 /// </summary>
17 /// <param name="context"></param>
18 public override void ChannelRegistered(IChannelHandlerContext context)
19 {
20 base.ChannelRegistered(context);
21 }
22
23 /// <summary>
24 /// socket client 連接配接到服務端的時候channel被激活的回調函數
25 /// </summary>
26 /// <param name="context"></param>
27 public override void ChannelActive(IChannelHandlerContext context)
28 {
29 //一般可用來記錄連接配接對象資訊
30 base.ChannelActive(context);
31 }
32
33 /// <summary>
34 /// socket接收消息方法具體的實作
35 /// </summary>
36 /// <param name="context">目前頻道的句柄,可使用發送和接收方法</param>
37 /// <param name="message">接收到的用戶端發送的内容</param>
38 public override void ChannelRead(IChannelHandlerContext context, object message)
39 {
40 var buffer = message as IByteBuffer;
41 if (buffer != null)
42 {
43 Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));
44 }
45 context.WriteAsync(message);//發送給用戶端方法
46 }
47
48 /// <summary>
49 /// 該次會話讀取完成後回調函數
50 /// </summary>
51 /// <param name="context"></param>
52 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();//
53
54 /// <summary>
55 /// 異常捕獲
56 /// </summary>
57 /// <param name="context"></param>
58 /// <param name="exception"></param>
59 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
60 {
61 Console.WriteLine("Exception: " + exception);
62 context.CloseAsync();
63 }
64
65 /// <summary>
66 /// 目前頻道未激活狀态
67 /// </summary>
68 /// <param name="context"></param>
69 public override void ChannelInactive(IChannelHandlerContext context)
70 {
71 base.ChannelInactive(context);
72 }
73
74 /// <summary>
75 /// 取消注冊目前頻道,可了解為銷毀目前頻道
76 /// </summary>
77 /// <param name="context"></param>
78 public override void ChannelUnregistered(IChannelHandlerContext context)
79 {
80 base.ChannelUnregistered(context);
81 }
82 }
希望大家多多支援。不勝感激。
- E-Mail:[email protected]
- QQ: 580749909(個人群)
- Blog: https://www.cnblogs.com/justzhuzhu/
- Git: https://github.com/JusterZhu
- 微信公衆号