天天看點

Netty 編解碼與TCP拆包粘包1、編解碼2、Netty粘包拆包

文章目錄

  • 1、編解碼
    • protostuff編解碼實作
  • 2、Netty粘包拆包
    • 解決方案

學習位址: https://dongzl.github.io/netty-handbook/#/

1、編解碼

Netty

涉及到編解碼的元件有

Channel

ChannelHandler

ChannelPipe

等,先大概了解下這幾個元件的作用。

ChannelHandler

ChannelHandler

充當了處理入站和出站資料的應用程式邏輯容器。例如,實作

ChannelInboundHandler

接口(或

ChannelInboundHandlerAdapter

),你就可以接收入站事件和資料,這些資料随後會被你的應用程式的業務邏輯處理。當你要給連接配接的用戶端發送響應時,也可以從

ChannelInboundHandler

沖刷資料。你的業務邏輯通常寫在一個或者多個

ChannelInboundHandler

中。

ChannelOutboundHandler

原理一樣,隻不過它是用來處理出站資料的。

ChannelPipeline

ChannelPipeline

提供了

ChannelHandler

鍊的容器。以用戶端應用程式為例,如果事件的運動方向是從用戶端到服務端的,那麼我們稱這些事件為出站的,即用戶端發送給服務端的資料會通過

pipeline

中的一系列

ChannelOutboundHandler

(

ChannelOutboundHandler

調用是從

tail

head

方向逐個調用每個

handler

的邏輯),并被這些

Handler

處理,反之則稱為入站的,入站隻調用

pipeline

裡的

ChannelInboundHandler

邏輯(

ChannelInboundHandler

調用是從

head

tail

方向逐個調用每個handler的邏輯)。

Netty 編解碼與TCP拆包粘包1、編解碼2、Netty粘包拆包

編碼解碼器

當你通過

Netty

發送或者接受一個消息的時候,就将會發生一次資料轉換。入站消息會被解碼:從位元組轉換為另一種格式(比如

java

對象);如果是出站消息,它會被編碼成位元組。

Netty

提供了一系列實用的編碼解碼器,他們都實作了

ChannelInboundHadnler

或者

ChannelOutboundHandler

接口。在這些類中,

channelRead

方法已經被重寫了。

以入站為例,對于每個從入站Channel讀取的消息,這個方法會被調用。随後,它将調用由已知解碼器 所提供的decode()方法進行解碼,并将已經解碼的位元組轉發給ChannelPipeline中的下一個ChannelInboundHandler。

Netty提供了很多編解碼器,比如編解碼字元串的StringEncoder和StringDecoder,編解碼對象的ObjectEncoder和ObjectDecoder 等。如果要實作高效的編解碼可以用protobuf,但是protobuf需要維護大量的proto檔案比較麻煩,現在一般可以使用protostuff。 protostuff是一個基于protobuf實作的序列化方法,它較于protobuf最明顯的好處是,在幾乎不損耗性能的情況下做到了不用我們 寫.proto檔案來實作序列化。使用它也非常簡單,代碼如下:

protostuff編解碼實作

1、引入依賴

<dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-api</artifactId>
            <version>1.0.10</version>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.0.10</version>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
  <version>1.0.10</version>
           

2、序列化工具類

public class ProtostuffUtil {

    private static final Map<Class<?>, Schema<?>> CACHED_SCHEMA = new ConcurrentHashMap<>();

    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) CACHED_SCHEMA.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                CACHED_SCHEMA.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * 序列化
     *
     * @param obj
     * @return
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * 反序列化
     *
     * @param data
     * @param clazz
     * @return
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T obj = clazz.newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

}
           

3、編碼器

public class ProtoEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> genericClass;

    public ProtoEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = ProtostuffUtil.serializer(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

           

4、解碼器

public class ProtoDecoder extends ByteToMessageDecoder {

    private final Class<?> genericClass;

    public ProtoDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            ctx.close();
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = ProtostuffUtil.deserializer(data, genericClass);
        out.add(obj);
    }
}
           

2、Netty粘包拆包

TCP

是一個流協定,就是沒有界限的一長串二進制資料。

TCP

作為傳輸層協定并不不了解上層業務資料的具體含義,它會根據

TCP

緩沖區 的實際情況進行資料包的劃分,是以在業務上認為是一個完整的包,可能會被

TCP

拆分成多個包進行發送,也有可能把多個小的包封裝成 一個大的資料包發送,這就是所謂的

TCP

粘包和拆包問題。面向流的通信是無消息保護邊界的。 如下圖所示,

client

發了兩個資料包

D1

D2

,但是

server

端可能會收到如下幾種情況的資料。

Netty 編解碼與TCP拆包粘包1、編解碼2、Netty粘包拆包

解決方案

  1. 消息定長度,傳輸的資料大小固定長度,例如每段的長度固定為100位元組,如果不夠空位補空格
  2. 在資料包尾部添加特殊分隔符,比如下劃線,中劃線等,這種方法簡單易行,但選擇分隔符的時候一定要注意每條資料的内部一定不 能出現分隔符。
  3. 發送長度:發送每條資料的時候,将資料的長度一并發送,比如可以選擇每條資料的前4位是資料的長度,應用層處理時可以根據長度來判斷每條資料的開始和結束。

Netty提供了多個解碼器,可以進行分包的操作,如下:

  • LineBasedFrameDecoder (回車換行分包)
  • DelimiterBasedFrameDecoder(特殊分隔符分包)
  • FixedLengthFrameDecoder(固定長度封包來分包)

現在常用的是第三種解決方案:

  • 先定義一個協定包
/**
 * @Author xiao7
 * @Description 協定包
 * @Date 8:32 下午 2021/6/24
 **/
public class MessageProtocol {
    /**
     * 資訊長度
     */
    private int len;
    /**
     * 内容
     */
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}
           
  • 定義編碼器
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被調用");
        // 寫長度
        out.writeInt(msg.getLen());
        // 寫内容
        out.writeBytes(msg.getContent());
    }
}

           
  • 定義解碼器
public class MyMessageDecoder extends ReplayingDecoder<Void> {
    private int length = 0;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被調用");
        // 可讀必須大于int長度才去讀
        if (in.readableBytes() >= 4) {
            if (length == 0) {
                length = in.readInt();
            }
            // 讀完長度後,再去讀内容長度,長度不足length的時候就不讀,等下一個包再說
            if (in.readableBytes() >= length) {
                // 足夠長了就讀一個包
                byte[] content = new byte[length];
                in.readBytes(content);

                //封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
                MessageProtocol messageProtocol = new MessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);

                out.add(messageProtocol);
            }
            // 不讀直接跳過這個包
            else {
                return;
            }

            // 讀完一個包,記得歸零需要讀的長度
            length = 0;
        }

    }
}
           
  • 最後給用戶端配置編解碼器,測試就可以了。