天天看點

Netty in action—Netty傳輸服務

網絡傳輸中資料通常以一種格式:位元組。這些位元組要怎樣傳播主要取決于我們指定的網絡傳輸服務,幫助我們抽象底層的資料傳輸機制。使用者不需要關心實作細節,他們隻需要确信他們的位元組能被可靠地發送和接收。

Netty為它所有的傳輸服務實作提供了通用的API,使你能很容易的從阻塞傳輸服務轉換到非阻塞傳輸服務。

案例學習:傳輸服務遷移

我們以一個簡單的應用案例開始我們的傳輸服務學習。這個應用接收一個連接配接,然後寫入”Hi!”并傳回給用戶端,最後關閉連接配接。

使用OIO(阻塞的傳輸服務)和NIO(異步的傳輸服務)

我們會僅僅使用JDK的API來實作這個應用的OIO和NIO版本。

下面的代碼實作了阻塞的版本。

package com.netty.ch3;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;

public class PlainOioServer
    public void serve(int port) throws IOException{
        final ServerSocket socket = new ServerSocket(port);
        try{
            for (;;){
                final Socket clinetSocket = socket.accept();
                System.out.println("Accepted connection from " + clinetSocket);
                new Thread(new Runnable() {//建立一個線程來處理這個連結
                    public void run() {
                        OutputStream out;
                        try{
                            out = clinetSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));//把消息寫入輸出流傳回給用戶端
                            out.flush();
                            clinetSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }finally {
                            try{
                                clinetSocket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        }catch      

這段代碼足夠進行中等數量的并發請求。但是如果這個應該大受歡迎,你會發現如果有成千上萬的并發請求,你的應用就表現的不太好了,然後你決定轉換到非阻塞(異步)的網絡程式設計實作。但是你馬上就會發現非阻塞的API和阻塞的API完全不同,是以你需要重寫你的應用。

下面給出的非阻塞的版本實作:

package com.netty.ch3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class PlainNioServer
    public void serve(int port)throws IOException{
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ssocket = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ssocket.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);//注冊serverChannel到selector,關心ACCEPT事件
        final ByteBuffer msg  = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;){
            try{
                selector.select();//阻塞等待下一個事件
            }catch (IOException ex){
                ex.printStackTrace();
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                try{
                    if(key.isAcceptable()){
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE | SelectionKey.OP_READ,msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }
                    if(key.isWritable()){
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while(buffer.hasRemaining()){
                            if(client.write(buffer) == 0){
                                break;
                            }
                        }
                        client.close();
                    }
                }catch (IOException ex){
                    key.cancel();
                    try{
                        key.channel().close();
                    }catch      

正如你所看到的,盡管這些代碼是處理同樣的事情,但是确實完全不同。如果實作一個非阻塞的IO需要完全的重寫,考慮下實作這麼複雜的代碼需要多大的工作量。

下面我們看看如何通過Netty實作OIO和NIO。

通過Netty實作OIO和NIO

首先實作阻塞版本:

package com.netty.ch3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;


public class NettyOioServer
    public void server(int port) throws Exception{
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();//通過OioEventLoopGroup來實作阻塞IO
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally      

接下來我們會通過Netty實作非阻塞的版本

Netty實作非阻塞的版本

下面的代碼幾乎上上一個版本的代碼相同,除了兩行代碼(line5,9)。

這就是從OIO轉換到NIO所有要做的事情。

public class NettyNioServer
    public void server(int port) throws Exception{
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new NioEventLoopGroup();//通過NioEventLoopGroup來實作非阻塞IO
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)//對應的Channel也要改
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        }finally      

是以Netty為所有傳輸服務的實作都提供了同一套API,無論你選哪一種傳輸服務,你的代碼隻要做出很少的修改。

下面更深入的研究一下傳輸服務API

傳輸服務API

傳輸服務API的核心就是​

​Channel​

​接口,它為所有的IO操作服務。它的結構如下圖所示:

Netty in action—Netty傳輸服務

如果所示,​

​ChannelPipeline​

​​和​

​ChannelConfig​

​​中都指定了​

​Channel​

​​執行個體。​

​ChannelConfig​

​​存儲了Channel的所有配置同時支援熱部署(hot change)。因為一個特定的傳輸服務可能會有一個唯一的配置,它可能實作了​

​ChannelConfig​

​的子類。

因為Channel都是獨立的,聲明​

​Channel​

​​作為​

​java.lang.Comparable​

​​的子接口是為了保證有序性。是以,​

​AbstractChannel​

​​中​

​compareTo()​

​​的實作會抛出異常,如果兩個不同的​

​Channel​

​執行個體傳回同樣hash code。

​ChannelPipeline​

​​持有所有的​

​ChannelHandler​

​​執行個體,這些執行個體會為輸入輸出的資料和事件服務。

​​

​ChannelHandler​

​的典型使用場景有:

  • 轉換資料的格式
  • 為異常提供通知
  • 為Channel的激活(active)和失活(inactive)提供通知
  • 為Channel從一個EventLoop中注冊或撤銷(deregister)提供通知
  • 為使用者自定義事件提供通知

攔截過濾器

​​

​ChannelPipeline​

​實作了一個通用的設計模式-攔截過濾器。UNIX的管道是另一個常見的例子:指令被連結在一起,一個指令的輸出呗鍊上的下一個指令過濾

如果有需要,你可以通過添加或移除​

​ChannelHandler​

​​執行個體來修改一個正在運作的​

​ChannelPipeline​

​​。Netty能構造高靈活性(highly flexible)的應用。比如,你能通過簡單的增加一個合适的​

​ChannelHandler​

​(SslHandler)來支援所需的STARTTLS協定。

除了剛剛介紹的​

​ChannelPipeline​

​​和​

​ChannelConfig​

​​,你還能使用其他​

​Channel​

​接口提供的方法,最重要方法如下表所示:

方法名 描述
eventLoop 傳回這個Channel的EventLoop
pipeline 傳回這個Channel的ChannelPipeline
isActive 傳回Channel是否是active的。active的定義可能依賴于底層的傳輸服務。比如,Socket傳輸服務認為一旦與遠端主機建立連接配接即active,然後Datagram傳輸服務認為隻要連接配接打開就算active
localAddress 傳回本地的SocketAddress
remoteAddress 傳回遠端的SocketAddress
write 寫資料到遠端端,這個資料會被送到ChannelPipeline并進入隊列直到被flush才發送出去
flush 将剛才寫入的資料刷到底層的傳出服務(如:Socket)中
writeAndFlush 一個友善的方法,首先調用write()然後flush()

後面我們會詳細讨論這些特性的使用,但現在隻需要知道Netty通過幾個接口就可以提供豐富的功能。

考慮寫資料并flush到遠端這個常見的任務。下面的代碼展示了​

​writeAndFlush()​

​方法的使用:

Channel channel = ...
//建立一個ByteBuf來持有要寫的資料
ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);
ChannelFuture cf = channel.writeAndFlush(buf);
//增加ChannelFutureListener來監聽write完成事件
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future){
        if(future.isSuccess()){
            System.out.println("Write successful");
        }else{
            System.err.println("Write error");
            future.cause.printStacktrace();
        }
    }
});      

Netty的​

​Channel​

​​實作是線程安全的,是以你可以在多線程的環境下通過一個​

​Channel​

​執行個體的引用來寫資料到遠端。下面的例子展示了再多線程的環境下将資料有序的寫到遠端:

final Channel channel = ...
        //建立一個ByteBuf來持有要寫的資料
        final ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8).retain();
        Runnable writer = new Runnable() {
            @Override
            public void run() {
                channel.write(buf.duplicate());
            }
        };
        Executor executor = Executors.newCachedThreadPool();
        executor.execute(writer);//将寫資料任務移交到一個線程中      

提供的傳輸服務

Netty提供了幾個可用的傳輸服務。因為并不是所有的傳輸服務都支援每一個傳輸協定,你得選擇一個适合你應用中傳輸協定的傳輸服務。下表列出了Netty提供的傳輸服務

名稱 描述
NIO io.netty.channel.socket.nio 基于java.nio.channels(基于selector的途徑)包
Epoll io.netty.channel.epoll 使用JNI的epoll()和非阻塞IO,這個傳輸服務支援的一些特性在Linux上才有效,如SO_REUSEPORT。且比NIO傳輸服務和完全非阻塞都要快
OIO io.netty.channel.socket.oio 基于java.net包,使用阻塞流
Local io.netty.channel.local 一本地傳輸服務能用來通過pipe在VM中通信
Embedded io.netty.channel.embedded 一個嵌入式(embedded)傳輸服務,允許在沒有真正基于網絡傳輸服務的情況下使用ChannelHandler,這對于你測試ChannelHandler的實作很有用

NIO—非阻塞IO

NIO為所有的IO操作提供了完全異步的實作。它利用了基于selector的API。

selector作為一個注冊器來告知你Channel狀态的改變,可能的狀态改變有:

  • 一個新的Channel被接收且已準備就緒
  • 一個Channel的連接配接已經被建立
  • 一個Channel中有準備讀取的資料
  • 一個Channel可用于寫資料

在應用對狀态的改變做出反應後,selector被重置然後重複前面的處理,在另一個線程中檢查狀态的改變并做出相應的相應。

下表顯示了​

​java.nio.channels.SelectionKey​

​中定義的位模式常量,這些位模式常量能組成應用關心的通知集合。

名稱 描述
OP_ACCEPT 請求告知新連接配接被接收,一個Channel執行個體被建立
OP_CONNECT 請求告知一個連接配接被建立
OP_READ 請求告知當資料準備好從Channel中讀
OP_WRITE 請求告知當可以向Channel中寫資料。這種情況出現在socket的緩存被填滿的情況(當資料傳輸速度高于遠端機器處理速度會發送)

這些NIO的内部細節被使用者級的API隐藏,這一點在Netty的傳輸服務實作中很常見。

下圖顯示了狀态變換的處理流程

Netty in action—Netty傳輸服務

Zero-cpoy : Zero-copy是一個特殊屬性,目前隻能在NIO和Epoll傳輸服務中可使用,它允許你可以快速高效地移動你的資料從檔案系統到網絡中而不需要從你的記憶體空間複制到你的使用者空間,這對于你提高協定中的傳輸性能是一個非常重要的特性,例如FTP和HTTP協定,但是這種特性并不是被所有的作業系統所支援的,具體來說,如果資料被加密或者壓縮過就不能正常使用了,隻有一些簡單的原生的檔案内容可以被傳輸

Epoll—Linux的native非阻塞傳輸服務

Netty的NIO傳輸服務是基于Java提供的對非阻塞網絡程式設計的通用抽象來實作的。盡管這足以保證Netty的非阻塞的API可以在任何的平台的可用性,但這依舊會有一些限制,因為JDK為了能在所有的系統上有同樣的可用性做了一些折。

因為linux的網絡的高性能促使了很多一些特性的産生,包括epoll,一個高擴充性的IO事件通知的新特性。

Netty為Linux提供的NIO的API是使用的epoll的,通過這個方法我們可以與使用的linux保持一緻,而不需要浪費一些性能。思考一下如果你的系統是linux,你的應用可以利用這個特性,你會發現在高負載的情況下這比JDK的NIO的實作更加高效。如果用epoll替換NIO,隻需要将​​

​NioEventLoopGroup​

​​用​

​EpollEventLoopGroup​

​​替換,​

​NioServerSocketChannel​

​​用E​

​pollServerSocketChannel​

​替換就可以了。

OIO—舊的阻塞IO模型

從Netty的OIO傳輸服務實作代表了一種折中:它是通過通用傳輸服務API來實作的,但因為是建立在​

​java.net​

​包的基礎上的,是以不是異步的。這适用于一些特定的場合。

舉例來說,你也許需要一些普通的代碼來實作阻塞調用例如JDBC,如果你将其轉化成非阻塞的也許并不是那麼的實用,短期内你可以直接使用Netty的OIO傳輸服務,如果有需要你可以在未來的時間内将其轉化成其他的任意一種異步傳輸,我們還是先看看阻塞通信是如何工作的吧。

在java.net包下的API,經常有一個線程接收新的連接配接到serverSocket的請求,一個新的socket即将被建立來與遠端服務端進行互動,然後需要一個新的線程配置設定來處理後繼的資料互動,多開一個新的線程是有必要的,因為在一個具體的socket上的任何IO操作都可能随時被阻斷,如果用一個線程處理多個sockets很容易導緻一個阻塞的操作也會影響其他的操作。

Netty in action—Netty傳輸服務

正是因為如此,你可能會有疑問為什麼可以使用與非阻塞一樣的API來支援OIO呢?因為Netty使用了SO_TIMEOUT這個socket的參數辨別,它規定了一個I/O操作完成的最長等待的毫秒數,如果在内部規定的時間内操作并沒有完成,那麼一個SocketTimeOutException将會被抛出,Netty将會捕獲這個異常,然後繼續處理循環,在下一個EcentLoop運作的時候,它會再次嘗試,這是像Netty這樣的異步架構能支援OIO的唯一方式,上圖說明了這個邏輯

在JVM中通過本地傳輸服務通信

Netty為運作在同一個虛拟機上的服務端和用戶端提供了本地傳輸的異步通信。同樣,這個傳輸服務支援的API與其他所Netty的傳輸服務實作相同。

這種傳輸方式,與服務端channel相連的SocketAddress并沒有綁定實體位址,而是,隻要伺服器一運作就在注冊器上存儲注冊,隻要channel一關閉,就從注冊器上解除注冊,因為這種傳輸服務并沒有真實的網絡傳輸産生,它不能與其他的傳輸服務互相操作,是以,在同一個JVM上的用戶端想要連接配接到伺服器端的話,必須使用這種傳輸方式,除了這種限制,它就與其他的傳輸服務一樣了

嵌入式傳輸服務

Netty還提供了一種額外的傳輸方式允許你将一些ChannelHandler作為一種輔助工具類嵌入到其他的ChannelHandler中,在這種方式下,我們可以在不修改内部代碼的基礎上擴充ChannelHandler的功能

傳輸服務用例

接下來讓我們考慮如何為一個特定的用例選擇傳輸協定。正如之前所說的,不是所有的傳輸服務都支援所有核心傳輸協定。下表顯示了這種支援情況:

傳輸服務 TCP UDP ​​SCTP​​ UDT
NIO X X X X
Epoll(linux) X X
OIO X X X X

在Linux中啟用SCTP

SCTP需要核心支援同時需要使用者來安裝

比如,Ubuntu系統你使用如下指令:

​​

​# sudo apt-get install libsctp1​

​​

Fedora系統:

​​

​# sudo yum install kernel-modules-extra.x86_64 lksctp-tools.x86_64​

  • 非阻塞代碼—如果在你的代碼中沒有阻塞調用(或者你可以限制),使用NIO或epoll(Linux系統上)是一個好主意。盡管NIO/epoll是用來處理高并發連接配接,但在低并發連接配接的情況下也能工作良好,特别是在多個連接配接中共享線程的情況。
  • 阻塞代碼—如果你的代碼庫嚴重依賴于阻塞的I/O,那麼你的應用應該有一個相應的設計,如果你想把你的阻塞操作直接轉化成Netty的NIO傳輸的時候,在且你不是用重寫原有代碼去完成轉化的功能,你可能會遇到一些問題,例如你可以考慮一種遷移場景,你的應用一開始使用OIO,然後遷移到NIO,你需要重新修改你的代碼
  • 在同一個JVM下通信一同一個JVM上通信且不需要通過網絡暴露你的服務的場景下,使用本地傳輸會是一個很棒的決定,這樣可以在使用的你代碼的基礎上,消除所有的網絡傳輸操作上的開銷,如果你在以後的時間裡想将你的服務暴露出去,你可以将其轉化成NIO或者OIO
  • 測試你的ChannelHandler實作一如果你想要為你的channelHandler寫一個測試單元的話,你可以考慮使用嵌入式的傳輸方式,這個方式可以在不需要創造很多mock對象的基礎上很輕易地測試你的代碼,你可以測試再所有的事件流上的常用的API,且保證你的channelHandler在真實的傳輸服務上運作正确