天天看點

Java NIO(六)Netty解決TCP粘包/拆包

TCP粘包/拆包

Tcp是個“流”協定,所謂流就是沒有界限的一串資料。可以類比一下水流,沒有分極限。TCP底層并不了解上層業務資料的具體含義,它會根據TCP緩沖區的實際情況進行包的劃分。是以一個業務資料可能被TCP拆分成多個包進行發送,也有可能把多個小的資料包封裝成一個大的資料包發送。這就是TCP的拆包和粘包。

出現TCP拆包/粘包的幾個原因:

  • 程式write寫入的位元組大小大于套接口發送緩沖區大小。
  • 進行MSS(TCP傳輸時的最大封包段長度)大小的TCP分段。
  • 以太網幀的payload(封裝後的不含頭和尾的資料包部分)大于MTU( 最大傳輸單元()進行IP分片。

TCP拆包/粘包問題的解決政策:

  • 消息定長,例如,每個封包的大小固定長度為200位元組,如果不夠,空位補空格。
  • 在包尾增加回車換行符進行分割,例如FTP協定。
  • 将特殊字元作為消息結束的标志,回車換行符隻是其中的一種。
  • 将消息分為消息頭和消息體,消息頭中包含表示消息長度的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總長度。

Netty對于粘包/拆包問題重制

重制問題,将上一篇文章中的代碼進行修改。

1.服務端修改NettyServerHandler類,加一個計數器:

public class NettyServerHandler extends ChannelHandlerAdapter {

    private int count = ;//記錄用戶端請求次數。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("收到用戶端消息:" + body + ";次數是:" + count++);
        String currentTime = "query".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"error";

        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
           

2.用戶端修改NettyClientHandler類,向服務端寫100次資料。

public class NettyClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ByteBuf firstMessage = null;
        //此處做了修改。
        byte[] req = ("query"+System.getProperty("line.separator")).getBytes();
        for(int j=; j<; j++){
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
            ctx.writeAndFlush(firstMessage);
        }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("Now is : " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
           

服務端的運作結果如下,次數并不是100次,而是發生了粘包,如果把用戶端的字元串換成一個更長的字元串,效果會更明顯。

Java NIO(六)Netty解決TCP粘包/拆包

用戶端的運作結果如下,用戶端在接收消息時也發生了粘包,收到了兩個連在一起的error字元串。

Java NIO(六)Netty解決TCP粘包/拆包

Netty對于粘包/拆包問題的初步解決

分别在服務端和用戶端的初始化時使用了兩個解碼器LineBasedFrameDecoder和StringDecoder,這兩個解碼器是通過換行符來配合實作的拆包粘包。

服務端修改内容如下

//修改了NettyServerInit類
public class NettyServerInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new LineBasedFrameDecoder());
        //添加了下面兩行。
        socketChannel.pipeline().addLast(new StringDecoder());
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}
           
//NettyServerHandler修改如下:
public class NettyServerHandler extends ChannelHandlerAdapter {

    private int count = ;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
        //直接強轉了消息,不用再處理
        String body = (String) msg;

        System.out.println("收到用戶端消息:" + body + "次數是:" + ++count);
        String currentTime = ("query".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"error") +System.getProperty("line.separator");

        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
           

用戶端端修改内容如下

public class NettyClient {

    public void connect(int port,String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        public void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder());
                            //在此處加了兩個解碼器。
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture f = b.connect("192.168.1.104",port).sync();
            f.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyClient().connect(,"127.0.0.1");
    }
}
           
public class NettyClientHandler extends ChannelHandlerAdapter {

    private int count = ;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ByteBuf firstMessage = null;

        byte[] req = ("query"+System.getProperty("line.separator")).getBytes();
        for(int j=; j<; j++){
            firstMessage = Unpooled.buffer(req.length);
            firstMessage.writeBytes(req);
            ctx.writeAndFlush(firstMessage);
        }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //此處不用再處理消息,直接強轉即可。
        String body = (String) msg;
        System.out.println("第" + ++count + "次收到時間N : " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
           

服務端運作結果如下:

Java NIO(六)Netty解決TCP粘包/拆包

用戶端運作結果如下:

Java NIO(六)Netty解決TCP粘包/拆包

可以發現,服務端和用戶端都達到了預期效果,用戶端發送了100次消息,服務端響應了100次消息。

Netty對于粘包/拆包問題其他解決方案

LineBasedFrameDecoder是通過換行符來實作的拆包粘包,Netty中還有兩種常見的解碼器,分别是利用“分隔符”和“定長”的解碼器。

文章最開始提到了TCP拆包/粘包問題的解決政策,大體分為4種,Netty分别對這四種進行了抽象。
  • DelimiterBasedFrameDecoder自定義消息分隔符解碼器。
  • FixedLengthFrameDecoder固定長度解碼器。