天天看點

netty系列之:基于流的資料傳輸

簡介

我們知道由兩種資料的傳輸方式,分别是字元流和位元組流,字元流的意思是傳輸的對象就是字元串,格式已經被設定好了,發送方和接收方按照特定的格式去讀取就行了,而位元組流是指将資料作為最原始的二進制位元組來進行傳輸。

今天給大家介紹一下在netty中的基于流的資料傳輸。

package和byte

熟悉TCP/IP協定的同學應該知道,在TCP/IP中,因為底層協定有支援的資料包的最大值,是以對于大資料傳輸來說,需要對資料進行拆分和封包處理,并将這些拆分組裝過的包進行發送,最後在接收方對這些包進行組合。在各個包中有固定的結構,是以接收方可以很清楚的知道到底應該組合多少個包作為最終的結果。

那麼對于netty來說,channel中傳輸的是ByteBuf,實際上最最最底層的就是byte數組。對于這種byte數組來說,接收方并不知道到底應該組合多少個byte來合成原來的消息,是以需要在接收端對收到的byte進行組合,進而生成最終的資料。

那麼對于netty中的byte資料流應該怎麼組合呢?我們接下來看兩種組合方法。

手動組合

這種組合的方式的基本思路是構造一個目标大小的ByteBuf,然後将接收到的byte通過調用ByteBuf的writeBytes方法寫入到ByteBuf中。最後從ByteBuf中讀取對應的資料。

比如我們想從服務端發送一個int數字給用戶端,一般來說int是32bits,然後一個byte是8bits,那麼一個int就需要4個bytes組成。

在server端,可以建立一個byte的數組,數組中包含4個元素。将4個元素的byte發送給用戶端,那麼用戶端該如何處理呢?

首先我們需要建立一個clientHander,這個handler應該繼承ChannelInboundHandlerAdapter,并且在其handler被添加到ChannelPipeline的時候初始化一個包含4個byte的byteBuf。

handler被添加的時候會觸發一個handlerAdded事件,是以我們可以這樣寫:

private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        //建立一個4個byte的緩沖器
        buf = ctx.alloc().buffer(4); 
    }
           

上例中,我們從ctx配置設定了一個4個位元組的緩沖器,并将其指派給handler中的私有變量buf。

當handler執行完畢,從ChannelPipeline中删除的時候,會觸發handlerRemoved事件,在這個事件中,我們可以對配置設定的Bytebuf進行清理,通常來說,可以調用其release方法,如下所示:

public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // 釋放buf
        buf = null;
    }
           

然後最關鍵的一步就是從channel中讀取byte并将其放到4個位元組的byteBuf中。在之前的文章中我們提到了,可以在channelRead方法中,處理消息讀取的邏輯。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // 寫入一個byte
        m.release();

        if (buf.readableBytes() >= 4) { // 已經湊夠4個byte,将4個byte組合稱為一個int
            long result = buf.readUnsignedInt();
            ctx.close();
        }
    }
           

每次觸發channelRead方法,都會将讀取到的一個位元組的byte通過調用writeBytes方法寫入buf中。當buf的可讀byte大于等于4個的時候就說明4個位元組已經讀滿了,可以對其進行操作了。

這裡我們将4個位元組組合成一個unsignedInt,并使用readUnsignedInt方法從buf中讀取出來組合稱為一個int數字。

上面的例子雖然可以解決4個位元組的byte問題,但是如果資料結構再負責一點,上面的方式就會力不從心,需要考慮太多的資料組合問題。接下來我們看另外一種方式。

Byte的轉換類

netty提供了一個ByteToMessageDecoder的轉換類,可以友善的對Byte轉換為其他的類型。

我們隻需要重新其中的decode方法,就可以實作對ByteBuf的轉換:

public class SquareDecoder extends ByteToMessageDecoder {
            @Override
           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                   throws Exception {
               out.add(in.readBytes(in.readableBytes()));
           }
       }
           

上面的例子将byte從input轉換到output中,當然,你還可以在上面的方法中進行格式轉換,如下所示:

public class TimeDecoder extends ByteToMessageDecoder { 
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 
        if (in.readableBytes() < 4) {
            return; 
        }

        out.add(in.readBytes(4)); 
    }
}
           

上面的例子會先判斷in中是否有4個byte,如果有就将其讀出來放到out中去。那麼有同學會問了,輸入不是一個byte一個byte來的嗎?為什麼這裡可以一次讀取到4個byte?這是因為ByteToMessageDecoder内置了一個緩存裝置,是以這裡的in實際上是一個緩存集合。

ReplayingDecoder

netty還提供了一個更簡單的轉換ReplayingDecoder,如果使用ReplayingDecoder重新上面的邏輯就是這樣的:

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}
           

隻需要一行代碼即可。

事實上ReplayingDecoder 是ByteToMessageDecoder 的子類,是在ByteToMessageDecoder上豐富了一些功能的結果。

他們兩的差別在于ByteToMessageDecoder 還需要通過調用readableBytes來判斷是否有足夠的可以讀byte,而使用ReplayingDecoder直接讀取即可,它假設的是所有的bytes都已經接受成功了。

比如下面使用ByteToMessageDecoder的代碼:

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {

      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {

       if (buf.readableBytes() < 4) {
          return;
       }

       buf.markReaderIndex();
       int length = buf.readInt();

       if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
       }

       out.add(buf.readBytes(length));
     }
   }

           

上例假設在byte的頭部是一個int大小的數組,代表着byte數組的長度,需要先讀取int值,然後再根據int值來讀取對應的byte資料。

和下面的代碼是等價的:

public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {

     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {

       out.add(buf.readBytes(buf.readInt()));
     }
   }

           

上面代碼少了判斷的步驟。

那麼這是怎麼實作的呢?

事實上ReplayingDecoder 會傳遞一個會抛出 Error的 ByteBuf , 當 ByteBuf 讀取的byte個數不滿足要求的時候,會抛出異常,當ReplayingDecoder 捕獲到這個異常之後,會重置buffer的readerIndex到最初的狀态,然後等待後續的資料進來,然後再次調用decode方法。

是以,ReplayingDecoder的效率會比較低,為了解決這個問題,netty提供了checkpoint() 方法。這是一個儲存點,當報錯的時候,可以不會退到最初的狀态,而是回退到checkpoint() 調用時候儲存的狀态,進而可以減少不必要的浪費。

總結

本文介紹了在netty中進行stream操作和變換的幾種方式,希望大家能夠喜歡。