天天看點

怎麼使用Netty解碼自定義通信協定

作者:java小悠

網絡協定的基本要素

一個完備的網絡協定需要具備哪些基本要素

  1. 魔數:魔數是通信雙方協商的一個暗号,通常采用固定的幾個位元組表示。魔數的作用是防止任何人随便向伺服器的端口上發送資料。
  2. 協定版本号:随着業務需求的變化,協定可能需要對結構或字段進行改動,不同版本的協定對應的解析方法也是不同的。是以在生産級項目中強烈建議預留協定版本号這個字段。
  3. 序列化算法:表示資料發送方應該采用何種方法将請求的對象轉化為二進制,以及如何再将二進制轉化為對象
  4. 封包類型:封包可能存在不同的類型。例如在 RPC 架構中有請求、響應、心跳等類型的封包,在 IM 即時通信的場景中有登陸、建立群聊、發送消息、接收消息、退出群聊等類型的封包。
  5. 長度域字段:代表請求資料的長度,接收方根據長度域字段擷取一個完整的封包。
  6. 請求資料:通常為序列化之後得到的二進制流
  7. 狀态:狀态字段用于辨別請求是否正常。一般由被調用方設定。例如一次 RPC 調用失敗,狀态字段可被服務提供方設定為異常狀态。
  8. 保留字段:保留字段是可選項,為了應對協定更新的可能性,可以預留若幹位元組的保留字段,以備不時之需。
lua複制代碼+---------------------------------------------------------------+

| 魔數 2byte | 協定版本号 1byte | 序列化算法 1byte | 封包類型 1byte  |

+---------------------------------------------------------------+

| 狀态 1byte |        保留字段 4byte     |      資料長度 4byte     | 

+---------------------------------------------------------------+

|                   資料内容 (長度不定)                          |

+---------------------------------------------------------------+
           

舉例如下:

怎麼使用Netty解碼自定義通信協定

如何實作自定義通信協定

Netty 作為一個非常優秀的網絡通信架構,已經為我們提供了非常豐富的編解碼抽象基類,幫助我們更友善地基于這些抽象基類擴充實作自定義協定。 Netty 常用編碼器類型:

  • MessageToByteEncoder 對象編碼成位元組流;
  • MessageToMessageEncoder 一種消息類型編碼成另外一種消息類型。

Netty 常用解碼器類型:

  • ByteToMessageDecoder/ReplayingDecoder 将位元組流解碼為消息對象;
  • MessageToMessageDecoder 将一種消息類型解碼為另外一種消息類型。

編解碼器可以分為一次解碼器和二次解碼器,一次解碼器用于解決 TCP 拆包/粘包問題,按協定解析後得到的位元組資料。如果你需要對解析後的位元組資料做對象模型的轉換,這時候便需要用到二次解碼器,同理編碼器的過程是反過來的。 一次編解碼器:MessageToByteEncoder/ByteToMessageDecoder。 二次編解碼器:MessageToMessageEncoder/MessageToMessageDecoder。

抽象編碼類

怎麼使用Netty解碼自定義通信協定

通過抽象編碼類的繼承圖可以看出,編碼類是 ChanneOutboundHandler 的抽象類實作,具體操作的是 Outbound 出站資料。

MessageToByteEncoder

MessageToByteEncoder 用于将對象編碼成位元組流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我們隻需要實作encode 方法即可完成自定義編碼。 編碼器實作非常簡單,不需要關注拆包/粘包問題。如下例子,展示了如何将字元串類型的資料寫入到 ByteBuf 執行個體,ByteBuf 執行個體将傳遞給 ChannelPipeline 連結清單中的下一個 ChannelOutboundHandler。

java複制代碼public class StringToByteEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {
        byteBuf.writeBytes(data.getBytes());
    }
}
           

encode什麼時候被調用的

MessageToByteEncoder 重寫了 ChanneOutboundHandler 的 write() 方法,其主要邏輯分為以下幾個步驟:

  1. acceptOutboundMessage 判斷是否有比對的消息類型,如果比對需要執行編碼流程,如果不比對直接繼續傳遞給下一個 ChannelOutboundHandler;
  2. 配置設定 ByteBuf 資源,預設使用堆外記憶體;
  3. 調用子類實作的 encode 方法完成資料編碼,一旦消息被成功編碼,會通過調用 ReferenceCountUtil.release(cast) 自動釋放;
  4. 如果 ByteBuf 可讀,說明已經成功編碼得到資料,然後寫入 ChannelHandlerContext 交到下一個節點;如果 ByteBuf 不可讀,則釋放 ByteBuf 資源,向下傳遞空的 ByteBuf 對象。
java複制代碼@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) { // 1. 消息類型是否比對
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect); // 2. 配置設定 ByteBuf 資源
            try {
                encode(ctx, cast, buf); // 3. 執行 encode 方法完成資料編碼
            } finally {
                ReferenceCountUtil.release(cast);
            }
            if (buf.isReadable()) {
                ctx.write(buf, promise); // 4. 向後傳遞寫事件
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}
           

MessageToMessageEncoder

MessageToMessageEncoder 與 MessageToByteEncoder 類似,同樣隻需要實作 encode 方法。

MessageToMessageEncoder常用的實作子類有StringEncoder、LineEncoder、Base64Encoder等。

以StringEncoder為例看下MessageToMessageEncoder 的用法。

源碼示例如下:将 CharSequence 類型(String、StringBuilder、StringBuffer 等)轉換成 ByteBuf 類型,結合 StringDecoder 可以直接實作 String 類型資料的編解碼。

java複制代碼@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    if (msg.length() == 0) {
        return;
    }
    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
           

抽象解碼類

解碼類是 ChanneInboundHandler 的抽象類實作,操作的是 Inbound 入站資料。解碼器實作的難度要遠大于編碼器,因為解碼器需要考慮拆包/粘包問題。

由于接收方有可能沒有接收到完整的消息,是以解碼架構需要對入站的資料做緩沖操作,直至擷取到完整的消息。

怎麼使用Netty解碼自定義通信協定

ByteToMessageDecoder

使用 ByteToMessageDecoder,Netty 會自動進行記憶體的釋放,我們不用操心太多的記憶體管理方面的邏輯。 首先,我們看下 ByteToMessageDecoder 定義的抽象方法:

java複制代碼public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }
}
           

我們隻需要實作一下decode()方法,這裡的 in 大家可以看到,傳遞進來的時候就已經是 ByteBuf 類型,是以我們不再需要強轉,第三個參數是List類型,我們通過往這個List裡面添加解碼後的結果對象,就可以自動實作結果往下一個 handler 進行傳遞,這樣,我們就實作了解碼的邏輯 handler。

怎麼使用Netty解碼自定義通信協定

為什麼存取解碼後的資料是用List

由于 TCP 粘包問題,ByteBuf 中可能包含多個有效的封包,或者不夠一個完整的封包。

Netty 會重複回調 decode() 方法,直到沒有解碼出新的完整封包可以添加到 List 當中,或者 ByteBuf 沒有更多可讀取的資料為止。

如果此時 List 的内容不為空,那麼會傳遞給 ChannelPipeline 中的下一個ChannelInboundHandler。

java複制代碼static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    for (int i = 0; i < numElements; i ++) {
        //循環傳播  有多少調用多少
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}
           

decodeLast

ByteToMessageDecoder 還定義了 decodeLast() 方法。為什麼抽象解碼器要比編碼器多一個 decodeLast() 方法呢?

因為 decodeLast 在 Channel 關閉後會被調用一次,主要用于處理 ByteBuf 最後剩餘的位元組資料。Netty 中 decodeLast 的預設實作隻是簡單調用了 decode() 方法。如果有特殊的業務需求,則可以通過重寫 decodeLast() 方法擴充自定義邏輯。

ReplayingDecoder

ByteToMessageDecoder 還有一個抽象子類是 ReplayingDecoder。它封裝了緩沖區的管理,在讀取緩沖區資料時,你無須再對位元組長度進行檢查。因為如果沒有足夠長度的位元組資料,ReplayingDecoder 将終止解碼操作。ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情況下并不推薦使用 ReplayingDecoder。

MessageToMessageDecoder

與 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不會對資料封包進行緩存,它主要用作轉換消息模型。 比較推薦的做法是使用 ByteToMessageDecoder 解析 TCP 協定,解決拆包/粘包問題。解析得到有效的 ByteBuf 資料,然後傳遞給後續的 MessageToMessageDecoder 做資料對象的轉換,具體流程如下圖所示:

怎麼使用Netty解碼自定義通信協定

案例如下:

java複制代碼public class MyTcpDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 檢查ByteBuf資料是否完整
        if (in.readableBytes() < 4) {
            return;
        }

        // 标記ByteBuf讀取索引位置
        in.markReaderIndex();

        // 讀取資料包長度
        int length = in.readInt();

        // 如果ByteBuf中可讀位元組數不足一個資料包長度,則将讀取索引位置恢複到标記位置,等待下一次讀取
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }

        // 讀取資料
        ByteBuf data = in.readBytes(length);

        // 将資料傳遞給下一個解碼器進行轉換,轉換後的資料對象添加到out中
        ctx.fireChannelRead(data);
    }
}

public class MyDataDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // 将讀取到的ByteBuf資料轉換為自定義的資料對象
        MyData data = decode(msg);
        if (data != null) {
            // 将轉換後的資料對象添加到out中,表示解碼成功
            out.add(data);
        }
    }

    private MyData decode(ByteBuf buf) {
        // 實作自定義的資料轉換邏輯
        // ...
        return myData;
    }
}
           

實戰案例

如何判斷 ByteBuf 是否存在完整的封包? 最常用的做法就是通過讀取消息長度 dataLength 進行判斷。如果 ByteBuf 的可讀資料長度小于 dataLength,說明 ByteBuf 還不夠擷取一個完整的封包。在該協定前面的消息頭部分包含了魔數、協定版本号、資料長度等固定字段,共 14 個位元組。 固定字段長度和資料長度可以作為我們判斷消息完整性的依據,具體編碼器實作ByteToMessageDecoder邏輯示例如下:

java複制代碼/*
+---------------------------------------------------------------+
| 魔數 2byte | 協定版本号 1byte | 序列化算法 1byte | 封包類型 1byte  |
+---------------------------------------------------------------+
| 狀态 1byte |        保留字段 4byte     |      資料長度 4byte     | 
+---------------------------------------------------------------+
|                   資料内容 (長度不定)                          |
+---------------------------------------------------------------+
 */
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // 判斷 ByteBuf 可讀取位元組
    if (in.readableBytes() < 14) { 
        return;
    }
    // 标記 ByteBuf 讀指針位置
    in.markReaderIndex();
    // 跳過魔數
    in.skipBytes(2);
    // 跳過協定版本号
    in.skipBytes(1);
    byte serializeType = in.readByte();
     // 跳過封包類型
    in.skipBytes(1);
    // 跳過狀态字段
    in.skipBytes(1);
    // 跳過保留字段
    in.skipBytes(4);
    
    // 驗證封包長度,不對的話就重置指針位置
    int dataLength = in.readInt();
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex(); // 重置 ByteBuf 讀指針位置,這一步很重要
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);
    // 方式一:在解碼器中就将資料解碼成具體的對象
    SerializeService serializeService = getSerializeServiceByType(serializeType);
    Object obj = serializeService.deserialize(data);
    if (obj != null) {
        out.add(obj);
    }
    // 方式二:這一步可以不在解碼器中處理,将請求資料讀取到一個新的byteBuf然後丢給handler處理
    // 建立新的 ByteBuf 對象來存儲有效負載資料
    ByteBuf payload = Unpooled.buffer((int) dataSize);

    // 讀取有效負載資料并寫入到 payload 中
    in.readBytes(payload);
    if (payload.isReadable()) {
        out.add(payload);
    }
}
           

擴充

什麼是位元組序

位元組順序,是指資料在記憶體中的存放順序 使用16進制表示:0x12345678。在記憶體中有兩種方法存儲這個數字,

怎麼使用Netty解碼自定義通信協定

不同在于,對于某一個要表示的值,是把值的低位存到低位址,還是把值的高位存到低位址。

位元組順序分類

位元組的排列方式有兩種。例如,将一個多位元組對象的低位放在較小的位址處,高位放在較大的位址處,則稱小端序;反之則稱大端序。 典型的情況是整數在記憶體中的存放方式(小端/主機位元組序)和網絡傳輸的傳輸順序(大端/網絡位元組序)

1. 網絡位元組序(Network Order):TCP/IP各層協定将位元組序定義為大端(Big Endian) ,是以TCP/IP協定中使用的位元組序通常稱之為網絡位元組序。

  • 是以當兩台主機之間要通過TCP/IP協定進行通信的時候就需要調用相應的函數進行主機序列(Little Endian)和網絡序(Big Endian)的轉換。這樣一來,也就達到了與CPU、作業系統無關,實作了網絡通信的标準化。

2. 主機位元組序(Host Order): 整數在記憶體中儲存的順序,它遵循小端(Little Endian)規則(不一定,要看主機的CPU架構,不過大多數都是小端)。

  • 同型号計算機上寫的程式,在相同的系統上面運作是沒有問題的。

結論

Java中虛拟機屏蔽了大小端問題,如果是Java之間通信則無需考慮,隻有在跨語言通信的場景下才需要處理大小端問題。

回到本文的重點,我們在編解碼時也要注意大小端的問題,一般來說如果是小端序的話,我們用Netty取值的時候都要用LE結尾的方法。

原文連結:https://juejin.cn/post/7256039179086807095

繼續閱讀