原創 Se7en Se7en的架構筆記 2021-03-27 20:52
為什麼使用 Netty
Netty 是一個異步事件驅動的網絡應用程式架構,用于快速開發可維護的高性能和高伸縮性的伺服器和用戶端。Netty 擁有高性能,吞吐量更高,延遲更低,減少資源消耗,最小化不必要的記憶體複制等優點。
Netty 和 NIO
NIO 的缺點
- NIO 的類庫和 API 繁雜,學習成本高,你需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要熟悉 Java 多線程程式設計。這是因為 NIO 程式設計涉及到 Reactor 模式,你必須對多線程和網絡程式設計非常熟悉,才能寫出高品質的 NIO 程式。
- 臭名昭著的 epoll bug。它會導緻 Selector 空輪詢,最終導緻 CPU 使用率飙升至 100%。直到 JDK1.7 版本依然沒得到根本性的解決。
Netty 的優點
- Netty 對 JDK 自帶的 NIO 的 API 進行了良好的封裝,API使用簡單,學習成本低。
- 功能強大,内置了多種解碼編碼器,支援多種協定。
- 性能高,對比其他主流的 NIO 架構,Netty 的性能最優。
- 社群活躍,發現 BUG 會及時修複,疊代版本周期短,不斷加入新的功能。Dubbo、Elasticsearch 都采用了 Netty,品質得到驗證。
Netty線程模型

模型解釋
- 1.Netty 抽象出兩組線程池 BossGroup 和 WorkerGroup,BossGroup 專門負責接收用戶端的連接配接, WorkerGroup 專門負責網絡的讀寫。
- 2.BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup。NioEventLoopGroup 相當于是一個事件循環線程組,這個組内含有多個事件循環線程,每一個事件循環線程都是 NioEventLoop。
- 3.每個 NioEventLoop 都有一個 Selector,用于監聽注冊在其上的 SocketChannel 的網絡通訊。
- 4.每個 Boss NioEventLoop 線程内部循環執行的3個步驟:
-
- 處理 Accept 事件,與用戶端建立連接配接,生成 NioSocketChannel。
- 将 NioSocketChannel 注冊到某個 Worker NioEventLoop 上的 Selector。
- 處理任務隊列的任務,即 runAllTask。
- 5.每個 Worker NioEventLoop 線程内部循環執行的3個步驟:
-
- 輪詢注冊到自己 Selector 上的所有 NioSocketChannel 的 read,write 事件。
- 處理 I/O 事件,即 read,write 事件,在對應的 NioSocketChannel 處理業務。
- runAllTasks 處理任務隊列 TaskQueue 的任務,一些耗時的業務處理一般可以放入 TaskQueue 中慢慢處理,這樣不影響資料在 Pipeline 中的流動處理。
- 6.每個 Worker NioEventLoop 處理 NioSocketChannel 業務時,會使用 Pipeline(管道),Pipeline 中維護了很多的 handler 處理器用來處理 NioSocketChannel 中的資料。
Netty 用戶端 & 伺服器開發
建立并配置伺服器啟動器
Bootstrap、ServerBootstrap
Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程式,串聯各個元件,Netty 中 Bootstrap 類是用戶端程式的啟動引導類,ServerBootstrap 是服務端啟動引導類。
Bootstrap 和 ServerBootStrap 是 Netty 提供的一個建立用戶端和服務端啟動器的工廠類,使用這個工廠類非常便利地建立啟動類。
group()
服務端要使用兩個線程組:
- bossGroup 用于監聽用戶端連接配接,專門負責與用戶端建立連接配接,并把連接配接注冊到 workerGroup 的 Selector 中。
- workerGroup 用于處理每一個連接配接發生的讀寫事件。
一般建立線程組直接使用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
預設的線程數是cpu核數的兩倍。假設想自定義線程數,可以使用有參構造器:
//設定bossGroup線程數為1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//設定workerGroup線程數為8
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
channel()
這個方法用于設定通道類型,當建立連接配接後,會根據這個設定建立對應的 Channel 執行個體。
常用的就是這兩個通道類型,因為是異步非阻塞的。是以是首選:
- NioSocketChannel:異步非阻塞的用戶端 TCP Socket 連接配接。
- NioServerSocketChannel:異步非阻塞的伺服器端 TCP Socket 連接配接。
還有就是同步阻塞的,一般沒什麼人用:
- OioSocketChannel:同步阻塞的用戶端 TCP Socket 連接配接。
- OioServerSocketChannel:同步阻塞的伺服器端 TCP Socket 連接配接。
option() 與 childOption()
option() 設定的是服務端用于接收進來的連接配接,也就是 boosGroup 線程。option() 常用參數:
SO_BACKLOG //Socket 參數,服務端接受連接配接的隊列長度,如果隊列已滿,用戶端連接配接将被拒絕。預設值,Windows 為200,其他為128。
childOption() 是提供給父管道接收到的連接配接,也就是 workerGroup 線程。childOption() 常用的參數:
SO_RCVBUF //Socket 參數,TCP 資料接收緩沖區大小。
TCP_NODELAY //TCP 參數,立即發送資料,預設值為 True。
SO_KEEPALIVE //Socket 參數,連接配接保活,預設值為 False。啟用該功能時,TCP 會主動探測空閑連接配接的有效性。
pipeline(ChannelPipeline)
ChannelPipeline 是 Netty 處理請求的責任鍊,ChannelHandler 則是具體處理請求的處理器。在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,它們的組成關系如下:
處理器 Handler 分為兩種:ChannelInboundHandlerAdapter(入站處理器)、ChannelOutboundHandler(出站處理器)。
一個 Channel 包含了一個 ChannelPipeline,而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向連結清單,并且每個 ChannelHandlerContext 中又關聯着一個 ChannelHandler,通過 ChannelHandlerContext 上下文對象,就可以拿到 Channel、Pipeline 等對象,就可以進行讀寫等操作。
read事件(入站事件)和write事件(出站事件)在一個雙向連結清單中,入站事件會從連結清單 head 往後傳遞到最後一個入站的 handler,出站事件會從連結清單 tail 往前傳遞到最前一個出站的 Handler。兩種類型的 Handler 互不幹擾,相同類型的 Handler 的處理順序是有影響的。
在 Bootstrap 中 childHandler() 方法需要初始化通道,執行個體化一個 ChannelInitializer,這時候需要重寫 initChannel() 初始化通道的方法,裝配流水線就是在這個地方進行。代碼如下:
//使用匿名内部類的形式初始化通道對象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { //建立通道初始化對象,設定初始化參數,在 SocketChannel 建立起來之前執行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//對 workerGroup的SocketChannel 設定處理器,調用我們自定義的 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
}
});
自定義 NettyServerHandler 代碼如下:
package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定義 Handler 需要繼承 netty 規定好的某個 HandlerAdapter(規範)
* @author 程治玮
* @since 2021/3/25 9:31 下午
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 讀取用戶端發送的資料
*
* @param ctx 上下文對象, 含有通道 channel,管道 pipeline
* @param msg 就是用戶端發送的資料
* @throws Exception
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("伺服器讀取線程 " + Thread.currentThread().getName());
//将 msg 轉成一個 ByteBuf,類似 NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("用戶端發送的消息是:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 資料讀取完畢後的處理方法
*
* @param ctx 上下文對象, 含有通道 channel,管道 pipeline
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 處理異常, 一般是需要關閉通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
bind()
提供用于服務端或者用戶端綁定伺服器位址和端口号,預設是異步啟動,如果加上 sync() 方法則是同步。
// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 異步
// 給cf注冊監聽器,監聽我們關心的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("監聽端口9000成功");
} else {
System.out.println("監聽端口9000失敗");
}
}
});
優雅地關閉 EventLoopGroup
//釋放掉所有的資源,包括建立的線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
服務端啟動器完整代碼
package com.chengzw.netty.base;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty 服務端
* @author 程治玮
* @since 2021/3/25 9:31 下午
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 建立兩個線程組 bossGroup 和 workerGroup, 含有的子線程 NioEventLoop 的個數預設為cpu核數的兩倍
// bossGroup 隻是處理連接配接請求 ,真正的和用戶端業務處理,會交給 workerGroup 完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //1個線程
EventLoopGroup workerGroup = new NioEventLoopGroup(8); //8個線程
try {
// 建立伺服器端的啟動對象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用鍊式程式設計來配置參數
bootstrap.group(bossGroup, workerGroup) //設定兩個線程組
// 使用 NioServerSocketChannel 作為伺服器的通道實作,該類用于執行個體化新的 Channel 來接收用戶端的連接配接
.channel(NioServerSocketChannel.class)
// 初始化伺服器連接配接隊列大小,服務端處理用戶端連接配接請求是順序處理的,是以同一時間隻能處理一個用戶端連接配接。
// 多個用戶端同時來的時候,服務端将不能處理的用戶端連接配接請求放在隊列中等待處理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() { //建立通道初始化對象,設定初始化參數,在 SocketChannel 建立起來之前執行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//對 workerGroup的SocketChannel 設定處理器,調用我們自定義的 NettyServerHandler
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start ...");
// 綁定一個端口并且同步, 生成了一個 ChannelFuture 異步對象,通過 isDone() 等方法可以判斷異步事件的執行情況
// 啟動伺服器(并綁定端口),bind 是異步操作,sync 方法是等待異步操作執行完畢
// sync 同步
ChannelFuture channelFuture = bootstrap.bind(9000).sync();
// 異步
// 給cf注冊監聽器,監聽我們關心的事件
/*channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("監聽端口9000成功");
} else {
System.out.println("監聽端口9000失敗");
}
}
});*/
// 等待服務端監聽端口關閉,closeFuture是異步操作
// 通過sync方法同步等待通道關閉處理完畢,這裡會阻塞等待通道關閉完成,内部調用的是Object的wait()方法
// 在這裡面cf.channel().closeFuture().sync();這個語句的主要目的是,如果缺失上述代碼,則main方法所在的線程,
// 即主線程會在執行完bind().sync()方法後,會進入finally 代碼塊,之前的啟動的nettyserver也會随之關閉掉,整個程式都結束了。
// 原文的例子有英文注釋:
// Wait until the server socket is closed,In this example, this does not happen, but you can do that to gracefully shut down your server.
// 線程進入wait狀态,也就是main線程暫時不會執行到finally裡面,nettyserver也持續運作,如果監聽到關閉事件,可以優雅的關閉通道和nettyserver,
channelFuture.channel().closeFuture().sync();
} finally {
// 資源優雅釋放
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
建立并配置用戶端啟動器
用戶端隻需要一個 NioEventLoopGroup,其餘代碼和伺服器類似。首先自定義 NettyClientHandler 用于處理用戶端 ChannelPipeline 的業務。
package com.chengzw.netty.base;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 自定義 Handler 需要繼承 netty 規定好的某個 HandlerAdapter(規範)
* @author 程治玮
* @since 2021/3/25 9:50 下午
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 當用戶端連接配接伺服器完成就會觸發該方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 當通道有讀取事件時會觸發,即服務端發送資料給用戶端
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服務端的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服務端的位址: " + ctx.channel().remoteAddress());
}
/**
* 處理異常, 一般是需要關閉通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
然後配置用戶端啟動器:
package com.chengzw.netty.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Netty 用戶端
* @author 程治玮
* @since 2021/3/25 9:52 下午
*/
public class NettyClient {
public static void main(String[] args) throws Exception {
//用戶端需要一個事件循環組
EventLoopGroup group = new NioEventLoopGroup();
try {
//建立用戶端啟動對象
//注意用戶端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設定相關參數
bootstrap.group(group) //設定線程組
.channel(NioSocketChannel.class) // 使用NioSocketChannel作為用戶端的通道實作
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入處理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("netty client start..");
//啟動用戶端去連接配接伺服器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
//對通道關閉進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}