詳細介紹了Netty網絡程式設計架構的核心概念以及入門案例。
文章目錄
- 1 Netty的介紹
- 2 Netty的核心元件
-
- 2.1 Channel
- 2.2 EventLoop
- 2.3 ChannelFuture
- 2.4 ChannelHandler
- 2.5 ChannelPipeline
-
- 2.5.1 ChannelHandlerContext
- 2.5.2 入站和出站
- 2.6 EventLoopGroup
- 3 Netty的線程模型
-
- 3.1 單線程模型
- 3.2 多線程模型
- 3.3 主從多線程模型
- 4 Netty預設啟動的線程數
- 5 Netty的啟動過程
-
- 5.1 服務端
- 5.2 用戶端
- 6 TCP 粘包/拆包的原因以及解決辦法
-
-
- 6.1 原因
- 6.2 解決辦法
-
- 7 Netty的長連接配接、心跳機制
- 8 Netty 的零拷貝
- 9 Netty 和 Tomcat 的差別
- 10 Netty簡單案例
1 Netty的介紹
- 基于事件驅動的Java NIO網絡通信架構,可以快速簡單地開發網絡應用程式。
- 極大地簡化并優化了 TCP 和 UDP 套接字伺服器等網絡程式設計,并且性能以及安全性等很多方面甚至都要更好。
- 支援多種通信協定 如 FTP,SMTP,HTTP 以及各種二進制和基于文本的傳統協定,同樣支援自定義協定。
簡單的說,Netty有三個優點:
- 高并發:基于 NIO開發(Reactor模型),并發性能相比BIO得到了很大提高。
- 傳輸快:傳輸依賴于零拷貝特性,盡量減少不必要的記憶體拷貝,使用高性能序列化協定protobuf,實作了高效傳輸。
- 封裝好:封裝了原始NIO程式設計的很多細節,提供了易于使用調用接口,使用更簡單。
借用官方的描述:Netty 成功地找到了一種在不妥協可維護性和性能的情況下實作易于開發,性能,穩定性和靈活性的方法。
Netty的社群目前非常活躍。很多涉及到網絡調用的開源項目和架構底層都用到了Netty,比如我們常用的 Dubbo、RocketMQ、Elasticsearch、gRPC、Spark、GateWay等等。
總之,涉及到網絡程式設計開發時,比如即時通訊系統、自定義RPC架構、自定義HTTP伺服器、實時消息推送系統等場景下,用Netty,準沒錯。
2 Netty的核心元件
2.1 Channel
通道,Netty網絡操作抽象類,包括基本的 I/O 操作,如 bind、connect、read、write 等,Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 類的複雜性。
不同協定、不同的阻塞類型的連接配接都有不同的 Channel 類型與之對應,下面是一些常用的 Channel 類型:
- NioSocketChannel,異步的用戶端 TCP Socket 連接配接。
- NioServerSocketChannel,異步的伺服器端 TCP Socket 連接配接。
- NioDatagramChannel,異步的 UDP 連接配接。
- NioSctpChannel,異步的用戶端 Sctp 連接配接。
- NioSctpServerChannel,異步的 Sctp 伺服器端連接配接 這些通道涵蓋了 UDP 和 TCP網絡 IO以及檔案 IO。
2.2 EventLoop
EventLoop(事件循環)接口是Netty的核心接口,用于處理連接配接的生命周期中所發生的各種事件,實際上就是負責監聽網絡事件并調用事件處理器進行相關 I/O 操作的處理。
EventLoop内部持有NIO中的Selector,Channel将會注冊到EventLoop中,一個EventLoop可以監聽多個Channel,EventLoop是實作IO多路複用的核心,可以看作是Reactor模型中的mainReactor。
Channel 為 Netty 網絡操作抽象類,EventLoop 負責監聽注冊到其上的Channel的IO事件,兩者配合完成 I/O 操作。
2.3 ChannelFuture
在Netty中所有的IO操作都是異步的,不能立刻得知消息是否被正确處理。
Channel會注冊到EventLoop中後會立即傳回一個ChannelFuture對象,可以通過ChannelFuture#addListener注冊GenericFutureListener監聽器,當操作執行成功或失敗時監聽會自動觸發注冊的監聽事件。
2.4 ChannelHandler
ChannelHandler 是消息的具體處理器。他負責處理各種任務,這個任務非常廣泛,可以是讀寫事件、連接配接、解碼編碼、資料轉換、業務邏輯等等,處理完畢之後将資料繼續轉發到ChannelPipeline中的下一個ChannelHandler。
通過定制ChannelHandler可對Netty進行擴充。ChannelHandler接口本身并沒有提供很多方法,因為這個接口有許多的方法需要實作,為了友善使用,可以繼承它的子類:
- ChannelInboundHandler用于處理入站I/O事件
- ChannelOutboundHandler用于處理出站I/O操作
或者使用以下擴充卡類,更加友善:
- ChannelInboundHandlerAdapter用于處理入站I/O事件
- ChannelOutboundHandlerAdapter用于處理出站I/O操作
- ChannelDuplexHandler用于處理入站和出站事件
2.5 ChannelPipeline
ChannelPipeline 是一個 ChannelHandler 的連結清單,即ChannelHandler組成的List,提供了一個沿着鍊傳播入站和出站事件流的 API。
可以在 ChannelPipeline 上通過 addLast() 方法添加一個或者多個ChannelHandler ,因為一個資料或者事件可能會被多個 Handler 處理。當一個 ChannelHandler 處理完之後就将資料交給下一個 ChannelHandler 。
在執行時,入站事件會從連結清單head往後傳遞到最後一個入站的handler(ChannelInboundHandler類型),出站事件會從連結清單tail往前傳遞到最前一個出站的handler(ChannelOutboundHandler類型),兩種類型的handler在執行時互不幹擾。如果Handler同時屬于入站、出站Handler,則都會執行一次。

在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應,當 Channel 被建立時,它會被自動地配置設定到它專屬的 ChannelPipeline。
2.5.1 ChannelHandlerContext
用于傳輸業務資料,儲存Channel相關的所有上下文資訊。
将Handler和Pipeline聯系起來,實際上ChannelPipeline中直接存儲的是ChannelHandlerContext,而每個 ChannelHandlerContext 中又關聯着唯一一個 ChannelHandler。
2.5.2 入站和出站
資料入站,一般是指讀事件觸發,即資料要讀進來;資料從底層的Java NIO channel讀取到Netty的Channel,此過程中會進行資料解碼。
資料出站,一般是指寫事件觸發,即資料要寫出去;資料從Netty的Channel寫入底層的 Java NIO chanel,此過程中會進行資料編碼。
入站會從先讀取資料,再執行入站的Handler;出站會先執行出站的Handler,再寫入。
即每次出現讀事件時,會執行入站操作,實際讀取資料之後,會先從頭至尾依次調用ChannelPipeline 中的InboundHandler處理,不會調用OutboundHandler;而觸發寫事件時,會執行出站操作,實際寫入資料之前,則會從尾到頭依次調用ChannelPipeline的OutboundHandler處理,不會調用InboundHandler;
下圖描述了 ChannelPipeline 中的 ChannelHandlers 通常如何處理 I/O 事件(https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html):
入站事件由入站處理程式按自下而上的方向處理,如圖左側所示。入站處理程式通常處理由圖底部的 I/O 線程生成的原始入站資料,例如通過 SocketChannel.read(ByteBuffer)讀取。
出站事件由出站處理程式按自上而下的方向處理,如圖右側所示。出站處理程式通常會生成或轉換出站流量,例如寫入請求。如果出站事件超出了底部出站處理程式,則由與通道關聯的 I/O 線程處理。I/O 線程執行實際的輸出操作,例如通過 SocketChannel.write(ByteBuffer)輸出。
2.6 EventLoopGroup
EventLoopGroup相當于1個事件循環組,這個組裡包含多個事件循環EventLoop, EventLoop 的主要作用實際就是負責監聽網絡事件并調用事件處理器進行相關 I/O 操作的處理。
EventLoopGroup内部的每個EventLoop通常包含1個Selector和1個事件循環線程,一個EventLoop可以綁定多個Channel,但一個Channel隻能綁定一個EventLoop,這樣某一個連接配接的IO事件就在專有的線程上處理,保證線程安全。
Netty Server端包含1個Boss NioEventLoopGroup和1個Worker NioEventLoopGroup:
- Boss NioEventLoop主要循環執行的工作:
- select監聽accept事件。
- 處理到來的accept事件,與Client建立連接配接,生成SocketChannel,并将SocketChannel注冊到某個Worker NioEventLoop的Selector上。
- 處理任務隊列中的任務,runAllTasks。任務隊列中的任務包括使用者調用eventloop.execute或schedule執行的任務,或者其它線程送出到該eventloop的任務。
- Worker NioEventLoop主要循環執行的工作:
- select監聽read、write事件。
- 處理到來的read、write事件,在NioSocketChannel可讀、可寫事件發生時進行處理。
- 處理任務隊列中的任務,runAllTasks。
3 Netty的線程模型
Netty通過Reactor模型基于多路複用器接收并處理使用者請求,内部實作了兩個線程池,boss線程池和work線程池,其中boss線程池的線程負責處理請求的accept事件,當接收到accept事件的請求時,就會建立連接配接并把對應的socket封裝到一個NioSocketChannel中,并交給work線程池,其中work線程池負責請求的read和write事件,以及業務邏輯,這些都由對應的Handler處理。
Netty 主要靠 NioEventLoopGroup 線程池的配置來實作具體的線程模型。
3.1 單線程模型
bossGroup和workerGroup使用同一個NioEventLoopGroup,且配置線程數為1。
适合連接配接量和并發量都不大的應用。
3.2 多線程模型
bossGroup和workerGroup使用不同NioEventLoopGroup,且bossGroup配置線程數為1。
适合連接配接量不大,并發量大的應用。
3.3 主從多線程模型
bossGroup和workerGroup使用不同NioEventLoopGroup,且都配置為多線程。
适合連接配接量和并發量都比較大的應用。
從一個主線程 NIO 線程池中選擇一個線程作為 Acceptor 線程,綁定監聽端口,接收用戶端連接配接的連接配接,其他線程負責後續的接入認證等工作。連接配接建立完成後分派給workerGroup線程。
4 Netty預設啟動的線程數
EventLoopGroup 預設的構造函數實際會起的線程數為 CPU核心數*2,但bossGroup一般設定數量為1。EventLoopGroup内部的EventLoop數量就是線程數量,保證1對1的關系。
5 Netty的啟動過程
5.1 服務端
首先初始化兩個NioEventLoopGroup,其中boosGroup用于處理用戶端建立TCP連接配接的請求(Accept事件), workerGroup用于處理每一條連接配接的I/O讀寫事件和具體的業務邏輯。
NioEventLoopGroup 類的無參構造函數的預設設定的線程數量是 CPU 核心數 *2 。一般情況下我們會指定 bossGroup 的 線程數為 1(并發連接配接量不大的時候) ,workGroup 的線程數量為 CPU 核心數 *2 。
随後建立一個ServerBootstrap,它是服務端的啟動引導類/輔助類,它将引導我們進行服務端的啟動工作。通過ServerBootstrap配置EventLoopGroup、Channel類型,連接配接參數、配置入站、出站事件handler等。
最後通過bind()方法綁定端口,開始工作。
public class NettyServer {
static int port = 8888;
public static void main(String[] args) {
//1 bossGroup 用于接收連接配接 mainReactor
//workerGroup 用于具體的處理 subReactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.建立服務端啟動引導/輔助類:ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3.給引導類配置兩大線程組,确定了線程模型
serverBootstrap
.group(bossGroup, workerGroup)
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定義用戶端消息的業務處理邏輯Handler
p.addLast(new HelloServerHandler());
p.addLast(…………);
}
});
// 6.綁定端口,調用 sync 方法阻塞直到綁定完成
ChannelFuture f = serverBootstrap.bind(1234).sync();
// 7.阻塞等待直到伺服器Channel關閉(closeFuture()方法擷取Channel 的CloseFuture對象,然後調用sync()方法)
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//8.優雅關閉相關線程組資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
5.2 用戶端
首先初始化一個NioEventLoopGroup。
随後建立一個Bootstrap,它是用戶端的啟動引導類/輔助類,它将引導我們進行用戶端的啟動工作。通過Bootstrap配置EventLoopGroup、Channel類型,連接配接參數、配置入站、出站事件handler等。
最後通過connect()方法使用服務端的ip和port進行連接配接,開始工作。
public class NettyClient {
static int port = 8888;
static String host = "127.0.0.1";
public static void main(String[] args) {
//1.建立一個 NioEventLoopGroup 對象執行個體
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.建立用戶端啟動引導/輔助類:Bootstrap
Bootstrap bootstrap = new Bootstrap();
//3.指定線程組
bootstrap.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 5.這裡可以自定義消息的業務處理邏輯
p.addLast(new HelloClientHandler(message));
p.addLast(…………);
}
});
// 6.嘗試建立連接配接
ChannelFuture f = bootstrap.connect(host, port).sync();
// 7.等待連接配接關閉(阻塞,直到Channel關閉)
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
6 TCP 粘包/拆包的原因以及解決辦法
6.1 原因
TCP是以流的方式來處理資料,底層會有一個緩沖區,一個完整的較大的包可能會被TCP拆分成多個包進行發送,也可能把多個小的包封裝成一個大的資料包發送。
TCP粘包/拆包的原因:應用程式寫入的位元組大小大于套接字發送緩沖區的大小,會發生拆包現象,實際表現就是不能收到完整的消息。而應用程式寫入資料小于套接字緩沖區大小,網卡将應用多次寫入的資料發送到網絡上,這将會發生粘包現象,實際表現就是一次性收到多條粘連在一起消息。
報頭的選項字段有MSS(Maximum Segment Size,最大封包段大小)字段,規定一個TCP包最大可傳輸的位元組數,一般是1500-20-20=1460位元組,大于該大小時将發生拆包。
6.2 解決辦法
- 使用 Netty 自帶的解碼器
- LineBasedFrameDecoder : 發送端發送資料包的時候,每個資料包之間以換行符作為分隔,即\n或者\r\n,其工作原理是它依次周遊 ByteBuf 中的可讀位元組,判斷是否有換行符,然後進行相應的截取。
- DelimiterBasedFrameDecoder : 可以自定義分隔符解碼器,其實際上是一種特殊的DelimiterBasedFrameDecoder 解碼器。
- FixedLengthFrameDecoder: 固定長度解碼器,它能夠按照指定的長度對消息進行相應的拆包。需要約定每一個包的固定大小。
- LengthFieldBasedFrameDecoder:将消息分為消息頭和消息體。在頭部中儲存有目前整個消息的長度,隻有在讀取到足夠長度的消息之後才算是讀到了一個完整的消息。
- 通過自定義協定進行粘包和拆包的處理。
7 Netty的長連接配接、心跳機制
Netty用戶端和伺服器采用長連接配接保持聯系。client 與 server 完成一次讀寫,它們之間的連接配接并不會主動關閉,後續的讀寫操作會繼續使用這個連接配接。長連接配接的可以省去較多的 TCP 建立和關閉的操作,降低對網絡資源的依賴,節約時間。
在 TCP 保持長連接配接的過程中,可能會出現斷網等網絡異常出現,異常發生的時候, client 與 server 之間如果沒有互動的話,它們是無法發現對方已經掉線的。為了解決這個問題, 我們就需要引入心跳機制 。
心跳機制的工作原理是: 在 client 與 server 之間在一定時間内沒有資料互動時, 即處于 idle 狀态時,用戶端或伺服器就會發送一個特殊的資料包給對方,當接收方收到這個資料封包後,也立即發送一個特殊的資料封包,回應發送方,此即一個 PING-PONG 互動。是以,當某一端收到心跳消息後,就知道了對方仍然線上,這就確定 TCP 連接配接的有效性。
TCP 實際上自帶的就有長連接配接選項,本身是也有心跳包機制,也就是 TCP 的選項:SO_KEEPALIVE。但是,TCP 協定層面的長連接配接靈活性不夠。是以,一般情況下我們都是在應用層協定上實作自定義心跳機制的,也就是在 Netty 層面通過編碼實作。通過 Netty 實作心跳機制的話,核心類是 IdleStateHandler 。
Netty支援的哪些心跳類型設定:
- readerIdleTime:為讀逾時時間(即測試端一定時間内未接受到被測試端消息)。
- writerIdleTime:為寫逾時時間(即測試端一定時間内向被測試端發送消息)。
- allIdleTime:所有類型的逾時時間。
8 Netty 的零拷貝
零複制(英語:Zero-copy;也譯零拷貝)技術是指計算機執行操作時,CPU 不需要先将資料從某處記憶體複制到另一個特定區域。這種技術通常用于通過網絡傳輸檔案時節省 CPU 周期和記憶體帶寬。
Netty 中的零拷貝展現在以下幾個方面:
- Netty 提供了CompositeByteBuf類,可以将多個ByteBuf合并為一個邏輯上的 ByteBuf,避免了各個 ByteBuf 之間的資料拷貝。
- ByteBuf 支援 slice 操作, 是以可以将 ByteBuf 分解為多個共享同一個存儲區域的 ByteBuf, 避免了記憶體的拷貝。
- 通過 FileRegion 包裝的FileChannel.tranferTo 實作檔案傳輸,可以直接将檔案緩沖區的資料發送到目标 Channel,避免了傳統通過循環 write 方式導緻的記憶體拷貝問題。
- Netty 的接收和發送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接記憶體進行 Socket 讀寫,不需要進行位元組緩沖區的二次拷貝。如果使用傳統的堆記憶體(HEAP BUFFERS)進行 Socket 讀寫,JVM 會将堆記憶體 Buffer 拷貝一份到直接記憶體中,然後才寫入 Socket 中。相比于堆外直接記憶體,消息在發送過程中多了一次緩沖區的記憶體拷貝。
9 Netty 和 Tomcat 的差別
作用不同:Tomcat 是 Servlet 容器,可以視為 Web 伺服器,是一款已經開發好的軟體,而 Netty 是一款強大的異步事件驅動的網絡應用程式架構,用于簡化網絡程式設計,可用于編寫各種伺服器。
協定不同:Tomcat 是基于 http 協定的 Web 伺服器,而 Netty 支援各種現成的協定并且能通過程式設計自定義各種協定,因為 Netty 本身自己能編碼/解碼位元組流,是以Netty 可以實作HTTP 伺服器、FTP 伺服器、UDP 伺服器、RPC 伺服器、WebSocket 伺服器、Redis 的 Proxy 伺服器、MySQL 的 Proxy 伺服器等等。
10 Netty簡單案例
client:
public class NettyClient {
public static void main(String[] args) throws IOException, InterruptedException {
//1.建立一個 NioEventLoopGroup 對象執行個體
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.建立用戶端啟動引導/輔助類:Bootstrap
Bootstrap bootstrap = new Bootstrap();
//3.指定線程組
bootstrap.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 5.這裡可以自定義消息的業務處理邏輯
pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ClientHandler());
}
});
// 6.嘗試建立連接配接
ChannelFuture f = bootstrap.connect("localhost", 8888).sync();
Channel channel = f.channel();
// 7.等待連接配接關閉(阻塞,直到Channel關閉)
//channel.closeFuture().sync();
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
String s = br.readLine();
channel.writeAndFlush(s + "\r\n");
if ("bye".equals(s)) {
break;
}
}
} finally {
group.shutdownGracefully();
}
}
}
ClientHandler:
/**
* @author lx
*/
public class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
NettyServer:
public class NettyServer {
public static void main(String[] args) {
//1 bossGroup 用于接收連接配接 mainReactor
//workerGroup 用于具體的處理 subReactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.建立服務端啟動引導/輔助類:ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3.給引導類配置兩大線程組,确定了線程模型
serverBootstrap
.group(bossGroup, workerGroup)
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//5.可以自定義用戶端消息的業務處理邏輯Handler
pipeline.addLast(new DelimiterBasedFrameDecoder(4096,Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ServerHandler());
}
});
// 6.綁定端口,調用 sync 方法阻塞直到綁定完成
ChannelFuture f = serverBootstrap.bind(8888).sync();
// 7.阻塞等待直到伺服器Channel關閉(closeFuture()方法擷取Channel 的CloseFuture對象,然後調用sync()方法)
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//8.優雅關閉相關線程組資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerHandler:
/**
* @author lx
*/
public class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 讀取請求
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress());
System.out.println("from client: " + msg);
double v = ThreadLocalRandom.current().nextDouble();
channel.writeAndFlush("from server: " + v + " \r\n");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress() + "加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress() + "離開");
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
System.out.println("client: " + channel.remoteAddress() + "上線");
}
}
參考文章:
- Netty常見面試題總結
- Netty 面試題