天天看點

netty系列之:自定義編碼和解碼器要注意的問題簡介自定義編碼器和解碼器的實作ReplayingDecoder總結

簡介

在之前的系列文章中,我們提到了netty中的channel隻接受ByteBuf類型的對象,如果不是ByteBuf對象的話,需要用編碼和解碼器對其進行轉換,今天來聊一下netty自定義的編碼和解碼器實作中需要注意的問題。

自定義編碼器和解碼器的實作

在介紹netty自帶的編碼器和解碼器之前,告訴大家怎麼實作自定義的編碼器和解碼器。

netty中所有的編碼器和解碼器都是從ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter衍生而來的。

對于ChannelOutboundHandlerAdapter來說,最重要的兩個類是MessageToByteEncoder 和 MessageToMessageEncoder 。

MessageToByteEncoder是将消息編碼成為ByteBuf,這個類也是我們自定義編碼最常用的類,直接繼承這個類并實作encode方法即可。注意到這個類有一個泛型,這個泛型指定的就是消息的對象類型。

例如我們想将Integer轉換成為ByteBuf,可以這樣寫:

public class IntegerEncoder extends MessageToByteEncoder<Integer> {
            @Override
           public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
                   throws Exception {
               out.writeInt(msg);
           }
       }      

MessageToMessageEncoder是在消息和消息之間進行轉換,因為消息并不能直接寫入到channel中,是以需要和MessageToByteEncoder配合使用。

下面是一個Integer到String的例子:

public class IntegerToStringEncoder extends
               MessageToMessageEncoder<Integer> {
            @Override
           public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
                   throws Exception {
               out.add(message.toString());
           }
       }      

對于ChannelInboundHandlerAdapter來說,最重要的兩個類是ByteToMessageDecoder和MessageToMessageDecoder 。

ByteToMessageDecoder是将ByteBuf轉換成對應的消息類型,我們需要繼承這個類,并實作decode方法,下面是一個從ByteBuf中讀取所有可讀的位元組,并将結果放到一個新的ByteBuf中,

public class SquareDecoder extends ByteToMessageDecoder {
            @Override
           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                   throws Exception {
               out.add(in.readBytes(in.readableBytes()));
           }
       }      

MessageToMessageDecoder是消息和消息之間的轉換,同樣的隻需要實作decode方法即可,如下從String轉換到Integer:

public class StringToIntegerDecoder extends
               MessageToMessageDecoder<String> {
            @Override
           public void decode(ChannelHandlerContext ctx, String message,
                              List<Object> out) throws Exception {
               out.add(message.length());
           }
       }      

ReplayingDecoder

上面的代碼看起來很簡單,但是在實作的過程中還有一些問題要注意。

對于Decoder來說,我們從ByteBuf中讀取資料,然後進行轉換。但是在讀取的過程中,并不知道ByteBuf中資料的變動情況,有可能在讀取的過程中ByteBuf還沒有準備好,那麼就需要在讀取的時候對ByteBuf中可讀位元組的大小進行判斷。

比如我們需要解析一個資料結構,這個資料結構的前4個位元組是一個int,表示後面byte數組的長度,我們需要先判斷ByteBuf中是否有4個位元組,然後讀取這4個位元組作為Byte數組的長度,然後再讀取這個長度的Byte數組,最終得到要讀取的結果,如果其中的某一步出現問題,或者說可讀的位元組長度不夠,那麼就需要直接傳回,等待下一次的讀取。如下所示:

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       if (buf.readableBytes() < 4) {
          return;
       }
       buf.markReaderIndex();
       int length = buf.readInt();
       if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
       }
       out.add(buf.readBytes(length));
     }
   }      

這種判斷是比較複雜同時也是可能出錯的,為了解決這個問題,netty提供了 ReplayingDecoder用來簡化上面的操作,在ReplayingDecoder中,假設所有的ByteBuf已經處于準備好的狀态,直接從中間讀取即可。

上面的例子用ReplayingDecoder重寫如下:

public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       out.add(buf.readBytes(buf.readInt()));
     }
   }      

它的實作原理是去嘗試讀取對應的位元組資訊,如果沒有讀到,則抛出異常,ReplayingDecoder接收到異常之後,會重新調用decode方法。

雖然ReplayingDecoder使用起來非常簡單,但是它有兩個問題。

第一個問題是性能問題,因為會去重複調用decode方法,如果ByteBuf本身并沒有變化,就會導緻重複decode同一個ByteBuf,照成性能的浪費。解決這個問題就是在在decode的過程中分階段進行,比如上面的例子中,我們需要先讀取Byte數組的長度,然後再讀取真正的byte數組。是以在讀完byte數組長度之和,可以調用checkpoint()方法做一個儲存點,下次再執行decode方法的時候就可以跳過這個儲存點,繼續後續的執行過程,如下所示:

public enum MyDecoderState {
     READ_LENGTH,
     READ_CONTENT;
   }
   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<MyDecoderState> {
     private int length;
     public IntegerHeaderFrameDecoder() {
       // Set the initial state.
       super(MyDecoderState.READ_LENGTH);
     }
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
       case READ_LENGTH:
         length = buf.readInt();
         checkpoint(MyDecoderState.READ_CONTENT);
       case READ_CONTENT:
         ByteBuf frame = buf.readBytes(length);
         checkpoint(MyDecoderState.READ_LENGTH);
         out.add(frame);
         break;
       default:
         throw new Error("Shouldn't reach here.");
       }
     }
   }      

第二個問題是同一個執行個體的decode方法可能會被調用多次,如果我們在ReplayingDecoder中有私有變量的話,則需要考慮對這個私有變量的清洗工作,避免多次調用造成的資料污染。

總結

通過繼承上面的幾個類,我們就可以自己實作編碼和解碼的邏輯了。但是好像還有點問題,自定義編碼和解碼器是不是太複雜了?還需要判斷要讀取的byte數組的大小。有沒有更加簡單的方法呢?

有的,敬請期待netty系列的下一篇文章:netty自帶的編碼器和解碼器.

本文的例子可以參考:

learn-netty4
本文已收錄于 http://www.flydean.com/14-netty-cust-codec/

最通俗的解讀,最深刻的幹貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!

歡迎關注我的公衆号:「程式那些事」,懂技術,更懂你!

繼續閱讀