天天看點

netty系列之:自定義編碼解碼器簡介自定義編碼器自定義解碼器添加編碼解碼器到pipeline計算2的N次方總結

簡介

在之前的netty系列文章中,我們講到了如何将對象或者String轉換成為ByteBuf,通過使用netty自帶的encoder和decoder可以實作非常友善的對象和ByteBuf之間的轉換,然後就可以向channel中随意寫入對象和字元串了。

使用netty自帶的編碼器當然很好,但是如果你有些特殊的需求,比如希望在編碼的過程中對資料進行變換,或者對對象的字段進行選擇,那麼可能就需要自定義編碼解碼器了。

自定義編碼器

自定義編碼器需要繼承MessageToByteEncoder 類,并實作encode方法,在該方法中寫入具體的編碼邏輯。

本例我們希望計算2的N次方,據說将一張紙折疊100次可以達到地球到月亮的高度,這麼大的資料普通的number肯定是裝不下的,我們将會使用BigInteger來對這個巨大的數字進行儲存。

那麼對于被編碼器來說,則需要将這個BigInteger轉換成為byte數組。同時在byte數組讀取的過程中,我們需要界定到底哪些byte資料是屬于同一個BigInteger的,這就需要對寫入的資料格式做一個約定。

這裡我們使用三部分的資料結構來表示一個BigInteger。第一部分是一個magic word也就是魔法詞,這裡我們使用魔法詞“N”,當讀取到這個魔法詞就表示接下來的數字是BigInteger。第二部分是表示bigInteger數字的byte數組的長度,擷取到這個長度值,就可以讀取到所有的byte數組值,最後将其轉換成為BigInteger。

因為BigInteger是Number的子類,為了更加泛化編碼器,我們使用Number作為MessageToByteEncoder的泛型,核心編碼代碼如下:

protected void encode(ChannelHandlerContext ctx, Number msg, ByteBuf out) {
        // 将number編碼成為ByteBuf
        BigInteger v;
        if (msg instanceof BigInteger) {
            v = (BigInteger) msg;
        } else {
            v = new BigInteger(String.valueOf(msg));
        }
        // 将BigInteger轉換成為byte[]數組
        byte[] data = v.toByteArray();
        int dataLength = data.length;
        // 将Number進行編碼
        out.writeByte((byte) 'N'); // 魔法詞
        out.writeInt(dataLength);  // 數組長度
        out.writeBytes(data);      // 最終的資料
    }      

自定義解碼器

有了編碼之後的byte數組,就可以在解碼器中對其解碼了。

上一節介紹了,編碼過後的資料格式是魔法詞N+數組長度+真正的資料。

其中魔法詞長度是一個位元組,數組長度是四個位元組,前面部分總共是5個位元組。是以在解碼的時候,首先判斷ByteBuf中可讀位元組的長度是否小于5,如果小于5說明資料是無效的,可以直接return。

如果可讀位元組的長度大于5,則表示資料是有效的,可以進行資料的解碼了。

解碼過程中需要注意的是,并不是所有的資料都是我們所希望的格式,如果在讀取的過程中讀到了我們不認識的格式,那麼說明這個資料并不是我們想要的,則可以交由其他的handler進行處理。

但是對于ByteBuf來說,一旦調用read方法,就會導緻reader index移動位置,是以在真正的讀取資料之前需要調用ByteBuf的markReaderIndex方法,對readerIndex進行記錄。然後分别讀取魔法詞、數組長度和剩餘的資料,最後将資料轉換成為BigInteger,如下所示:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 保證魔法詞和數組長度有效
        if (in.readableBytes() < 5) {
            return;
        }
        in.markReaderIndex();
        // 檢查魔法詞
        int magicNumber = in.readUnsignedByte();
        if (magicNumber != 'N') {
            in.resetReaderIndex();
            throw new CorruptedFrameException("無效的魔法詞: " + magicNumber);
        }
        // 讀取所有的資料
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        // 将剩下的資料轉換成為BigInteger
        byte[] decoded = new byte[dataLength];
        in.readBytes(decoded);
        out.add(new BigInteger(decoded));
    }      

添加編碼解碼器到pipeline

有了兩個編碼解碼器,還需要将其添加到pipeline中進行調用。

在實作ChannelInitializer中的initChannel中,可以對ChannelPipeline進行初始化,本例中的初始化代碼如下:

// 對流進行壓縮
        pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
        pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
        // 添加number編碼解碼器
        pipeline.addLast(new NumberDecoder());
        pipeline.addLast(new NumberEncoder());
        // 添加業務處理邏輯
        pipeline.addLast(new CustomProtocolServerHandler());      

其中最後一行是真正的業務處理邏輯,NumberDecoder和NumberEncoder是編碼和解碼器。這裡我們還使用了一個ZlibEncoder用于對流資料進行壓縮,這裡使用的壓縮方式是GZIP。

壓縮的好處就是可以減少資料傳輸的數量,提升傳輸效率。其本質也是一個編碼解碼器。

計算2的N次方

計算2的N次方的邏輯是這樣的,首先用戶端發送2給伺服器端,伺服器端接收到該消息和結果1相乘,并将結果寫回給用戶端,用戶端收到消息之後再發送2給伺服器端,伺服器端将上次的計算結果乘以2,再發送給用戶端,以此類推直到執行N次。

首先看下用戶端的發送邏輯:

// 最大計算2的1000次方
        ChannelFuture future = null;
        for (int i = 0; i < 1000 && next <= CustomProtocolClient.COUNT; i++) {
            future = ctx.write(2);
            next++;
        }      

當next小于等于要計算的COUNT時,就将2寫入到channel中。

對于伺服器來說,在channelRead0方法中,讀取消息,并将其和結果相乘,再把結果寫回給用戶端。

public void channelRead0(ChannelHandlerContext ctx, BigInteger msg) throws Exception {
        // 将接收到的msg乘以2,然後傳回給用戶端
        count++;
        result = result.multiply(msg);
        ctx.writeAndFlush(result);
    }      

用戶端統計讀取到的消息個數,如果消息個數=COUNT,說明計算完畢,就可以将結果儲存起來供後續使用,其核心代碼如下:

public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == CustomProtocolClient.COUNT) {
            // 計算完畢,将結果放入answer中
            ctx.channel().close().addListener(future -> {
                boolean offered = answer.offer(msg);
                assert offered;
            });
        }
    }      

總結

本文實作了一個Number的編碼解碼器,事實上你可以自定義實作任何對象的編碼解碼器。

本文的例子可以參考:

learn-netty4
本文已收錄于 http://www.flydean.com/13-netty-customprotocol/

最通俗的解讀,最深刻的幹貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!

歡迎關注我的公衆号:「程式那些事」,懂技術,更懂你!

繼續閱讀