網絡協定的基本要素
一個完備的網絡協定需要具備哪些基本要素
- 魔數:魔數是通信雙方協商的一個暗号,通常采用固定的幾個位元組表示。魔數的作用是防止任何人随便向伺服器的端口上發送資料。
- 協定版本号:随着業務需求的變化,協定可能需要對結構或字段進行改動,不同版本的協定對應的解析方法也是不同的。是以在生産級項目中強烈建議預留協定版本号這個字段。
- 序列化算法:表示資料發送方應該采用何種方法将請求的對象轉化為二進制,以及如何再将二進制轉化為對象
- 封包類型:封包可能存在不同的類型。例如在 RPC 架構中有請求、響應、心跳等類型的封包,在 IM 即時通信的場景中有登陸、建立群聊、發送消息、接收消息、退出群聊等類型的封包。
- 長度域字段:代表請求資料的長度,接收方根據長度域字段擷取一個完整的封包。
- 請求資料:通常為序列化之後得到的二進制流
- 狀态:狀态字段用于辨別請求是否正常。一般由被調用方設定。例如一次 RPC 調用失敗,狀态字段可被服務提供方設定為異常狀态。
- 保留字段:保留字段是可選項,為了應對協定更新的可能性,可以預留若幹位元組的保留字段,以備不時之需。
lua複制代碼+---------------------------------------------------------------+
| 魔數 2byte | 協定版本号 1byte | 序列化算法 1byte | 封包類型 1byte |
+---------------------------------------------------------------+
| 狀态 1byte | 保留字段 4byte | 資料長度 4byte |
+---------------------------------------------------------------+
| 資料内容 (長度不定) |
+---------------------------------------------------------------+
舉例如下:
如何實作自定義通信協定
Netty 作為一個非常優秀的網絡通信架構,已經為我們提供了非常豐富的編解碼抽象基類,幫助我們更友善地基于這些抽象基類擴充實作自定義協定。 Netty 常用編碼器類型:
- MessageToByteEncoder 對象編碼成位元組流;
- MessageToMessageEncoder 一種消息類型編碼成另外一種消息類型。
Netty 常用解碼器類型:
- ByteToMessageDecoder/ReplayingDecoder 将位元組流解碼為消息對象;
- MessageToMessageDecoder 将一種消息類型解碼為另外一種消息類型。
編解碼器可以分為一次解碼器和二次解碼器,一次解碼器用于解決 TCP 拆包/粘包問題,按協定解析後得到的位元組資料。如果你需要對解析後的位元組資料做對象模型的轉換,這時候便需要用到二次解碼器,同理編碼器的過程是反過來的。 一次編解碼器:MessageToByteEncoder/ByteToMessageDecoder。 二次編解碼器:MessageToMessageEncoder/MessageToMessageDecoder。
抽象編碼類
通過抽象編碼類的繼承圖可以看出,編碼類是 ChanneOutboundHandler 的抽象類實作,具體操作的是 Outbound 出站資料。
MessageToByteEncoder
MessageToByteEncoder 用于将對象編碼成位元組流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我們隻需要實作encode 方法即可完成自定義編碼。 編碼器實作非常簡單,不需要關注拆包/粘包問題。如下例子,展示了如何将字元串類型的資料寫入到 ByteBuf 執行個體,ByteBuf 執行個體将傳遞給 ChannelPipeline 連結清單中的下一個 ChannelOutboundHandler。
java複制代碼public class StringToByteEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {
byteBuf.writeBytes(data.getBytes());
}
}
encode什麼時候被調用的
MessageToByteEncoder 重寫了 ChanneOutboundHandler 的 write() 方法,其主要邏輯分為以下幾個步驟:
- acceptOutboundMessage 判斷是否有比對的消息類型,如果比對需要執行編碼流程,如果不比對直接繼續傳遞給下一個 ChannelOutboundHandler;
- 配置設定 ByteBuf 資源,預設使用堆外記憶體;
- 調用子類實作的 encode 方法完成資料編碼,一旦消息被成功編碼,會通過調用 ReferenceCountUtil.release(cast) 自動釋放;
- 如果 ByteBuf 可讀,說明已經成功編碼得到資料,然後寫入 ChannelHandlerContext 交到下一個節點;如果 ByteBuf 不可讀,則釋放 ByteBuf 資源,向下傳遞空的 ByteBuf 對象。
java複制代碼@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) { // 1. 消息類型是否比對
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect); // 2. 配置設定 ByteBuf 資源
try {
encode(ctx, cast, buf); // 3. 執行 encode 方法完成資料編碼
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise); // 4. 向後傳遞寫事件
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
MessageToMessageEncoder
MessageToMessageEncoder 與 MessageToByteEncoder 類似,同樣隻需要實作 encode 方法。
MessageToMessageEncoder常用的實作子類有StringEncoder、LineEncoder、Base64Encoder等。
以StringEncoder為例看下MessageToMessageEncoder 的用法。
源碼示例如下:将 CharSequence 類型(String、StringBuilder、StringBuffer 等)轉換成 ByteBuf 類型,結合 StringDecoder 可以直接實作 String 類型資料的編解碼。
java複制代碼@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
抽象解碼類
解碼類是 ChanneInboundHandler 的抽象類實作,操作的是 Inbound 入站資料。解碼器實作的難度要遠大于編碼器,因為解碼器需要考慮拆包/粘包問題。
由于接收方有可能沒有接收到完整的消息,是以解碼架構需要對入站的資料做緩沖操作,直至擷取到完整的消息。
ByteToMessageDecoder
使用 ByteToMessageDecoder,Netty 會自動進行記憶體的釋放,我們不用操心太多的記憶體管理方面的邏輯。 首先,我們看下 ByteToMessageDecoder 定義的抽象方法:
java複制代碼public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
decodeRemovalReentryProtection(ctx, in, out);
}
}
}
我們隻需要實作一下decode()方法,這裡的 in 大家可以看到,傳遞進來的時候就已經是 ByteBuf 類型,是以我們不再需要強轉,第三個參數是List類型,我們通過往這個List裡面添加解碼後的結果對象,就可以自動實作結果往下一個 handler 進行傳遞,這樣,我們就實作了解碼的邏輯 handler。
為什麼存取解碼後的資料是用List
由于 TCP 粘包問題,ByteBuf 中可能包含多個有效的封包,或者不夠一個完整的封包。
Netty 會重複回調 decode() 方法,直到沒有解碼出新的完整封包可以添加到 List 當中,或者 ByteBuf 沒有更多可讀取的資料為止。
如果此時 List 的内容不為空,那麼會傳遞給 ChannelPipeline 中的下一個ChannelInboundHandler。
java複制代碼static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
//循環傳播 有多少調用多少
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
decodeLast
ByteToMessageDecoder 還定義了 decodeLast() 方法。為什麼抽象解碼器要比編碼器多一個 decodeLast() 方法呢?
因為 decodeLast 在 Channel 關閉後會被調用一次,主要用于處理 ByteBuf 最後剩餘的位元組資料。Netty 中 decodeLast 的預設實作隻是簡單調用了 decode() 方法。如果有特殊的業務需求,則可以通過重寫 decodeLast() 方法擴充自定義邏輯。
ReplayingDecoder
ByteToMessageDecoder 還有一個抽象子類是 ReplayingDecoder。它封裝了緩沖區的管理,在讀取緩沖區資料時,你無須再對位元組長度進行檢查。因為如果沒有足夠長度的位元組資料,ReplayingDecoder 将終止解碼操作。ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情況下并不推薦使用 ReplayingDecoder。
MessageToMessageDecoder
與 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不會對資料封包進行緩存,它主要用作轉換消息模型。 比較推薦的做法是使用 ByteToMessageDecoder 解析 TCP 協定,解決拆包/粘包問題。解析得到有效的 ByteBuf 資料,然後傳遞給後續的 MessageToMessageDecoder 做資料對象的轉換,具體流程如下圖所示:
案例如下:
java複制代碼public class MyTcpDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 檢查ByteBuf資料是否完整
if (in.readableBytes() < 4) {
return;
}
// 标記ByteBuf讀取索引位置
in.markReaderIndex();
// 讀取資料包長度
int length = in.readInt();
// 如果ByteBuf中可讀位元組數不足一個資料包長度,則将讀取索引位置恢複到标記位置,等待下一次讀取
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// 讀取資料
ByteBuf data = in.readBytes(length);
// 将資料傳遞給下一個解碼器進行轉換,轉換後的資料對象添加到out中
ctx.fireChannelRead(data);
}
}
public class MyDataDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// 将讀取到的ByteBuf資料轉換為自定義的資料對象
MyData data = decode(msg);
if (data != null) {
// 将轉換後的資料對象添加到out中,表示解碼成功
out.add(data);
}
}
private MyData decode(ByteBuf buf) {
// 實作自定義的資料轉換邏輯
// ...
return myData;
}
}
實戰案例
如何判斷 ByteBuf 是否存在完整的封包? 最常用的做法就是通過讀取消息長度 dataLength 進行判斷。如果 ByteBuf 的可讀資料長度小于 dataLength,說明 ByteBuf 還不夠擷取一個完整的封包。在該協定前面的消息頭部分包含了魔數、協定版本号、資料長度等固定字段,共 14 個位元組。 固定字段長度和資料長度可以作為我們判斷消息完整性的依據,具體編碼器實作ByteToMessageDecoder邏輯示例如下:
java複制代碼/*
+---------------------------------------------------------------+
| 魔數 2byte | 協定版本号 1byte | 序列化算法 1byte | 封包類型 1byte |
+---------------------------------------------------------------+
| 狀态 1byte | 保留字段 4byte | 資料長度 4byte |
+---------------------------------------------------------------+
| 資料内容 (長度不定) |
+---------------------------------------------------------------+
*/
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 判斷 ByteBuf 可讀取位元組
if (in.readableBytes() < 14) {
return;
}
// 标記 ByteBuf 讀指針位置
in.markReaderIndex();
// 跳過魔數
in.skipBytes(2);
// 跳過協定版本号
in.skipBytes(1);
byte serializeType = in.readByte();
// 跳過封包類型
in.skipBytes(1);
// 跳過狀态字段
in.skipBytes(1);
// 跳過保留字段
in.skipBytes(4);
// 驗證封包長度,不對的話就重置指針位置
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex(); // 重置 ByteBuf 讀指針位置,這一步很重要
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
// 方式一:在解碼器中就将資料解碼成具體的對象
SerializeService serializeService = getSerializeServiceByType(serializeType);
Object obj = serializeService.deserialize(data);
if (obj != null) {
out.add(obj);
}
// 方式二:這一步可以不在解碼器中處理,将請求資料讀取到一個新的byteBuf然後丢給handler處理
// 建立新的 ByteBuf 對象來存儲有效負載資料
ByteBuf payload = Unpooled.buffer((int) dataSize);
// 讀取有效負載資料并寫入到 payload 中
in.readBytes(payload);
if (payload.isReadable()) {
out.add(payload);
}
}
擴充
什麼是位元組序
位元組順序,是指資料在記憶體中的存放順序 使用16進制表示:0x12345678。在記憶體中有兩種方法存儲這個數字,
不同在于,對于某一個要表示的值,是把值的低位存到低位址,還是把值的高位存到低位址。
位元組順序分類
位元組的排列方式有兩種。例如,将一個多位元組對象的低位放在較小的位址處,高位放在較大的位址處,則稱小端序;反之則稱大端序。 典型的情況是整數在記憶體中的存放方式(小端/主機位元組序)和網絡傳輸的傳輸順序(大端/網絡位元組序)
1. 網絡位元組序(Network Order):TCP/IP各層協定将位元組序定義為大端(Big Endian) ,是以TCP/IP協定中使用的位元組序通常稱之為網絡位元組序。
- 是以當兩台主機之間要通過TCP/IP協定進行通信的時候就需要調用相應的函數進行主機序列(Little Endian)和網絡序(Big Endian)的轉換。這樣一來,也就達到了與CPU、作業系統無關,實作了網絡通信的标準化。
2. 主機位元組序(Host Order): 整數在記憶體中儲存的順序,它遵循小端(Little Endian)規則(不一定,要看主機的CPU架構,不過大多數都是小端)。
- 同型号計算機上寫的程式,在相同的系統上面運作是沒有問題的。
結論
Java中虛拟機屏蔽了大小端問題,如果是Java之間通信則無需考慮,隻有在跨語言通信的場景下才需要處理大小端問題。
回到本文的重點,我們在編解碼時也要注意大小端的問題,一般來說如果是小端序的話,我們用Netty取值的時候都要用LE結尾的方法。
原文連結:https://juejin.cn/post/7256039179086807095