文章目錄
- 簡介
- 自定義編碼器和解碼器的實作
- ReplayingDecoder
- 總結
在之前的系列文章中,我們提到了netty中的channel隻接受ByteBuf類型的對象,如果不是ByteBuf對象的話,需要用編碼和解碼器對其進行轉換,今天來聊一下netty自定義的編碼和解碼器實作中需要注意的問題。
在介紹netty自帶的編碼器和解碼器之前,告訴大家怎麼實作自定義的編碼器和解碼器。
netty中所有的編碼器和解碼器都是從ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter衍生而來的。
對于ChannelOutboundHandlerAdapter來說,最重要的兩個類是MessageToByteEncoder 和 MessageToMessageEncoder 。
MessageToByteEncoder是将消息編碼成為ByteBuf,這個類也是我們自定義編碼最常用的類,直接繼承這個類并實作encode方法即可。注意到這個類有一個泛型,這個泛型指定的就是消息的對象類型。
例如我們想将Integer轉換成為ByteBuf,可以這樣寫:
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
out.writeInt(msg);
}
}
MessageToMessageEncoder是在消息和消息之間進行轉換,因為消息并不能直接寫入到channel中,是以需要和MessageToByteEncoder配合使用。
下面是一個Integer到String的例子:
public class IntegerToStringEncoder extends
MessageToMessageEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
throws Exception {
out.add(message.toString());
}
}
對于ChannelInboundHandlerAdapter來說,最重要的兩個類是ByteToMessageDecoder和MessageToMessageDecoder 。
ByteToMessageDecoder是将ByteBuf轉換成對應的消息類型,我們需要繼承這個類,并實作decode方法,下面是一個從ByteBuf中讀取所有可讀的位元組,并将結果放到一個新的ByteBuf中,
public class SquareDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
out.add(in.readBytes(in.readableBytes()));
}
}
MessageToMessageDecoder是消息和消息之間的轉換,同樣的隻需要實作decode方法即可,如下從String轉換到Integer:
public class StringToIntegerDecoder extends
MessageToMessageDecoder<String> {
@Override
public void decode(ChannelHandlerContext ctx, String message,
List<Object> out) throws Exception {
out.add(message.length());
}
}
上面的代碼看起來很簡單,但是在實作的過程中還有一些問題要注意。
對于Decoder來說,我們從ByteBuf中讀取資料,然後進行轉換。但是在讀取的過程中,并不知道ByteBuf中資料的變動情況,有可能在讀取的過程中ByteBuf還沒有準備好,那麼就需要在讀取的時候對ByteBuf中可讀位元組的大小進行判斷。
比如我們需要解析一個資料結構,這個資料結構的前4個位元組是一個int,表示後面byte數組的長度,我們需要先判斷ByteBuf中是否有4個位元組,然後讀取這4個位元組作為Byte數組的長度,然後再讀取這個長度的Byte數組,最終得到要讀取的結果,如果其中的某一步出現問題,或者說可讀的位元組長度不夠,那麼就需要直接傳回,等待下一次的讀取。如下所示:
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
這種判斷是比較複雜同時也是可能出錯的,為了解決這個問題,netty提供了 ReplayingDecoder用來簡化上面的操作,在ReplayingDecoder中,假設所有的ByteBuf已經處于準備好的狀态,直接從中間讀取即可。
上面的例子用ReplayingDecoder重寫如下:
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
它的實作原理是去嘗試讀取對應的位元組資訊,如果沒有讀到,則抛出異常,ReplayingDecoder接收到異常之後,會重新調用decode方法。
雖然ReplayingDecoder使用起來非常簡單,但是它有兩個問題。
第一個問題是性能問題,因為會去重複調用decode方法,如果ByteBuf本身并沒有變化,就會導緻重複decode同一個ByteBuf,照成性能的浪費。解決這個問題就是在在decode的過程中分階段進行,比如上面的例子中,我們需要先讀取Byte數組的長度,然後再讀取真正的byte數組。是以在讀完byte數組長度之和,可以調用checkpoint()方法做一個儲存點,下次再執行decode方法的時候就可以跳過這個儲存點,繼續後續的執行過程,如下所示:
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
第二個問題是同一個執行個體的decode方法可能會被調用多次,如果我們在ReplayingDecoder中有私有變量的話,則需要考慮對這個私有變量的清洗工作,避免多次調用造成的資料污染。