天天看點

Netty源碼分析——拆包器的奧秘

基于Netty源代碼版本:netty-all-4.1.33.Final

前言

Netty 的解碼器有很多種,比如基于長度的,基于分割符的,私有協定的。但是,總體的思路都是一緻的。

拆包思路:當資料滿足了 解碼條件時,将其拆開。放到數組。然後發送到業務 handler 處理。

半包思路: 當讀取的資料不夠時,先存起來,直到滿足解碼條件後,放進數組。送到業務 handler 處理。

拆包的原理

在沒有netty的情況下,使用者如果自己需要拆包,基本原理就是不斷從TCP緩沖區中讀取資料,每次讀取完都需要判斷是否是一個完整的資料包

  • 1、如果目前讀取的資料不足以拼接成一個完整的業務資料包,那就保留該資料,繼續從tcp緩沖區中讀取,直到得到一個完整的資料包
  • 2、如果目前讀到的資料加上已經讀取的資料足夠拼接成一個資料包,那就将已經讀取的資料拼接上本次讀取的資料,夠成一個完整的業務資料包傳遞到業務邏輯,多餘的資料仍然保留,以便和下次讀到的資料嘗試拼接

netty中拆包的基類

netty 中的拆包也是如上這個原理,在每個SocketChannel中會一個 pipeline ,pipeline 内部會加入解碼器,解碼器都繼承基類 ByteToMessageDecoder,其内部會有一個累加器,每次從目前SocketChannel讀取到資料都會不斷累加,然後嘗試對累加到的資料進行拆包,拆成一個完整的業務資料包,下面我們先詳細分析下這個類

看名字的意思是:将位元組轉換成消息的解碼器。人如其名。而他本身也是一個入站 handler,是以,我們還是從他的 channelRead 方法入手。

channelRead 方法

我們先看看基類中的屬性,cumulation是此基類中的一個 ByteBuf 類型的累積區,每次從目前SocketChannel讀取到資料都會不斷累加,然後嘗試對累加到的資料進行拆包,拆成一個完整的業務資料包,如果不夠一個完整的資料包,則等待下一次從TCP的資料到來,繼續累加到此cumulation中

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
	ByteBuf cumulation;
    private Cumulator cumulator = MERGE_CUMULATOR;
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
	private byte decodeState = STATE_INIT;
    private int discardAfterReads = 16;
    private int numReads;
}
           

channelRead方法是每次從TCP緩沖區讀到資料都會調用的方法,觸發點在AbstractNioByteChannel的read方法中,裡面有個while循環不斷讀取,讀取到一次就觸發一次channelRead

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	if (msg instanceof ByteBuf) {
		// 從對象池中取出一個List
		CodecOutputList out = CodecOutputList.newInstance();
		try {
			ByteBuf data = (ByteBuf) msg;
			first = cumulation == null;
			if (first) {
				// 第一次解碼
				cumulation = data;//直接指派
			} else {
				// 第二次解碼,就将 data 向 cumulation 追加,并釋放 data
				cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
			}
			// 得到追加後的 cumulation 後,調用 decode 方法進行解碼
            // 主要目的是将累積區cumulation的内容 decode 到 out數組中
			callDecode(ctx, cumulation, out);
		} catch (DecoderException e) {
			throw e;
		} catch (Exception e) {
			throw new DecoderException(e);
		} finally {
			// 如果累計區沒有可讀位元組了,有可能在上面callDecode方法中已經将cumulation全部讀完了,此時writerIndex==readerIndex
            // 每讀一個位元組,readerIndex會+1
			if (cumulation != null && !cumulation.isReadable()) {
				// 将次數歸零
				numReads = 0;
				// 釋放累計區,因為累計區裡面的位元組都全部讀完了
				cumulation.release();
				// 便于 gc
				cumulation = null;
				// 如果超過了 16 次,還有位元組沒有讀完,就将已經讀過的資料丢棄,将 readIndex 歸零。
			} else if (++ numReads >= discardAfterReads) {
				// We did enough reads already try to discard some bytes so we not risk to see a OOME.
				// See https://github.com/netty/netty/issues/4275
				numReads = 0;
				//将已經讀過的資料丢棄,将 readIndex 歸零。
				discardSomeReadBytes();
			}

			int size = out.size();
			decodeWasNull = !out.insertSinceRecycled();
			//循環數組,向後面的 handler 發送資料
			fireChannelRead(ctx, out, size);
			out.recycle();
		}
	} else {
		ctx.fireChannelRead(msg);
	}
}
           
  • 1、從對象池中取出一個空的數組。
  • 2、判斷成員變量是否是第一次使用,将 unsafe 中傳遞來的資料寫入到這個 cumulation 累積區中。
  • 3、寫到累積區後,在callDecode方法中調用子類的 decode 方法,嘗試将累積區的内容解碼,每成功解碼一個,就調用後面節點的 channelRead 方法。若沒有解碼成功,什麼都不做。
  • 4、如果累積區沒有未讀資料了,就釋放累積區。
  • 5、如果還有未讀資料,且解碼超過了 16 次(預設),就對累積區進行壓縮。将讀取過的資料清空,也就是将 readIndex 設定為0.
  • 6、調用 fireChannelRead 方法,将數組中的元素發送到後面的 handler 中。
  • 7、将數組清空。并還給對象池。

下面來說說詳細的步驟。

寫入累積區

如果目前累加器沒有資料,就直接跳過記憶體拷貝,直接将位元組容器的指針指向新讀取的資料,否則,調用累加器累加資料至位元組容器

ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
	// 第一次解碼
	cumulation = data;//直接指派
} else {
	// 第二次解碼,就将 data 向 cumulation 追加,并釋放 data
	cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
           

然後看到ByteToMessageDecoder的成員變量private Cumulator cumulator = MERGE_CUMULATOR;接下來看看 MERGE_CUMULATOR

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
	@Override
	public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
		try {
			final ByteBuf buffer;
			if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
				|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
				// Expand cumulation (by replace it) when either there is not more room in the buffer
				// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
				// duplicate().retain() or if its read-only.
				//
				// See:
				// - https://github.com/netty/netty/issues/2327
				// - https://github.com/netty/netty/issues/1764
				buffer = expandCumulation(alloc, cumulation, in.readableBytes());
			} else {
				buffer = cumulation;
			}
			buffer.writeBytes(in);
			return buffer;
		} finally {
			// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
			// for whatever release (for example because of OutOfMemoryError)
			in.release();
		}
	}
};
           

MERGE_CUMULATOR是基類ByteToMessageDecoder中的一個靜态常量,其重寫了cumulate方法,下面我們看一下 MERGE_CUMULATOR 是如何将新讀取到的資料累加到位元組容器裡的

netty 中ByteBuf的抽象,使得累加非常簡單,通過一個簡單的api調用 buffer.writeBytes(in); 便将新資料累加到位元組容器中,為了防止位元組容器大小不夠,在累加之前還進行了擴容處理

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
	ByteBuf oldCumulation = cumulation;
	cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
	cumulation.writeBytes(oldCumulation);
	oldCumulation.release();
	return cumulation;
}
           

擴容也是一個記憶體拷貝操作,新增的大小即是新讀取資料的大小

将累加到的資料傳遞給業務進行拆包

當資料追加到累積區之後,需要調用 decode 方法進行解碼,代碼如下:

public abstract class AbstractByteBuf extends ByteBuf {  
	@Override
    public boolean isReadable() {
		//寫的坐标大于讀的坐标則說明還有資料可讀
        return writerIndex > readerIndex;
    }
}

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
	protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
		try {
			// 如果累計區還有可讀位元組,循環解碼,因為這裡in有可能是粘包,即多次完整的資料包粘在一起,通過換行符連接配接
			// 下面的decode方法隻能處理一個完整的資料包,是以這裡循環處理粘包
			while (in.isReadable()) {
				int outSize = out.size();
				// 上次循環成功解碼
				if (outSize > 0) {
					// 處理一個粘包就 調用一次後面的業務 handler 的  ChannelRead 方法
					fireChannelRead(ctx, out, outSize);
					// 将 size 置為0
					out.clear();

					// Check if this handler was removed before continuing with decoding.
					// If it was removed, it is not safe to continue to operate on the buffer.
					//
					// See:
					// - https://github.com/netty/netty/issues/4635
					if (ctx.isRemoved()) {
						break;
					}
					outSize = 0;
				}
				// 得到可讀位元組數
				int oldInputLength = in.readableBytes();
				// 調用 decode 方法,将成功解碼後的資料放入道 out 數組中
				decodeRemovalReentryProtection(ctx, in, out);

				// Check if this handler was removed before continuing the loop.
				// If it was removed, it is not safe to continue to operate on the buffer.
				//
				// See https://github.com/netty/netty/issues/1664
				if (ctx.isRemoved()) {
					break;
				}

				if (outSize == out.size()) {
					if (oldInputLength == in.readableBytes()) {
						break;
					} else {
						continue;
					}
				}

				if (oldInputLength == in.readableBytes()) {
					throw new DecoderException(
							StringUtil.simpleClassName(getClass()) +
									".decode() did not read anything but decoded a message.");
				}

				if (isSingleDecode()) {
					break;
				}
			}
		} catch (DecoderException e) {
			throw e;
		} catch (Exception cause) {
			throw new DecoderException(cause);
		}
	}

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }
}
           

我們看看 fireChannelRead

static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
	if (msgs instanceof CodecOutputList) {
		fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
	} else {
		//将所有已解碼的資料向下業務hadder傳遞
		for (int i = 0; i < numElements; i++) {
			ctx.fireChannelRead(msgs.get(i));
		}
	}
}
           

該方法主要邏輯:隻要累積區還有未讀資料,就循環進行讀取。

  • 1、調用 decodeRemovalReentryProtection 方法,内部調用了子類重寫的 decode 方法,很明顯,這裡是個模闆模式。decode 方法的邏輯就是将累積區的内容按照約定進行解碼,如果成功解碼,就添加到數組中。同時該方法也會檢查該 handler 的狀态,如果被移除出 pipeline 了,就将累積區的内容直接重新整理到後面的 handler 中。
  • 2、如果 Context 節點被移除了,直接結束循環。如果解碼前的數組大小和解碼後的數組大小相等,且累積區的可讀位元組數沒有變化,說明此次讀取什麼都沒做,就直接結束。如果位元組數變化了,說明雖然數組沒有增加,但确實在讀取位元組,就再繼續讀取。
  • 3、如果上面的判斷過了,說明數組讀到資料了,但如果累積區的 readIndex 沒有變化,則抛出異常,說明沒有讀取資料,但數組卻增加了,子類的操作是不對的。
  • 4、如果是個單次解碼器,解碼一次就直接結束了,如果資料包一次就解碼完了,則下一次循環時 in.isReadable()就為false,因為 writerIndex = this.readerIndex 了

是以,這段代碼的關鍵就是子類需要重寫 decode 方法,将累積區的資料正确的解碼并添加到數組中。每添加一次成功,就會調用 fireChannelRead 方法,将數組中的資料傳遞給後面的 handler。完成之後将數組的 size 設定為 0.

是以,如果你的業務 handler 在這個地方可能會被多次調用。也可能一次也不調用。取決于數組中的值。

解碼器最主要的邏輯:

将 read 方法的資料讀取到累積區,使用解碼器解碼累積區的資料,解碼成功一個就放入到一個數組中,并将數組中的資料一次次的傳遞到後面的handler。

####清理位元組容器

業務拆包完成之後,隻是從累積區中取走了資料,但是這部分空間對于累積區來說依然保留着,而位元組容器每次累加位元組資料的時候都是将位元組資料追加到尾部,如果不對累積區做清理,那麼時間一長就會OOM,清理部分的代碼如下:

finally {
	// 如果累計區沒有可讀位元組了,有可能在上面callDecode方法中已經将cumulation全部讀完了,此時writerIndex==readerIndex
	// 每讀一個位元組,readerIndex會+1
	if (cumulation != null && !cumulation.isReadable()) {
		// 将次數歸零
		numReads = 0;
		// 釋放累計區,因為累計區裡面的位元組都全部讀完了
		cumulation.release();
		// 便于 gc
		cumulation = null;
		// 如果超過了 16 次,還有位元組沒有讀完,就将已經讀過的資料丢棄,将 readIndex 歸零。
	} else if (++ numReads >= discardAfterReads) {
		// We did enough reads already try to discard some bytes so we not risk to see a OOME.
		// See https://github.com/netty/netty/issues/4275
		numReads = 0;
		//将已經讀過的資料丢棄,将 readIndex 歸零。
		discardSomeReadBytes();
	}

	int size = out.size();
	decodeWasNull = !out.insertSinceRecycled();
	//循環數組,向後面的 handler 發送資料
	fireChannelRead(ctx, out, size);
	out.recycle();
}
           
  • 1、如果累積區沒有可讀資料了,将計數器歸零,并釋放累積區。
  • 2、如果不滿足上面的條件,且計數器超過了 16 次,就壓縮累積區的内容,壓縮手段是删除已讀的資料。将 readIndex 置為 0。還記得 ByteBuf 的指針結構嗎?
    Netty源碼分析——拆包器的奧秘
public abstract class AbstractByteBuf extends ByteBuf {   
	@Override
    public ByteBuf discardSomeReadBytes() {
        ensureAccessible();
        if (readerIndex == 0) {
            return this;
        }

        if (readerIndex == writerIndex) {
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
            return this;
        }
		//讀指針超過了Buffer容量的一半時做清理工作
        if (readerIndex >= capacity() >>> 1) {
			//拷貝,從readerIndex開始,拷貝this.writerIndex - this.readerIndex 長度
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
			//将讀指針重置為0
            readerIndex = 0;
        }
        return this;
    }
}
           

我們看到discardSomeReadBytes 主要是将未讀的資料拷貝到原Buffer,重置 readerIndex 和 writerIndex

我們看到最後還調用 fireChannelRead 方法,嘗試将數組中的資料發送到後面的 handler。為什麼要這麼做。按道理,到這一步的時候,數組不可能是空,為什麼這裡還要這麼謹慎的再發送一次?

如果是單次解碼器,就需要發送了,因為單詞解碼器是不會在 callDecode 方法中發送的。

總結

可以說,ByteToMessageDecoder 是解碼器的核心所做,Netty 在這裡使用了模闆模式,留給子類擴充的方法就是 decode 方法。

主要邏輯就是将所有的資料全部放入累積區,子類從累積區取出資料進行解碼後放入到一個 數組中,ByteToMessageDecoder 會循環數組調用後面的 handler 方法,将資料一幀幀的發送到業務 handler 。完成這個的解碼邏輯。

使用這種方式,無論是粘包還是拆包,都可以完美的實作。

Netty 所有的解碼器,都可以在此類上擴充,一切取決于 decode 的實作。隻要遵守 ByteToMessageDecoder 的約定即可。

Netty中内置了幾個編解碼器,可以很簡單的處理包界限問題。

LengthFieldBasedFrameDecoder

通過在標頭增加消息體長度的解碼器,解析資料時首先擷取首部長度,然後定長讀取socket中的資料。

LineBasedFrameDecoder

換行符解碼器,封包尾部增加強定換行符rn,解析資料時以換行符作為封包結尾。

DelimiterBasedFrameDecoder

分隔符解碼器,使用特定分隔符作為封包的結尾,解析資料時以定義的分隔符作為封包結尾

FixedLengthFrameDecoder

定長解碼器,這個最簡單,消息體固定長度,解析資料時按長度讀取即可