天天看點

十四.Netty之TCP粘包和拆包

1. 基本介紹

産生TCP粘包和拆包問題的主要原因是,作業系統在發送TCP資料的時候,底層會有一個緩沖區,例如1024個位元組大小,如果一次請求發送的資料量比較小,沒達到緩沖區大小,TCP則會将多個請求合并為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的資料量比較大,超過了緩沖區大小,TCP就會将其拆分為多次發送,這就是拆包,也就是将一個大的包拆分為多個小包進行發送。如下圖展示了TCP粘包和拆包的一個示意圖:

十四.Netty之TCP粘包和拆包

上圖中示範了TCP粘包和拆包的三種情況:

  1. A和B兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,進而還是使用兩個獨立的包進行發送;
  2. A和B兩次請求間隔時間内較短,并且資料包較小,因而合并為同一個包發送給服務端;
  3. B包比較大,因而将其拆分為兩個包B_1和B_2進行發送,而這裡由于拆分後的B_2比較小,其又與A包合并在一起發送。

2.執行個體

MyServer

public class MyServer {
    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類


            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
           

MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new MyServerHandler());
    }
}

           

MyServerHandler

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{
    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        //将buffer轉成字元串
        String message = new String(buffer, Charset.forName("utf-8"));

        System.out.println("伺服器接收到資料 " + message);
        System.out.println("伺服器接收到消息量=" + (++this.count));

        //伺服器回送資料給用戶端, 回送一個随機id ,
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);

    }
}
           

MyClient

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定義一個初始化類

            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

            channelFuture.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully();
        }
    }
}
           

MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}

           

MyClientHandler

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用用戶端發送10條資料 hello,server 編号
        for(int i= 0; i< 10; ++i) {
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("用戶端接收到消息=" + message);
        System.out.println("用戶端接收消息數量=" + (++this.count));

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

           

3.解決方案

解決辦法

3.1 方法一:FixedLengthFrameDecoder

消息定長,封包大小固定長度,不夠空格補全,發送和接收方遵循相同的約定, 這樣即使粘包了通過接收方程式設計實作擷取定長封包也能區分。

3.2 方法二:DelimiterBasedFrameDecoder

封包添加特殊分隔符,

例如每條封包結束都添加回車換行符(例如FTP協定) 或者指定特殊字元作為封包分隔符,接收方通過特殊分隔符切分封包區分。

ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
           

3.3 使用自定義協定 + 編解碼器

執行個體:

  1. 要求用戶端發送 5 個 Message 對象, 用戶端每次發送一個 Message 對象
  2. 伺服器端每次接收一個Message, 分5次進行解碼, 每讀取到 一個Message , 會回複一個Message 對象 給用戶端.
/**
 * 協定包
 *
 * @author Administrator
 */
public class MessageProtocol {
    private int len;
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}
           
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

  private int count;

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //使用用戶端發送5條資料 "今天天氣冷,吃火鍋" 編号

    for (int i = 0; i < 5; i++) {
      String mes = "學好netty,go,go,go";
      byte[] content = mes.getBytes(Charset.forName("utf-8"));
      int length = mes.getBytes(Charset.forName("utf-8")).length;

      //建立協定包對象
      MessageProtocol messageProtocol = new MessageProtocol();
      messageProtocol.setLen(length);
      messageProtocol.setContent(content);
      ctx.writeAndFlush(messageProtocol);

    }

  }

  //    @Override
  protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

    int len = msg.getLen();
    byte[] content = msg.getContent();

    System.out.println("用戶端接收到消息如下");
    System.out.println("長度=" + len);
    System.out.println("内容=" + new String(content, Charset.forName("utf-8")));

    System.out.println("用戶端接收消息數量=" + (++this.count));

  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("異常消息=" + cause.getMessage());
    ctx.close();
  }
}
           
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被調用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
           
public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被調用");
        //需要将得到二進制位元組碼-> MessageProtocol 資料包(對象)
        int length = in.readInt();

        byte[] content = new byte[length];
        in.readBytes(content);

        //封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);

        out.add(messageProtocol);

    }
}
           
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        //接收到資料,并處理
        int len = msg.getLen();
        byte[] content = msg.getContent();

        System.out.println("============================");
        System.out.println("伺服器接收到資訊如下");
        System.out.println("長度=" + len);
        System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));

        System.out.println("伺服器接收到消息包數量=" + (++this.count));

        //回複消息
        String responseContent = UUID.randomUUID().toString();
        int responseLen = responseContent.getBytes(StandardCharsets.UTF_8).length;
        byte[] responseContent2 = responseContent.getBytes(StandardCharsets.UTF_8);
        //建構一個協定包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(responseLen);
        messageProtocol.setContent(responseContent2);

        ctx.writeAndFlush(messageProtocol);
    }
}