文章目錄
- 1、編解碼
-
- protostuff編解碼實作
- 2、Netty粘包拆包
-
- 解決方案
學習位址: https://dongzl.github.io/netty-handbook/#/
1、編解碼
Netty
涉及到編解碼的元件有
Channel
、
ChannelHandler
、
ChannelPipe
等,先大概了解下這幾個元件的作用。
ChannelHandler
ChannelHandler
充當了處理入站和出站資料的應用程式邏輯容器。例如,實作
ChannelInboundHandler
接口(或
ChannelInboundHandlerAdapter
),你就可以接收入站事件和資料,這些資料随後會被你的應用程式的業務邏輯處理。當你要給連接配接的用戶端發送響應時,也可以從
ChannelInboundHandler
沖刷資料。你的業務邏輯通常寫在一個或者多個
ChannelInboundHandler
中。
ChannelOutboundHandler
原理一樣,隻不過它是用來處理出站資料的。
ChannelPipeline
ChannelPipeline
提供了
ChannelHandler
鍊的容器。以用戶端應用程式為例,如果事件的運動方向是從用戶端到服務端的,那麼我們稱這些事件為出站的,即用戶端發送給服務端的資料會通過
pipeline
中的一系列
ChannelOutboundHandler
(
ChannelOutboundHandler
調用是從
tail
到
head
方向逐個調用每個
handler
的邏輯),并被這些
Handler
處理,反之則稱為入站的,入站隻調用
pipeline
裡的
ChannelInboundHandler
邏輯(
ChannelInboundHandler
調用是從
head
到
tail
方向逐個調用每個handler的邏輯)。

編碼解碼器
當你通過
Netty
發送或者接受一個消息的時候,就将會發生一次資料轉換。入站消息會被解碼:從位元組轉換為另一種格式(比如
java
對象);如果是出站消息,它會被編碼成位元組。
Netty
提供了一系列實用的編碼解碼器,他們都實作了
ChannelInboundHadnler
或者
ChannelOutboundHandler
接口。在這些類中,
channelRead
方法已經被重寫了。
以入站為例,對于每個從入站Channel讀取的消息,這個方法會被調用。随後,它将調用由已知解碼器 所提供的decode()方法進行解碼,并将已經解碼的位元組轉發給ChannelPipeline中的下一個ChannelInboundHandler。
Netty提供了很多編解碼器,比如編解碼字元串的StringEncoder和StringDecoder,編解碼對象的ObjectEncoder和ObjectDecoder 等。如果要實作高效的編解碼可以用protobuf,但是protobuf需要維護大量的proto檔案比較麻煩,現在一般可以使用protostuff。 protostuff是一個基于protobuf實作的序列化方法,它較于protobuf最明顯的好處是,在幾乎不損耗性能的情況下做到了不用我們 寫.proto檔案來實作序列化。使用它也非常簡單,代碼如下:
protostuff編解碼實作
1、引入依賴
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.10</version>
2、序列化工具類
public class ProtostuffUtil {
private static final Map<Class<?>, Schema<?>> CACHED_SCHEMA = new ConcurrentHashMap<>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) CACHED_SCHEMA.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
CACHED_SCHEMA.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*
* @param obj
* @return
*/
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*
* @param data
* @param clazz
* @return
*/
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = clazz.newInstance();
Schema<T> schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
3、編碼器
public class ProtoEncoder extends MessageToByteEncoder<Object> {
private final Class<?> genericClass;
public ProtoEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = ProtostuffUtil.serializer(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
4、解碼器
public class ProtoDecoder extends ByteToMessageDecoder {
private final Class<?> genericClass;
public ProtoDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = ProtostuffUtil.deserializer(data, genericClass);
out.add(obj);
}
}
2、Netty粘包拆包
TCP
是一個流協定,就是沒有界限的一長串二進制資料。
TCP
作為傳輸層協定并不不了解上層業務資料的具體含義,它會根據
TCP
緩沖區 的實際情況進行資料包的劃分,是以在業務上認為是一個完整的包,可能會被
TCP
拆分成多個包進行發送,也有可能把多個小的包封裝成 一個大的資料包發送,這就是所謂的
TCP
粘包和拆包問題。面向流的通信是無消息保護邊界的。 如下圖所示,
client
發了兩個資料包
D1
和
D2
,但是
server
端可能會收到如下幾種情況的資料。
解決方案
- 消息定長度,傳輸的資料大小固定長度,例如每段的長度固定為100位元組,如果不夠空位補空格
- 在資料包尾部添加特殊分隔符,比如下劃線,中劃線等,這種方法簡單易行,但選擇分隔符的時候一定要注意每條資料的内部一定不 能出現分隔符。
- 發送長度:發送每條資料的時候,将資料的長度一并發送,比如可以選擇每條資料的前4位是資料的長度,應用層處理時可以根據長度來判斷每條資料的開始和結束。
Netty提供了多個解碼器,可以進行分包的操作,如下:
- LineBasedFrameDecoder (回車換行分包)
- DelimiterBasedFrameDecoder(特殊分隔符分包)
- FixedLengthFrameDecoder(固定長度封包來分包)
現在常用的是第三種解決方案:
- 先定義一個協定包
/**
* @Author xiao7
* @Description 協定包
* @Date 8:32 下午 2021/6/24
**/
public class MessageProtocol {
/**
* 資訊長度
*/
private int len;
/**
* 内容
*/
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
- 定義編碼器
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被調用");
// 寫長度
out.writeInt(msg.getLen());
// 寫内容
out.writeBytes(msg.getContent());
}
}
- 定義解碼器
public class MyMessageDecoder extends ReplayingDecoder<Void> {
private int length = 0;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被調用");
// 可讀必須大于int長度才去讀
if (in.readableBytes() >= 4) {
if (length == 0) {
length = in.readInt();
}
// 讀完長度後,再去讀内容長度,長度不足length的時候就不讀,等下一個包再說
if (in.readableBytes() >= length) {
// 足夠長了就讀一個包
byte[] content = new byte[length];
in.readBytes(content);
//封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
// 不讀直接跳過這個包
else {
return;
}
// 讀完一個包,記得歸零需要讀的長度
length = 0;
}
}
}
- 最後給用戶端配置編解碼器,測試就可以了。