天天看點

Netty詳解(五):Netty TCP粘包 拆包1. 概述2. TCP底層的粘包和拆包機制3. Netty提供半包解碼器來解決TCP粘包/拆包問題4. 總結

1. 概述

無論是服務端還是用戶端,我們讀取或者發送消息的時候,都需要考慮TCP底層的粘包和拆包機制。下面我們來通過Netty來詳解TCP底層的粘包和拆包機制。

2. TCP底層的粘包和拆包機制

TCP是一個“流”協定,所謂流,就是沒有界限的一串資料。大家可以想想河裡的水流,它們是連城有一片的,期間沒有界限。TCP底層并不了解上層業務資料的具體含義,他會根據TCP緩沖區的實際情況進行包的劃分,是以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的的資料包進行發送,這就是所謂的TCP的粘包和拆包機制。

2.1 TCP粘包和拆包問題說明

假設用戶端分别發送了兩個資料包D1 和D2給服務商廈 ,由于服務端一次讀取到的位元組數是不确定的,故可能存在4種情況:

  1. 服務端分兩次讀取到了兩個獨立的資料包,分别是D1 和 D2,沒有粘包和拆包
  2. 服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包
  3. 服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分内容,第二次讀取到了D2包的剩餘内容,這稱為TCP拆包
  4. 服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分内容D1_1,第二次讀取到了D1包的剩餘内容D1_2和D2包的整包。
  5. 如果此時服務端TCP接收滑窗非常小,而資料包D1和D2比較大,很可能會發生第5種可能,即服務端分多次才能将D1和D2包接收完全,期間發生多次拆包
Netty詳解(五):Netty TCP粘包 拆包1. 概述2. TCP底層的粘包和拆包機制3. Netty提供半包解碼器來解決TCP粘包/拆包問題4. 總結

2.2 TCP粘包/拆包發生的原因

問題産生的原因有三個,分别如下:

  1. 應用程式write寫入的位元組大小大于套接口發送緩沖區大小
  2. 0進行MSS大小的TCP分段
  3. 以太網幀的payload大于MTU進行IP分片
  • MSS:TCP傳輸層(傳輸幀)最大封包段長度。Maxitum Segment Size最大分段大小。為了達到最佳的傳輸效能TCP協定在建立連接配接的時候通常要協商雙方的MSS值,這個值TCP協定在實作的時候往往用MTU值代替,值往往為1460.IPV6中通常是1440
  • MTU:Maxitum Transmission Unit最大傳輸單元。這個最大傳輸單元實際上和鍊路層協定有着密切的關系,EthernetII 幀的結構DMAC+SMAC+Type+Data+CRC。由于以太網傳輸限制,每個以太網幀都有最小的大小64bytes,最大不能超過1518bytes,對于小于或大于這個限制的以太網幀我們都可以視之為錯誤的資料幀,一般的以太網轉發裝置會丢棄這些資料幀。
Netty詳解(五):Netty TCP粘包 拆包1. 概述2. TCP底層的粘包和拆包機制3. Netty提供半包解碼器來解決TCP粘包/拆包問題4. 總結

2.3 粘包問題的解決政策

底層的TCP無法了解上層的業務資料,需要在上層的應用協定棧調來來解決。

  1. 消息定義,例如每個封包的長度大小固定200位元組,如果不夠,空格補空位。
  2. 在包尾增加回車換行符,如FTP協定
  3. 将消息分成消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總長度
  4. 更複雜的應用層協定

3. Netty提供半包解碼器來解決TCP粘包/拆包問題

3.1 LineBasedFrameDecoder

LineBasedFrameDecoder 是依次周遊ByteBuf中的可讀位元組,判斷看是否有\n 或 \r\n,如果有,就以此位置為結束位置,以換行符為結束标志的解碼器。它支援攜帶結束符或者不攜帶結束符兩種解碼方式,同時支援配置單行的最大長度。如果連續讀取到最大長度後仍然沒有發現換行符,就會抛出異常,同時忽略掉之前讀到的異常碼流。

StringDecoder的功能非常簡單,就是将接受到的對象轉換成字元串,然後繼續調用後面的Handler。LineBasedFrameDecoder+StringDecoder組合就是按行切換的文本解碼器,它被設計用來支援TCP的粘包和拆包。

在ChannelInitializer類中添加LineBasedFrameDecoder+StringDecoder

EventLoopGroup group=new NioEventLoopGroup();
        try{
            Bootstrap b=new Bootstrap();
            //Channel需要設定為NioSocketChannel,然後為其添加Handler
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY,true)
            .handler(new ChannelInitializer<SocketChannel>(){
                //為了簡單直接建立匿名内部類,實作initChannel方法
                //其作用是當建立NioSocketChannel成功之後,在進行初始化時,
                //将它的ChannelHandler設定到ChannelPipeline中,用于處理網絡I/O事件
                @Override
                public void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //發起異步連接配接,然後調用同步方法等待連接配接成功
            ChannelFuture f=b.connect(host,port).sync();
            //當用戶端連接配接關閉之後,用戶端主函數退出,退出前釋放NIO線程組的資源
            f.channel().closeFuture().sync();
        }finally{

        }
           

TimeServerHandler.java 關鍵代碼

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception {
		System.out.println("channelRead start");
		ByteBuf buf = (ByteBuf) msg;
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("The time server receive order : " + body);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
				System.currentTimeMillis()).toString() : "BAD ORDER";
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		ctx.write(resp);
		System.out.println("channelRead end");
	}

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channelReadComplete start");
		ctx.flush();
		System.out.println("channelReadComplete end");
    }
           

TimeClientHandler.java 關鍵代碼

/**
     * Creates a client-side handler.
     */
    public TimeClientHandler() {
	req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
		.getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
	ByteBuf message = null;
	for (int i = 0; i < 100; i++) {
	    message = Unpooled.buffer(req.length);
	    message.writeBytes(req);
	    ctx.writeAndFlush(message);
	}
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
	    throws Exception {
	String body = (String) msg;
	System.out.println("Now is : " + body + " ; the counter is : "
		+ ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
	// 釋放資源
	logger.warning("Unexpected exception from downstream : "
		+ cause.getMessage());
	ctx.close();
    }
           

3.2 DelimiterBasedFrameDecoder

以分隔符作為碼流結束辨別的消息的解碼。示例:Echo服務,以$_作為分隔符。

EchoServer.java關鍵代碼

@Override
public void initChannel(SocketChannel ch) throws Exception{
    //建立分隔符緩沖對象ByteBuf,以$_為分隔符
   ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
   //1024表示單條消息的最大長度,當達到該長度後仍然沒有查找到分隔符
   //就抛出TooLongFrameException異常
   //第二個參數是分隔符緩沖對象
   new DelimiterBasedFrameDecoder(1024,delimiter));  //後續的ChannelHandler接收到的msg對象将會是完整的消息包
   ch.pipeline().addLast(new StringDecoder()); //将ByteBuf解碼成字元串對象 
   ch.pipeline().addLast(new EchoServerHandler());  //接收到的msg消息就是解碼後的字元串對象
}
           

DelimiterBasedFrameDecoder 有多個構造方法,這裡我們傳遞兩個參數:第一個1024表示單條最大長度,當達到該長度之後仍然沒有找到分隔符,就抛出TooLongFrameException異常,防止由于異常流缺失分隔符導緻的記憶體溢出,這就是Netty解碼器的可靠性保證。第二個參數就是分隔符緩沖對象。

EchoServerHandler.java 關鍵代碼

@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
   String body=(String)msg;
   System.out.println("This is " + ++counter + " times receive client : [" + body + "]");
   body+="$_"; //$_已被過濾掉了,是以這裡要拼接上
   ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
   ctx.writeAndFlush(echo);
}
           

由于DelimiterBasedFrameDecoder自動對請求消息進行了編碼,後續的ChannelHandler接受到的msg對象就是個完整的消息包;第二個ChannelHandler是StringDecoder,它将ByteBuffer解碼成字元串對象;第三個EchoServerHandler接受到的msg消息就是解碼後的字元串對象。

EchoClient.java 關鍵代碼

@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
   String body=(String)msg;
   System.out.println("This is " + ++counter + " times receive client : [" + body + "]");
   body+="$_"; //$_已被過濾掉了,是以這裡要拼接上
   ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
   ctx.writeAndFlush(echo);
}
           

EchoClientHandler.java 關鍵代碼

@Override
public void channelActive(ChannelHandlerContext ctx){
   for(int i=0;i<10;i++){
      ctx.writeAndFlush(Unpooled.copiedBuffer(ECHOREQ.getBytes()));
   }
}
           

3.3 FixedLengthFrameDecoder

FixedLengthFrameDecoder是固定長度解碼器,它能夠按照指定的長度對消息進行自動解碼,按照指定的長度對消息進行自動解碼,開發者不需要考慮TCP的粘包/拆包問題。

EchoServer.java 關鍵代碼

@Override
public void initChannel (SocketChannel ch) throws Exception{
   ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
   ch.pipeline().addLast(new StringDecoder());
   ch.pipeline().addLast(new EchoServerHandler()));
}
           

在服務端的ChannelPipeline中新增FixedLengthFrameDecoder,長度設定為20,然後再依次增加字元串解碼器和EchoHandler。

EchoServerHandler.java 關鍵代碼

@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
   System.out.println("Receive client : [" + msg + "]");
}
           

利用FixedLengthFrameDecoder解碼器,無論一次接收到多少資料報,他都會按照構造函數中設定的固定長度間解碼,如果是半包消息,FixedLengthFrameDecoder會緩存半包消息并等待下一個包到達後進行拼包,直到讀取到一個完整的包。

4. 總結

DelimiterBasedFrameDecoder 用于對使用分隔符結尾的消息間自動解碼,FixedLengthFrameDecoder用于對固定長度的消息進行自動解碼。有了上述兩種解碼器,再結合其他的解碼器,如字元串解碼器等,可以輕松完成對很多消息的自動解碼,而且不需要考慮TCP粘包和拆包問題。