天天看點

Netty 系列六(編解碼器).

一、概念

    網絡傳輸的機關是位元組,如何将應用程式的資料轉換為位元組,以及将位元組轉換為應用程式的資料,就要說到到我們該篇介紹的編碼器和解碼器。

    将應用程式的資料轉換為網絡格式,以及将網絡格式轉換為應用程式的資料的元件分别叫作編碼器和解碼器,同時具有這兩種功能的單一元件叫作編解碼器。 Netty 提供了一系列用來建立所有這些編碼器、解碼器以及編解碼器的工具,還可以按需定制通用的消息轉換編解碼器。

    Netty 的編(解)碼器實作了 ChannelHandlerAdapter,也是一種特殊的 ChannelHandler,是以依賴于 ChannelPipeline,可以将多個編(解)碼器連結在一起,以實作複雜的轉換邏輯。

    對于編碼器和解碼器來說,其過程也是相當的簡單:一旦消息被編碼或者解碼,它就會被 ReferenceCountUtil.release(message)調用自動釋放。如果你需要保留引用以便稍後使用,那麼你可以調用 ReferenceCountUtil.retain(message)方法。這将會增加該引用計數,進而防止該消息被釋放。

二、編碼器

    編碼器将應用程式的資料轉換為網絡格式,出站消息時被調用,主要有兩類:

1、将消息編碼為位元組:MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter{}      
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    
    /**
     * 1、類型為 I 的出站消息被編碼為 ByteBuf
     * 2、該 ByteBuf 随後将會被轉發給 ChannelPipeline中的下一個 ChannelOutboundHandler。
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Short aShort, ByteBuf byteBuf) throws Exception {
        byteBuf.writeShort(aShort);
    }
}      

2、将消息編碼為消息:MessageToMessageEncoder

public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}      
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
    
    /**
     * 1、類型為 I 的出站消息被編碼為目标類型 存入List 中
     * 2、該 List 随後将會被轉發給 ChannelPipeline中的下一個 ChannelOutboundHandler。
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}      

三、解碼器

    解碼器将網絡格式轉換為應用程式的資料,入站消息時被調用。

    解碼器比編碼器多了一個 decodeLast 方法,原因是解碼器通常需要在 Channel 關閉之後産生最後一個消息。這顯然不适用于編碼器的場景 —— 在連接配接被關閉之後仍然産生一個消息是毫無意義的。是以,當 Channel 的狀态變為非活動時,這個方法将會被調用一次。可以重寫該方法以提供特殊的處理。

    解碼器主要有兩類:

1、将位元組解碼為消息:ByteToMessageDecoder 和 ReplayingDecoder

public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {}      
public class MyDecoder extends ByteToMessageDecoder {

    private static final int MAX_FRAME_SIZE = 1024;

    /**
     * 1、該方法被調用時,将會傳入一個包含了傳入資料的 ByteBuf,以及一個用來添加解碼消息的 List.
     * 2、對該方法的調用将會重複進行,直到确定沒有新的元素被添加到該 List,或者Butebuf 沒有更多可讀取的位元組為止。
     * 3、List 的内容将會被傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler。
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int readableBytes = byteBuf.readableBytes();
        //不能讓解碼器緩沖大量的資料以緻于耗盡可用的記憶體
        if (readableBytes > MAX_FRAME_SIZE){
            //跳過所有的可讀位元組
            byteBuf.skipBytes(readableBytes);
            throw new TooLongFrameException("資料超過可緩存位元組...");
        }
        //假設需要解析 int 類型的消息(int 4個位元組)
        if  (readableBytes > 4){
            list.add(byteBuf.readInt());
        }

    }
}      

       ----分割線----

//類型參數 S 指定了用于狀态管理的類型,其中 Void 代表不需要狀态管理。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder{}      
public class MyReplayingDecoder extends ReplayingDecoder<Void> {

    /**
     * 1、ReplayingDecoder 擴充了 ByteToMessageDecoder,并且自定義了 ByteBuf 的實作 ReplayingDecoderByteBuf。
     * 2、ReplayingDecoderByteBuf 對要轉換的消息的位元組數進行内部管理,如果沒有足夠的位元組使用,将會抛出一個 Signal,由ReplayingDecoder進行處理。
     *
     * @param byteBuf 傳入的 ByteBuf 實際上是 ReplayingDecoderByteBuf*/
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
       list.add(byteBuf.readInt());
    }
}      

    繼承 ByteToMessageDecoder 和繼承 ReplayingDecoder 有什麼差別呢?ReplayingDecoder 唯一的好處就是解碼的時候不用進行位元組數的判斷,如上 ,因為它交由自定義的 ReplayingDecoderByteBuf 去處理了。但是 ReplayingDecoder 的效率稍慢于ByteToMessageDecoder。一般我們在這兩個解碼器中進行抉擇的準則是:如果使用  ByteToMessageDecoder 不會引入太多的複雜性,那麼請使用它;否則,請使用 ReplayingDecoder!

2、将消息解碼為消息:MessageToMessageDecoder

public abstract class MessageToMessageEncoder<I> extends ChannelHandlerAdapter{}      
public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> {
    
    /**
     * 1、對于每個需要被解碼為另一種格式的入站消息來說,該方法将會被調用。
     * 2、解碼消息随後會被傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Integer integer, List<Object> list) throws Exception {
        list.add(String.valueOf(integer));
    }
}      

四、編解碼器

    Netty 的抽象編解碼器類捆綁一個解碼器/編碼器對,主要用于在同一個類中管理入站和出站資料和消息的轉換。

    個人覺得這個編解碼器略顯雞肋呀,還是喜歡将編碼器和解碼器分開來寫。因為 Netty 設計的一個基本準則就是:盡可能地将兩種功能(編碼器、解碼器)分開,最大化代碼的可重用性和可擴充性。

    編解碼器也主要有兩類:

1、位元組消息編解碼器:ByteToMessageCodec

public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {}      
public class MyBytetoMessageCodec extends ByteToMessageCodec<Short> {
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        while (in.readableBytes() > 4){
            out.add(in.readInt());
        }
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        out.writeShort(msg);
    }
}      

2、消息轉換編解碼器:MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelHandlerAdapter {}      
Netty 系列六(編解碼器).
Netty 系列六(編解碼器).
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {

    /**
     * 1、對于每個 OUTBOUND_IN 類型的消息,這個方法都會被調用。
     * 2、這個消息将會被編碼為 INBOUND_IN 類型的消息。
     * 3、然後被轉發給 ChannelPipeline 中的下一個 ChannelOutboundHandler
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = msg.getByteBuf().duplicate().retain();
        switch (msg.getFrameType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(byteBuf));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(byteBuf));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, byteBuf));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(byteBuf));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(byteBuf));
                break;
            case PING:
                out.add(new PingWebSocketFrame(byteBuf));
            default:
                break;
        }


    }

    /**
     * 1、傳入 INBOUND_IN 類型的消息,該方法會被調用。
     * 2、這個消息會被解碼為 OUTBOUND_IN 類型的消息。
     * 3、然後被轉發給 ChannelPipeline 中的下一個 ChannelInboundHandler
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = msg.content().duplicate().retain();
        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, byteBuf));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, byteBuf));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, byteBuf));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, byteBuf));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, byteBuf));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, byteBuf));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }

    }


    public static final class MyWebSocketFrame {

        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }

        private final FrameType frameType;
        private final ByteBuf byteBuf;

        public MyWebSocketFrame(FrameType frameType, ByteBuf byteBuf) {
            this.frameType = frameType;
            this.byteBuf = byteBuf;
        }

        public FrameType getFrameType() {
            return frameType;
        }

        public ByteBuf getByteBuf() {
            return byteBuf;
        }
    }
}      

這個類寫的還是很棒的~~

參考資料:《Netty IN ACTION》

示範源代碼:https://github.com/JMCuixy/NettyDemo