天天看點

TCP粘包拆包及Netty的解決方案

10 TCP粘包和拆包

10.1 TCP粘包拆包概述

什麼是TCP粘包和拆包

TCP是面向連接配接的,面向流的,提供高可靠性服務。收發兩端(用戶端和伺服器端)都要有一一成對的socket,是以,發送端為了将多個發給接收端的包,更有效的發給對方,使用了優化方法(Nagle算法),将多次間隔較小且資料量小的資料,合并成一個大的資料塊,然後進行封包。這樣做雖然提高了效率,但是接收端就難于分辨出完整的資料包了,因為面向流的通信是無消息保護邊界的。

簡單來說,粘包就是TCP為了提升傳輸效率,将多個資料拼接成一個資料包進行傳輸。

粘包雖然提升了傳輸效率,但是在接受端如何把這個資料包拆回原來的多個資料呢?這就是拆包要解決的問題。

如何拆包?

可以看到,粘包時TCP傳輸層幫我們做的事情,但是TCP粘包之後它不會去負責拆包的工作(渣男),這時候需要我們自己去解決。

這時候我們的解決思路就是:不斷的從TCP緩沖區讀取資料,讀取完之後判斷是否是一個完整的資料包。如果不是一個完整的資料包,就繼續從TCP中讀取,直到得到一個完整的資料包。

是以,我怎麼知道是不是一個完整的資料包?這時就需要我們在每個資料包中都存放一個字段:資料的位元組數。

TCP粘包示例

用戶端連續發送資料較小的資料

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("server" + i + " ", CharsetUtil.UTF_8);
            ctx.writeAndFlush(byteBuf);
        }
    }
}
           

伺服器端的自定義Handler

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);  //将Bytebuf的msg轉成byte數組
        String string = new String(buffer, CharsetUtil.UTF_8);
        System.out.println(string+"\n");  //列印伺服器端收到的資料。
    }
}
           

啟動伺服器端後再啟動用戶端。可以在伺服器端觀察到,伺服器隻進行了一次讀取,将所有資料都讀取出來了。即伺服器端隻收到一個包。說明了用戶端發送的TCP資料包有粘包問題。

10.2 TCP粘包拆包解決方案

自定義協定包解決
  • 使用自定義協定 + 編解碼器 來解決
  • 關鍵就是要解決 伺服器端每次讀取資料長度的問題, 這個問題解決,就不會出現伺服器多讀或少讀資料的問題,進而避免的TCP 粘包、拆包 。

自定義一個協定Message對象:

public class MessageProtocol {
    private int len; //關鍵,表示消息的長度
    private byte[] content;
}
           

自定義編碼器:将Message轉換成ByteBuf的編碼器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MessageEncoder 方法被調用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
           

自定義解碼器: 将

ByteBuf

轉換成

Message

//繼承ReplayingDecoder,這個類會自定的去按照位元組長度解碼
public class MessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MessageDecoder 被調用");
        //需要将擷取到的二進制位元組碼轉換成 MessageProtocol
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);

        //封裝成 MessageProtocol 對象,放入 out,傳遞到下一個Handler
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}
           

用戶端發送資料

1)添加自定義的編碼器和處理器

bootstrap.group(eventExecutors)  //設定線程組
                    .channel(NioSocketChannel.class)   //設定用戶端通道實作類
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MessageEncoder());
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
           

2)自定義處理器中,建立連接配接後發送資料

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 5; i++) {
            StringBuilder stringBuilder = new StringBuilder();
            for(int j=0; j<i; ++j){
                stringBuilder.append('a');
            }
            String msg = stringBuilder.toString();
            byte[] context = msg.getBytes(CharsetUtil.UTF_8);
            int length = context.length;

            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setContent(context);
            messageProtocol.setLen(length);
            ctx.writeAndFlush(messageProtocol);
        }
    }
}
           

伺服器端接受資料

1)添加自定義的解碼器和處理器

pipeline.addLast(new MessageDecoder());
pipeline.addLast(new MyServerHandler());
           

2)自定義處理器中接受資料

public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        System.out.println("長度:"+msg.getLen());
        System.out.println(new String(msg.getContent(),CharsetUtil.UTF_8));
    }
}
           

服務端列印内容:

TCP粘包拆包及Netty的解決方案

這樣就能解決TCP粘包和拆包問題。

10.3 Netty自帶的拆包器

Netty中已經為我們提供了自帶的拆包器:

1)

FixedLengthFrameDecoder

:固定長度的拆包器,适用于每個資料包的長度都是固定的情況。使用時隻需要将這個拆包器加入到pipeline中,Netty将根據固定長度的資料包傳入到下一個Handler。

2)

LineBasedFrameDecoder

:行拆包器,适用于每個資料包之間按照行來分割的情況。

3)

DelimiterBasedFrameDecoder

:分隔符拆包器,行拆包器按照換行符分割,分隔符拆包器可以自定義分隔符來進行拆包。

4)

LengthFieldBasedFrameDecoder

:基于長度域拆包器,最通用的一種拆包器,隻要自定義協定中包含長度域字段,均可以使用該拆包器來實作。

一般我們使用第四個最通用的拆包器,隻要我們的協定裡面有資料長度這個字段即可。

// 最大位元組數,長度域偏移量,長度域長度
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4);
           

由于是拆包器,肯定是在責任鍊的第一個,是以隻需要将這個拆包器加入到pipeline中第一個位置即可完成拆包,解決TCP拆包問題。

原了解析

Netty中的拆包器都是繼承自

ByteToMessageDecoder

這個基類,這個類中内部有一個累加器,讀取到資料之後會觸發累加器,用于記錄目前已經讀取多少位元組,然後對累加的資料按照給定的資料長度進行拆包。

繼續閱讀