一、前言
上一篇文章我們提到 Netty 的核心元件是 Channel、回調、Future、ChannelHandler、EventLoop,這篇文章主要是對 Channel (Netty傳入和傳出資料的載體)做一些詳細的講解,以及介紹下 Netty 内置的傳輸類型。
二、傳輸的核心
傳輸 API 的核心是 interface Channel ,她被用于所有的 I/O 操作。Channel 類的層次結構如圖所示:

如圖,每個Channel 都會被配置設定一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的所有配置設定,并且支援熱更新。ChannelPipeline 是 ChannelHandler鍊的容器,持有所有入站和出站資料以及ChannelHandler 執行個體。
由于 Channel 是獨一無二的,是以為了保證順序将 Channel 聲明為java.lang.Compareable的一個子接口,是以每個Channel都有不同的散列碼,否則會報Error。
Netty的 Channel 實作是線程安全的,是以你可以存儲一個Channel的引用,并且每當你需要向遠端節點寫資料時,都可以使用它,即使當時許多線程都在使用它。
Channel 的其他方法如下:
tips:
1、ChannelHandler 的典型用途:
-- 将資料從一種格式轉換為另一種格式。
-- 提供異常的通知。
-- 提供Channel 變為活動的或者非活動的通知。
-- 提供當Channel 注冊到 EventLoop 或者 從 EventLoop 登出時的通知。
-- 提供有關使用者自定義事件的通知。
2、Netty 所提供的廣泛功能隻依賴于少量的接口。這意味着,你可以對你的應用程式邏輯進行重大的修改,而無需大規模的重構你的代碼庫。
三、Netty 内置的傳輸類型
Netty 内置了一些可開箱即用的傳輸。因為并不是它們所有的傳輸都支援每一種協定,是以你必須選擇一個和你的應用程式所使用的協定都相容的傳輸。
名稱 | 包 | 描述 | 應用場景 |
NIO | io.netty.channel.socket.nio | 使用java.nio.channels 包作為基礎——基于選擇器的方式 | 非阻塞I/O使用 |
Epoll | io.netty.channel.epoll | 由 JNI 驅動的 epoll()和非阻塞 IO。 這個傳輸支援隻有在 Linux上可用的多種特性,如SO_REUSEPORT,比 NIO 傳輸更快, 而且是完全非阻塞的 | Linux上的非阻塞 I/O 使用 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作為基礎——使用阻塞流 | 阻塞 I/O 使用 |
Local | io.netty.channel.local | 可以在 VM 内部通過管道進行通信的本地傳輸 | 用戶端和服務端都使用同個JVM通信 |
Embedded | io.netty.channel.embedded | Embedded 傳輸,允許使用 ChannelHandler 而又不需要一個真正的基于網絡的傳輸。這在測試你的ChannelHandler 實作時非常有用 | 測試 ChannelHandler 的實作 |
1、NIO — 非阻塞I/O
Java NIO 提供了一個所有I/O操作的全異步實作。其中,選擇器的背後實際上是充當了一個系統資料庫,如圖展示了該處理流程:
對于所有Netty的傳輸實作都共有的使用者級别API完全隐藏了Java NIO的實作細節,如上一篇展示的Demo一樣,Netty 這樣使用Java NIO:
2、Epoll—用于 Linux 的本地非阻塞傳輸
Netty為Linux提供了一組NIO API, 其以一種和它本身的設計更加一緻的方式使用epoll,并且以一種更加輕量的方式使用中斷。 如果你的應用程式旨在運作于Linux系統, 那麼請考慮利用這個版本的傳輸;你将發現在高負載下它的性能要優于JDK的NIO實作。
Netty 在代碼中支援 Epoll 也非常簡單,隻需做如下的轉改變:
3、OIO—舊的阻塞 I/O
Netty是如何能夠使用和用于異步傳輸相同的API來支援OIO的呢?
上文提到,在NIO中,一個 EventLoop 對應一個線程,一個Channel 綁定一個 EventLoop,而一個EventLoop 可以綁定多個Channel 來實作異步,也就是說一個線程可以處理多個 Channel。而OIO中,一個 EventLoop 僅綁定一個 Channel,也就是說每個線程隻處理一個Channel ,這就有點像傳統IO中,在服務端(ServerSocket)寫了一個多線程來處理用戶端的并發請求。
現在還有一個問題,channel是雙向的,既可以讀,也可以寫。而stream是單向的,OIO中利用 InputStream 來讀,OutputStream 來寫。那麼Channel 是如何實作阻塞的讀和寫的呢?答案就是, Netty利用了SO_TIMEOUT這個Socket标志,它指定了等待一個I/O操作完成的最大毫秒數,I/O 操作期間Channel是阻塞的,如果操作在指定的時間間隔内沒有完成,則将會抛出一個SocketTimeout Exception。 Netty将捕獲這個異常并繼續處理循環。在EventLoop下一次運作時,它将再次嘗試。這實際上也是類似于Netty這樣的異步架構能夠支援OIO的唯一方式。
Netty 在代碼中支援 OIO,也和NIO類似:
我從硬碟讀取資料,然後程式一直等,資料讀完後,繼續操作。這種方式是最簡單的,叫 阻塞IO。
我從硬碟讀取資料,然後程式繼續向下執行,等資料讀取完後,通知目前程式(對硬體來說叫中斷,對程式來說叫回調),然後此程式可以立即處理資料,也可以執行完目前操作在讀取資料。叫 非阻塞IO。
4、Local —— 用于 JVM 内部通信的 Local 傳輸
Netty 提供了一個Local傳輸, 用于在同一個 JVM 中運作的用戶端和伺服器程式之間的異步通信。
在這個傳輸中,和伺服器 Channel 相關聯的 SocketAddress 并沒有綁定實體網絡位址;相反,隻要伺服器還在運作, 它就會被存儲在系統資料庫裡,并在 Channel 關閉時登出。 因為這個傳輸并不接受真正的網絡流量,是以它并不能夠和其他傳輸實作進行互操作。是以,用戶端希望連接配接到(在同一個 JVM 中)使用了這個傳輸的伺服器端時也必須使用它。
服務端代碼:
public void server() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new DefaultEventLoop();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group, group)
.channel(LocalServerChannel.class)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(serverHandler);
}
});
ChannelFuture channelFuture = bootstrap.bind(new LocalAddress("foo")).sync();
System.out.println(EchoServer.class.getName() + "--started and listening for connections on--" + channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
View Code
用戶端代碼:
public void client() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(LocalChannel.class)
.handler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(new LocalAddress("foo")).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
備注:現在用戶端和服務端的連接配接一直報一個異常,查了很多資料,也看了 Github 上的諸多demo,仍然沒有解決。有沒有大神幫我解答下?
5、Embedded
Netty 提供了一種額外的傳輸, 使得你可以将一組 ChannelHandler 作為幫助器類嵌入到其他的 ChannelHandler 内部。 通過這種方式,你将可以擴充一個 ChannelHandler 的功能,而又不需要修改其内部代碼。
Embedded 傳輸的關鍵是一個被稱為 EmbeddedChannel 的具體的Channel實作。
如果你想要為自己的 ChannelHandler 實作編寫單元測試, 那麼請考慮使用 Embedded 傳輸。
參考資料:《Netty IN ACTION》
示範源代碼:https://github.com/JMCuixy/NettyDemo