天天看点

十四.Netty之TCP粘包和拆包

1. 基本介绍

产生TCP粘包和拆包问题的主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。如下图展示了TCP粘包和拆包的一个示意图:

十四.Netty之TCP粘包和拆包

上图中演示了TCP粘包和拆包的三种情况:

  1. A和B两个包都刚好满足TCP缓冲区的大小,或者说其等待时间已经达到TCP等待时长,从而还是使用两个独立的包进行发送;
  2. A和B两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;
  3. B包比较大,因而将其拆分为两个包B_1和B_2进行发送,而这里由于拆分后的B_2比较小,其又与A包合并在一起发送。

2.实例

MyServer

public class MyServer {
    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类


            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
           

MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new MyServerHandler());
    }
}

           

MyServerHandler

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{
    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));

        System.out.println("服务器接收到数据 " + message);
        System.out.println("服务器接收到消息量=" + (++this.count));

        //服务器回送数据给客户端, 回送一个随机id ,
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);

    }
}
           

MyClient

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定义一个初始化类

            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

            channelFuture.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully();
        }
    }
}
           

MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}

           

MyClientHandler

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送10条数据 hello,server 编号
        for(int i= 0; i< 10; ++i) {
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);

        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息=" + message);
        System.out.println("客户端接收消息数量=" + (++this.count));

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

           

3.解决方案

解决办法

3.1 方法一:FixedLengthFrameDecoder

消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定, 这样即使粘包了通过接收方编程实现获取定长报文也能区分。

3.2 方法二:DelimiterBasedFrameDecoder

报文添加特殊分隔符,

例如每条报文结束都添加回车换行符(例如FTP协议) 或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。

ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
           

3.3 使用自定义协议 + 编解码器

实例:

  1. 要求客户端发送 5 个 Message 对象, 客户端每次发送一个 Message 对象
  2. 服务器端每次接收一个Message, 分5次进行解码, 每读取到 一个Message , 会回复一个Message 对象 给客户端.
/**
 * 协议包
 *
 * @author Administrator
 */
public class MessageProtocol {
    private int len;
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}
           
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

  private int count;

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //使用客户端发送5条数据 "今天天气冷,吃火锅" 编号

    for (int i = 0; i < 5; i++) {
      String mes = "学好netty,go,go,go";
      byte[] content = mes.getBytes(Charset.forName("utf-8"));
      int length = mes.getBytes(Charset.forName("utf-8")).length;

      //创建协议包对象
      MessageProtocol messageProtocol = new MessageProtocol();
      messageProtocol.setLen(length);
      messageProtocol.setContent(content);
      ctx.writeAndFlush(messageProtocol);

    }

  }

  //    @Override
  protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

    int len = msg.getLen();
    byte[] content = msg.getContent();

    System.out.println("客户端接收到消息如下");
    System.out.println("长度=" + len);
    System.out.println("内容=" + new String(content, Charset.forName("utf-8")));

    System.out.println("客户端接收消息数量=" + (++this.count));

  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("异常消息=" + cause.getMessage());
    ctx.close();
  }
}
           
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被调用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
           
public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码-> MessageProtocol 数据包(对象)
        int length = in.readInt();

        byte[] content = new byte[length];
        in.readBytes(content);

        //封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);

        out.add(messageProtocol);

    }
}
           
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();

        System.out.println("============================");
        System.out.println("服务器接收到信息如下");
        System.out.println("长度=" + len);
        System.out.println("内容=" + new String(content, StandardCharsets.UTF_8));

        System.out.println("服务器接收到消息包数量=" + (++this.count));

        //回复消息
        String responseContent = UUID.randomUUID().toString();
        int responseLen = responseContent.getBytes(StandardCharsets.UTF_8).length;
        byte[] responseContent2 = responseContent.getBytes(StandardCharsets.UTF_8);
        //构建一个协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(responseLen);
        messageProtocol.setContent(responseContent2);

        ctx.writeAndFlush(messageProtocol);
    }
}