天天看點

Netty 4 實踐 (1)初識netty4

netty 5正式版還沒有出,項目準備從netty 3更新到netty4。

mina與netty 是同一個人寫的,mina将核心和一些特性的聯系過于緊密,使得使用者在不需要這些特性的時候無法脫離,相比下性能會有所下降;netty解決了這個設計問題,按照這個理論netty 是mina 的更新。

從netty3,到netty4的代碼變話還是挺多的,首先看看bootstrap,它的結構可以說比較簡單,涉及的類和接口很少;如圖:

Netty 4 實踐 (1)初識netty4

Bootstrap則是用戶端程式用的引導類,ServerBootstrap是服務端程式用的引導類,Bootstrap 和ServerBootstrap都繼承自AbstractBootstrap,ChannelFactory用于處理channel。

      ChannelFuture被拆分為ChannelFuture和ChannelPromise。這不僅僅是讓異步操作裡的生産者和消費者間的約定更明顯,同樣也是得在使用從鍊中傳回的ChannelFuture更加安全,因為ChannelFuture的狀态是不能改變的。

由于這個編号,一些方法現在都采用ChannelPromiser而不是ChannelFuture來改變它的狀态。

      在編碼解碼器架構裡有實質性的内部改變,因為4.0需要一個處理器來建立和管理它的緩存(看這篇文章的每個處理器緩存部分。)然而,從使用者角度來看這些變化都不是很大的。核心編碼界面器類移到io.netty.handler.codec包裡。FrameDecoder重命名為ByteToMessageDecoder。OneToOneEncoder和OneToOneDecoder由MessageToMessageEncoder和MessageToMessageDecoder替換。

       看了看官網netty 4的入門實踐 http://netty.io/wiki/user-guide-for-4.x.html 寫了個例子,體驗和一步一步來研究一下netty 4

   首先來寫服務端:

public class ByteServer {

    private final static String host = "127.0.0.1";
    private final static Integer port = 8898;

    public static void main(String[] args) {
        /***
         * ·NioEventLoopGroup 實際上是個連接配接池,NioEventLoopGroup在背景啟動了n個NioEventLoop
         * 來處理Channel事件,每個NioEventLoop負責m個Channel
         * ·NioEventLoopGroup從NioEventLoop數組集中挨個取出NioEventLoop用以處理Channel
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            // 設定 nio 類型的 channel
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ByteServerInitializer());
            //此處在寫一個TCP/IP 的服務端,是以我們被允許設定 socket 的參數選項比如tcpNoDelay 和 keepAlive。
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
            /***
             * option() 是提供NioServerSocketChannel用來接收進來的連接配接。
             * childOption() 是提供父管道ServerChannel接收到的
             * 連接配接(此例是 NioServerSocketChannel)。
             */
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            // 伺服器綁定端口監聽(sync,同步方法阻塞直到綁定成功)
            ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
            // 監聽伺服器關閉監聽(應用程式等待直到channel關閉)
            channelFuture.channel().closeFuture().sync();

            // logger.info("TCP伺服器已啟動");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //關閉EventLoopGroup釋放資源包
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
           

是不是感覺比netty 3 的代碼變化了好多,接下來寫ByteServerInitializer類:

public class ByteServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        /***
         * 個地方的 必須和服務端對應上。否則無法正常解碼和編碼
         */
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));
        pipeline.addLast(new LengthFieldPrepender(8));
        //當 read 的時候
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        //當 send 的時候
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        //服務端邏輯
        pipeline.addLast(new ByteServerHandler());
    }
}
           

接下來寫服務端handler 類,處理資料:

public class ByteServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg) {
//            ByteBuf byteBuf = ((ByteBuf) msg);
            StringBuffer sbf = new StringBuffer("收到用戶端->");
            sbf.append(ctx.channel().remoteAddress());
            sbf.append("的消息:");
            sbf.append(msg);
            System.out.println(sbf);
            ctx.writeAndFlush("ok");
//            byteBuf.release();
            ctx.close();
        }
    }

    /***
     * 覆寫了 channelActive() 事件處理方法。服務端監聽到用戶端活動
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive>>>> Client:" + ctx.channel().remoteAddress() + "線上");
        ctx.fireChannelActive();
    }

    /***
     * 監聽用戶端掉線
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // System.out.println("channelInactive>>>> Client:"+ctx.channel().remoteAddress()+"掉線");
        super.channelInactive(ctx);
    }

    /*****
     * 異常資訊 (根據需要,選擇是否關閉)
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exception is general");
         ctx.fireExceptionCaught(cause);
//        ctx.close();
    }
}
           

用戶端

public class ByteClient {
    private final static String host = "127.0.0.1";
    private final static Integer port = 8898;

    public static void main(String[] args) {
        EventLoopGroup group = null;
        try {
            group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ByteClientInitializer());
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            Channel ch = bootstrap.connect(host, port).sync().channel();
            ch.writeAndFlush("發送一條指令:我的小魚你醒了,還認識早晨嗎?" + Thread.currentThread().getName());
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
           if(null!=group){
               group.shutdownGracefully();
           }
        }
    }
}
           
public class ByteClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        /****
         * 個地方的 必須和服務端對應上。否則無法正常解碼和編碼
         */
        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8, 0, 8));
        pipeline.addLast("frameEncoder", new LengthFieldPrepender(8));
        //當 read 的時候
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        //當 send 的時候
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        // 用戶端的邏輯
        pipeline.addLast("handler", new ByteClientHander());
    }
}
           
public class ByteClientHander extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(null!=msg){
            try {
               System.out.println("服務端傳回消息:" + msg);
            }finally {
                ReferenceCountUtil.release(msg);
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /***
     * 異常處理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exception is general");
        /***
         * 異常資訊
         *   cause.printStackTrace();
         */
        ctx.fireExceptionCaught(cause);
    }
}
           

用戶端向服務端,發資料服務端如果傳回OK則表示資料發送成功,沒有包含其他邏輯;一個簡單的例子就成了。

繼續閱讀