天天看點

Netty 系列二(傳輸).

一、前言

    上一篇文章我們提到 Netty 的核心元件是 Channel、回調、Future、ChannelHandler、EventLoop,這篇文章主要是對 Channel (Netty傳入和傳出資料的載體)做一些詳細的講解,以及介紹下 Netty 内置的傳輸類型。

二、傳輸的核心

    傳輸 API 的核心是 interface Channel ,她被用于所有的 I/O 操作。Channel 類的層次結構如圖所示:

Netty 系列二(傳輸).

    如圖,每個Channel 都會被配置設定一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的所有配置設定,并且支援熱更新。ChannelPipeline 是 ChannelHandler鍊的容器,持有所有入站和出站資料以及ChannelHandler 執行個體。

    由于 Channel 是獨一無二的,是以為了保證順序将 Channel 聲明為java.lang.Compareable的一個子接口,是以每個Channel都有不同的散列碼,否則會報Error。

    Netty的 Channel 實作是線程安全的,是以你可以存儲一個Channel的引用,并且每當你需要向遠端節點寫資料時,都可以使用它,即使當時許多線程都在使用它。

    Channel 的其他方法如下:

Netty 系列二(傳輸).

    tips:

1、ChannelHandler 的典型用途:

-- 将資料從一種格式轉換為另一種格式。

-- 提供異常的通知。

-- 提供Channel 變為活動的或者非活動的通知。

-- 提供當Channel 注冊到 EventLoop 或者 從 EventLoop 登出時的通知。

-- 提供有關使用者自定義事件的通知。

2、Netty 所提供的廣泛功能隻依賴于少量的接口。這意味着,你可以對你的應用程式邏輯進行重大的修改,而無需大規模的重構你的代碼庫。

三、Netty 内置的傳輸類型

    Netty 内置了一些可開箱即用的傳輸。因為并不是它們所有的傳輸都支援每一種協定,是以你必須選擇一個和你的應用程式所使用的協定都相容的傳輸。

名稱 描述 應用場景
NIO io.netty.channel.socket.nio 使用java.nio.channels 包作為基礎——基于選擇器的方式 非阻塞I/O使用
Epoll io.netty.channel.epoll 由 JNI 驅動的 epoll()和非阻塞 IO。 這個傳輸支援隻有在 Linux上可用的多種特性,如SO_REUSEPORT,比 NIO 傳輸更快, 而且是完全非阻塞的 Linux上的非阻塞 I/O 使用
OIO io.netty.channel.socket.oio 使用 java.net 包作為基礎——使用阻塞流 阻塞 I/O 使用
Local io.netty.channel.local 可以在 VM 内部通過管道進行通信的本地傳輸 用戶端和服務端都使用同個JVM通信
Embedded io.netty.channel.embedded Embedded 傳輸,允許使用 ChannelHandler 而又不需要一個真正的基于網絡的傳輸。這在測試你的ChannelHandler 實作時非常有用 測試 ChannelHandler 的實作

    1、NIO — 非阻塞I/O

    Java NIO 提供了一個所有I/O操作的全異步實作。其中,選擇器的背後實際上是充當了一個系統資料庫,如圖展示了該處理流程:

Netty 系列二(傳輸).

    對于所有Netty的傳輸實作都共有的使用者級别API完全隐藏了Java NIO的實作細節,如上一篇展示的Demo一樣,Netty 這樣使用Java NIO:

Netty 系列二(傳輸).

    2、Epoll—用于 Linux 的本地非阻塞傳輸

    Netty為Linux提供了一組NIO API, 其以一種和它本身的設計更加一緻的方式使用epoll,并且以一種更加輕量的方式使用中斷。 如果你的應用程式旨在運作于Linux系統, 那麼請考慮利用這個版本的傳輸;你将發現在高負載下它的性能要優于JDK的NIO實作。

    Netty 在代碼中支援 Epoll 也非常簡單,隻需做如下的轉改變:

Netty 系列二(傳輸).

    3、OIO—舊的阻塞 I/O

     Netty是如何能夠使用和用于異步傳輸相同的API來支援OIO的呢?

    上文提到,在NIO中,一個 EventLoop 對應一個線程,一個Channel 綁定一個 EventLoop,而一個EventLoop 可以綁定多個Channel 來實作異步,也就是說一個線程可以處理多個 Channel。而OIO中,一個 EventLoop 僅綁定一個 Channel,也就是說每個線程隻處理一個Channel ,這就有點像傳統IO中,在服務端(ServerSocket)寫了一個多線程來處理用戶端的并發請求。

    現在還有一個問題,channel是雙向的,既可以讀,也可以寫。而stream是單向的,OIO中利用 InputStream 來讀,OutputStream 來寫。那麼Channel 是如何實作阻塞的讀和寫的呢?答案就是, Netty利用了SO_TIMEOUT這個Socket标志,它指定了等待一個I/O操作完成的最大毫秒數,I/O 操作期間Channel是阻塞的,如果操作在指定的時間間隔内沒有完成,則将會抛出一個SocketTimeout Exception。 Netty将捕獲這個異常并繼續處理循環。在EventLoop下一次運作時,它将再次嘗試。這實際上也是類似于Netty這樣的異步架構能夠支援OIO的唯一方式。

    Netty 在代碼中支援 OIO,也和NIO類似:

Netty 系列二(傳輸).

我從硬碟讀取資料,然後程式一直等,資料讀完後,繼續操作。這種方式是最簡單的,叫 阻塞IO。

我從硬碟讀取資料,然後程式繼續向下執行,等資料讀取完後,通知目前程式(對硬體來說叫中斷,對程式來說叫回調),然後此程式可以立即處理資料,也可以執行完目前操作在讀取資料。叫 非阻塞IO。

    4、Local —— 用于 JVM 内部通信的 Local 傳輸

    Netty 提供了一個Local傳輸, 用于在同一個 JVM 中運作的用戶端和伺服器程式之間的異步通信。

    在這個傳輸中,和伺服器 Channel 相關聯的 SocketAddress 并沒有綁定實體網絡位址;相反,隻要伺服器還在運作, 它就會被存儲在系統資料庫裡,并在 Channel 關閉時登出。 因為這個傳輸并不接受真正的網絡流量,是以它并不能夠和其他傳輸實作進行互操作。是以,用戶端希望連接配接到(在同一個 JVM 中)使用了這個傳輸的伺服器端時也必須使用它。

    服務端代碼:

Netty 系列二(傳輸).
Netty 系列二(傳輸).
public void server() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new DefaultEventLoop();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    .channel(LocalServerChannel.class)
                    .childHandler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(new LocalAddress("foo")).sync();
            System.out.println(EchoServer.class.getName() + "--started and listening for connections on--" + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully().sync();
        }
    }      

View Code

    用戶端代碼:

Netty 系列二(傳輸).
Netty 系列二(傳輸).
public void client() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(LocalChannel.class)
                    .handler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(new LocalAddress("foo")).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }      

    備注:現在用戶端和服務端的連接配接一直報一個異常,查了很多資料,也看了 Github 上的諸多demo,仍然沒有解決。有沒有大神幫我解答下?

    5、Embedded

    Netty 提供了一種額外的傳輸, 使得你可以将一組 ChannelHandler 作為幫助器類嵌入到其他的 ChannelHandler 内部。 通過這種方式,你将可以擴充一個 ChannelHandler 的功能,而又不需要修改其内部代碼。

    Embedded 傳輸的關鍵是一個被稱為 EmbeddedChannel 的具體的Channel實作。

    如果你想要為自己的 ChannelHandler 實作編寫單元測試, 那麼請考慮使用 Embedded 傳輸。

參考資料:《Netty IN ACTION》

示範源代碼:https://github.com/JMCuixy/NettyDemo