思維導圖

前言
本文主要講述Netty架構的一些特性以及重要元件,希望看完之後能對Netty架構有一個比較直覺的感受,希望能幫助讀者快速入門Netty,減少一些彎路。
一、Netty概述
官方的介紹:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一個異步事件驅動的網絡應用程式架構,用于快速開發可維護的高性能協定伺服器和用戶端。
二、為什麼使用Netty
從官網上介紹,Netty是一個網絡應用程式架構,開發伺服器和用戶端。也就是用于網絡程式設計的一個架構。既然是網絡程式設計,Socket就不談了,為什麼不用NIO呢?
2.1 NIO的缺點
對于這個問題,之前我寫了一篇文章
《NIO入門》對NIO有比較詳細的介紹,NIO的主要問題是:
- NIO的類庫和API繁雜,學習成本高,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要熟悉Java多線程程式設計。這是因為NIO程式設計涉及到Reactor模式,你必須對多線程和網絡程式設計非常熟悉,才能寫出高品質的NIO程式。
- 臭名昭著的epoll bug。它會導緻Selector空輪詢,最終導緻CPU 100%。直到JDK1.7版本依然沒得到根本性的解決。
2.2 Netty的優點
相對地,Netty的優點有很多:
- API使用簡單,學習成本低。
- 功能強大,内置了多種解碼編碼器,支援多種協定。
- 性能高,對比其他主流的NIO架構,Netty的性能最優。
- 社群活躍,發現BUG會及時修複,疊代版本周期短,不斷加入新的功能。
- Dubbo、Elasticsearch都采用了Netty,品質得到驗證。
三、架構圖
上面這張圖就是在官網首頁的架構圖,我們從上到下分析一下。
綠色的部分Core核心子產品,包括零拷貝、API庫、可擴充的事件模型。
橙色部分Protocol Support協定支援,包括Http協定、webSocket、SSL(安全套接字協定)、谷歌Protobuf協定、zlib/gzip壓縮與解壓縮、Large File Transfer大檔案傳輸等等。
紅色的部分Transport Services傳輸服務,包括Socket、Datagram、Http Tunnel等等。
以上可看出Netty的功能、協定、傳輸方式都比較全,比較強大。
四、永遠的Hello Word
首先搭建一個HelloWord工程,先熟悉一下API,還有為後面的學習做鋪墊。以下面這張圖為依據:
4.1 引入Maven依賴
使用的版本是4.1.20,相對比較穩定的一個版本。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
4.2 建立服務端啟動類
public class MyServer {
public static void main(String[] args) throws Exception {
//建立兩個線程組 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//建立服務端的啟動對象,設定參數
ServerBootstrap bootstrap = new ServerBootstrap();
//設定兩個線程組boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//設定服務端通道實作類型
.channel(NioServerSocketChannel.class)
//設定線程隊列得到連接配接個數
.option(ChannelOption.SO_BACKLOG, 128)
//設定保持活動連接配接狀态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名内部類的形式初始化通道對象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//給pipeline管道設定處理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//給workerGroup的EventLoop對應的管道設定處理器
System.out.println("java技術愛好者的服務端已經準備就緒...");
//綁定端口号,啟動服務端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//對關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
4.3 建立服務端處理器
/**
* 自定義的Handler需要繼承Netty規定好的HandlerAdapter
* 才能被Netty架構所關聯,有點類似SpringMVC的擴充卡模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//擷取用戶端發送過來的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到用戶端" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//發送消息給用戶端
ctx.writeAndFlush(Unpooled.copiedBuffer("服務端已收到消息,并給你發送一個問号?", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//發生異常,關閉通道
ctx.close();
}
}
4.4 建立用戶端啟動類
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//建立bootstrap對象,配置參數
Bootstrap bootstrap = new Bootstrap();
//設定線程組
bootstrap.group(eventExecutors)
//設定用戶端的通道實作類型
.channel(NioSocketChannel.class)
//使用匿名内部類初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加用戶端通道的處理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("用戶端準備就緒,随時可以起飛~");
//連接配接服務端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//對通道關閉進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
//關閉線程組
eventExecutors.shutdownGracefully();
}
}
}
4.5 建立用戶端處理器
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//發送消息到服務端
ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴蔔~茉莉~Are you good~馬來西亞~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服務端發送過來的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服務端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
4.6 測試
先啟動服務端,再啟動用戶端,就可以看到結果:
MyServer列印結果:
MyClient列印結果:
五、Netty的特性與重要元件
5.1 taskQueue任務隊列
如果Handler處理器有一些長時間的業務處理,可以交給taskQueue異步處理。怎麼用呢,請看代碼示範:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//擷取到線程池eventLoop,添加線程,執行
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
//長時間操作,不至于長時間的業務操作導緻Handler阻塞
Thread.sleep(1000);
System.out.println("長時間的業務處理");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
我們打一個debug調試,是可以看到添加進去的taskQueue有一個任務。
5.2 scheduleTaskQueue延時任務隊列
延時任務隊列和上面介紹的任務隊列非常相似,隻是多了一個可延遲一定時間再執行的設定,請看代碼示範:
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//長時間操作,不至于長時間的業務操作導緻Handler阻塞
Thread.sleep(1000);
System.out.println("長時間的業務處理");
} catch (Exception e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);//5秒後執行
依然打開debug進行調試檢視,我們可以有一個scheduleTaskQueue任務待執行中
5.3 Future異步機制
在搭建HelloWord工程的時候,我們看到有一行這樣的代碼:
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
很多操作都傳回這個ChannelFuture對象,究竟這個ChannelFuture對象是用來做什麼的呢?
ChannelFuture提供操作完成時一種異步通知的方式。一般在Socket程式設計中,等待響應結果都是同步阻塞的,而Netty則不會造成阻塞,因為ChannelFuture是采取類似觀察者模式的形式進行擷取結果。請看一段代碼示範:
//添加監聽器
channelFuture.addListener(new ChannelFutureListener() {
//使用匿名内部類,ChannelFutureListener接口
//重寫operationComplete方法
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//判斷是否操作成功
if (future.isSuccess()) {
System.out.println("連接配接成功");
} else {
System.out.println("連接配接失敗");
}
}
});
5.4 Bootstrap與ServerBootStrap
Bootstrap和ServerBootStrap是Netty提供的一個建立用戶端和服務端啟動器的工廠類,使用這個工廠類非常便利地建立啟動類,根據上面的一些例子,其實也看得出來能大大地減少了開發的難度。首先看一個類圖:
可以看出都是繼承于AbstractBootStrap抽象類,是以大緻上的配置方法都相同。
一般來說,使用Bootstrap建立啟動器的步驟可分為以下幾步:
5.4.1 group()
在上一篇文章
《Reactor模式》中,我們就講過服務端要使用兩個線程組:
- bossGroup 用于監聽用戶端連接配接,專門負責與用戶端建立連接配接,并把連接配接注冊到workerGroup的Selector中。
- workerGroup用于處理每一個連接配接發生的讀寫事件。
一般建立線程組直接使用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
有點好奇的是,既然是線程組,那線程數預設是多少呢?深入源碼:
//使用一個常量儲存
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//NettyRuntime.availableProcessors() * 2,cpu核數的兩倍指派給常量
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//如果不傳入,則使用常量的值,也就是cpu核數的兩倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
通過源碼可以看到,預設的線程數是cpu核數的兩倍。假設想自定義線程數,可以使用有參構造器:
//設定bossGroup線程數為1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//設定workerGroup線程數為16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
5.4.2 channel()
這個方法用于設定通道類型,當建立連接配接後,會根據這個設定建立對應的Channel執行個體。
使用debug模式可以看到
通道類型有以下:
NioSocketChannel: 異步非阻塞的用戶端 TCP Socket 連接配接。
NioServerSocketChannel: 異步非阻塞的伺服器端 TCP Socket 連接配接。
常用的就是這兩個通道類型,因為是異步非阻塞的。是以是首選。
OioSocketChannel: 同步阻塞的用戶端 TCP Socket 連接配接。
OioServerSocketChannel: 同步阻塞的伺服器端 TCP Socket 連接配接。
稍微在本地調試過,用起來和Nio有一些不同,是阻塞的,是以API調用也不一樣。因為是阻塞的IO,幾乎沒什麼人會選擇使用Oio,是以也很難找到例子。我稍微琢磨了一下,經過幾次報錯之後,總算調通了。代碼如下:
//server端代碼,跟上面幾乎一樣,隻需改三個地方
//這個地方使用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//隻需要設定一個線程組boosGroup
.channel(OioServerSocketChannel.class)//設定服務端通道實作類型
//client端代碼,隻需改兩個地方
//使用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道類型設定為OioSocketChannel
bootstrap.group(eventExecutors)//設定線程組
.channel(OioSocketChannel.class)//設定用戶端的通道實作類型
NioSctpChannel: 異步的用戶端 Sctp(Stream Control Transmission Protocol,流控制傳輸協定)連接配接。
NioSctpServerChannel: 異步的 Sctp 伺服器端連接配接。
本地沒啟動成功,網上看了一些網友的評論,說是隻能在linux環境下才可以啟動。從報錯資訊看:SCTP not supported on this platform,不支援這個平台。因為我電腦是window系統,是以網友說的有點道理。
5.4.3 option()與childOption()
首先說一下這兩個的差別。
option()設定的是服務端用于接收進來的連接配接,也就是boosGroup線程。
childOption()是提供給父管道接收到的連接配接,也就是workerGroup線程。
搞清楚了之後,我們看一下常用的一些設定有哪些:
SocketChannel參數,也就是childOption()常用的參數:
SO_RCVBUF Socket參數,TCP資料接收緩沖區大小。
TCP_NODELAY TCP參數,立即發送資料,預設值為Ture。
SO_KEEPALIVE Socket參數,連接配接保活,預設值為False。啟用該功能時,TCP會主動探測空閑連接配接的有效性。
ServerSocketChannel參數,也就是option()常用參數:
SO_BACKLOG Socket參數,服務端接受連接配接的隊列長度,如果隊列已滿,用戶端連接配接将被拒絕。預設值,Windows為200,其他為128。
由于篇幅限制,其他就不列舉了,大家可以去網上找資料看看,了解一下。
5.4.4 設定流水線(重點)
ChannelPipeline是Netty處理請求的責任鍊,ChannelHandler則是具體處理請求的處理器。實際上每一個channel都有一個處理器的流水線。
在Bootstrap中childHandler()方法需要初始化通道,執行個體化一個ChannelInitializer,這時候需要重寫initChannel()初始化通道的方法,裝配流水線就是在這個地方進行。代碼示範如下:
//使用匿名内部類的形式初始化通道對象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//給pipeline管道設定自定義的處理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
處理器Handler主要分為兩種:
ChannelInboundHandlerAdapter(入站處理器)、ChannelOutboundHandler(出站處理器)
入站指的是資料從底層java NIO Channel到Netty的Channel。
出站指的是通過Netty的Channel來操作底層的java NIO Channel。
ChannelInboundHandlerAdapter處理器常用的事件有:
- 注冊事件 fireChannelRegistered。
- 連接配接建立事件 fireChannelActive。
- 讀事件和讀完成事件 fireChannelRead、fireChannelReadComplete。
- 異常通知事件 fireExceptionCaught。
- 使用者自定義事件 fireUserEventTriggered。
- Channel 可寫狀态變化事件 fireChannelWritabilityChanged。
- 連接配接關閉事件 fireChannelInactive。
ChannelOutboundHandler處理器常用的事件有:
- 端口綁定 bind。
- 連接配接服務端 connect。
- 寫事件 write。
- 重新整理時間 flush。
- 讀事件 read。
- 主動斷開連接配接 disconnect。
- 關閉 channel 事件 close。
還有一個類似的handler(),主要用于裝配parent通道,也就是bossGroup線程。一般情況下,都用不上這個方法。
5.4.5 bind()
提供用于服務端或者用戶端綁定伺服器位址和端口号,預設是異步啟動。如果加上sync()方法則是同步。
有五個同名的重載方法,作用都是用于綁定位址端口号。不一一介紹了。
5.4.6 優雅地關閉EventLoopGroup
//釋放掉所有的資源,包括建立的線程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
會關閉所有的child Channel。關閉之後,釋放掉底層的資源。
5.5 Channel
Channel是什麼?不妨看一下官方文檔的說明:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
翻譯大意:一種連接配接到網絡套接字或能進行讀、寫、連接配接和綁定等I/O操作的元件。
如果上面這段說明比較抽象,下面還有一段說明:
A channel provides a user:
the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.
翻譯大意:
channel為使用者提供:
- 通道目前的狀态(例如它是打開?還是已連接配接?)
- channel的配置參數(例如接收緩沖區的大小)
- channel支援的IO操作(例如讀、寫、連接配接和綁定),以及處理與channel相關聯的所有IO事件和請求的ChannelPipeline。
5.5.1 擷取channel的狀态
boolean isOpen(); //如果通道打開,則傳回true
boolean isRegistered();//如果通道注冊到EventLoop,則傳回true
boolean isActive();//如果通道處于活動狀态并且已連接配接,則傳回true
boolean isWritable();//當且僅當I/O線程将立即執行請求的寫入操作時,傳回true。
以上就是擷取channel的四種狀态的方法。
5.5.2 擷取channel的配置參數
擷取單條配置資訊,使用getOption(),代碼示範:
ChannelConfig config = channel.config();//擷取配置參數
//擷取ChannelOption.SO_BACKLOG參數,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//因為我啟動器配置的是128,是以我這裡擷取的soBackLogConfig=128
擷取多條配置資訊,使用getOptions(),代碼示範:
ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
System.out.println(entry.getKey() + " : " + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
以下省略...
*/
5.5.3 channel支援的IO操作
寫操作,這裡示範從服務端寫消息發送到用戶端:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("這波啊,這波是肉蛋蔥雞~", CharsetUtil.UTF_8));
}
用戶端控制台:
//收到服務端/127.0.0.1:6666的消息:這波啊,這波是肉蛋蔥雞~
連接配接操作,代碼示範:
ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//一般使用啟動器,這種方式不常用
通過channel擷取ChannelPipeline,并做相關的處理:
//擷取ChannelPipeline對象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中添加ChannelHandler處理器,裝配流水線
pipeline.addLast(new MyServerHandler());
5.6 Selector
在NioEventLoop中,有一個成員變量selector,這是nio包的Selector,在之前
中,我已經講過Selector了。
Netty中的Selector也和NIO的Selector是一樣的,就是用于監聽事件,管理注冊到Selector中的channel,實作多路複用器。
5.7 PiPeline與ChannelPipeline
在前面介紹Channel時,我們知道可以在channel中裝配ChannelHandler流水線處理器,那一個channel不可能隻有一個channelHandler處理器,肯定是有很多的,既然是很多channelHandler在一個流水線工作,肯定是有順序的。
于是pipeline就出現了,pipeline相當于處理器的容器。初始化channel時,把channelHandler按順序裝在pipeline中,就可以實作按序執行channelHandler了。
在一個Channel中,隻有一個ChannelPipeline。該pipeline在Channel被建立的時候建立。ChannelPipeline包含了一個ChannelHander形成的清單,且所有ChannelHandler都會注冊到ChannelPipeline中。
5.8 ChannelHandlerContext
在Netty中,Handler處理器是有我們定義的,上面講過通過內建入站處理器或者出站處理器實作。這時如果我們想在Handler中擷取pipeline對象,或者channel對象,怎麼擷取呢。
于是Netty設計了這個ChannelHandlerContext上下文對象,就可以拿到channel、pipeline等對象,就可以進行讀寫等操作。
通過類圖,ChannelHandlerContext是一個接口,下面有三個實作類。
實際上ChannelHandlerContext在pipeline中是一個連結清單的形式。看一段源碼就明白了:
//ChannelPipeline實作類DefaultChannelPipeline的構造器方法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//設定頭結點head,尾結點tail
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
下面我用一張圖來表示,會更加清晰一點:
5.9 EventLoopGroup
我們先看一下EventLoopGroup的類圖:
其中包括了常用的實作類NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有使用過。
從Netty的架構圖中,可以知道伺服器是需要兩個線程組進行配合工作的,而這個線程組的接口就是EventLoopGroup。
每個EventLoopGroup裡包括一個或多個EventLoop,每個EventLoop中維護一個Selector執行個體。
5.9.1 輪詢機制的實作原理
我們不妨看一段DefaultEventExecutorChooserFactory的源碼:
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
@Override
public EventExecutor next() {
//idx.getAndIncrement()相當于idx++,然後對任務長度取模
return executors[idx.getAndIncrement() & executors.length - 1];
}
這段代碼可以确定執行的方式是輪詢機制,接下來debug調試一下:
它這裡還有一個判斷,如果線程數不是2的N次方,則采用取模算法實作。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
寫在最後
參考Netty官網文檔:
API文檔創作不易,覺得有用就點個贊吧。
我不要下次一定,希望這次一定素質三連,感謝!
想第一時間看到我更新的文章,可以微信搜尋公衆号「
java技術愛好者
」,拒絕做一條鹹魚,我是一個努力讓大家記住的程式員。我們下期再見!!!
能力有限,如果有什麼錯誤或者不當之處,請大家批評指正,一起學習交流!