天天看點

ChannelHandlerChannelHandlerChannelInboundHandler和ChannelOutboundHandlerChannelHandlerContextChannelPipeline

ChannelHandler

netty中的ChannelHandler用于處理Channel對應的事件,每一個ChannelHandler都會和一個channel綁定,ChannelHandler體系如下:

ChannelHandlerChannelHandlerChannelInboundHandler和ChannelOutboundHandlerChannelHandlerContextChannelPipeline

ChannelHandler接口裡面隻定義了三個生命周期方法:

void handlerAdded(ChannelHandlerContext var1) throws Exception;

    void handlerRemoved(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;           

子接口ChannelInboundHandler和ChannelOutboundHandler對ChannelHandler進行了擴充,但netty架構提供了ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter和ChannelDuplexHandler三個适配類,實際使用時繼承這些适配類即可。

此外還有簡化類SimpleChannelInboundHandler,繼承SimpleChannelInboundHandler類後,會在接收到資料後⾃動release掉資料占⽤的Bytebuffer資源,并且繼承該類需要指定資料格式。⽽繼承ChannelInboundHandlerAdapter則不會⾃動釋放Bytebuffer資源,需要⼿動調⽤ReferenceCountUtil.release()等⽅法進⾏釋放,并且繼承該類不需要指定資料格式。

實際編寫server端時,需要繼承ChannelInboundHandlerAdapter,防⽌資料未處理完就⾃動釋放了。此外server端可能有多個用戶端連接配接,并且每⼀個用戶端請求的資料格式都不⼀緻,相比之下ChannelInboundHandlerAdapter更靈活。

用戶端根據情況可以繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸的資料格式,就不需要再進⾏格式的轉換了。

ChannelHandler中的三個生命周期方法分别對應如下場景:

目前ChannelHander加入ChannelHandlerContext中;

目前從ChannelHandlerContext中移除;

ChannelHandler回調方法出現異常時被回調.           

ChannelInboundHandler和ChannelOutboundHandler

差別主要在于ChannelInboundHandler的channelRead和channelReadComplete回調和ChannelOutboundHandler的write和flush回調上,ChannelInboundHandler的channelRead回調負責執行入棧資料的decode邏輯,ChannelOutboundHandler的write負責執行出站資料的encode工作。

ChannelInboundHandler

ChannelInboundHandler定義了如下回調方法:

void channelRegistered(ChannelHandlerContext var1) throws Exception;

    void channelUnregistered(ChannelHandlerContext var1) throws Exception;

    void channelActive(ChannelHandlerContext var1) throws Exception;

    void channelInactive(ChannelHandlerContext var1) throws Exception;

    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelReadComplete(ChannelHandlerContext var1) throws Exception;

    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;           

其回調時機為:

channelRegistered 目前channel注冊到EventLoop;
    
    channelUnregistered 目前channel從EventLoop取消注冊;
    
    channelActive 目前channel激活的時候;
    
    channelInactive 目前channel失活的時候;
    
    channelRead 目前channel從遠端讀取到資料;
    
    channelReadComplete channel read消費完讀取的資料的時候被觸發;
    
    userEventTriggered 使用者事件觸發的時候;
    
    channelWritabilityChanged channel的寫狀态變化的時候觸發。           

ChannelHandlerContext作為參數,在每個回調事件處理完成之後,使用ChannelHandlerContext的fireChannelXXX方法來傳遞給pipeline中下一個ChannelHandler,netty的codec子產品和業務處理代碼分離就用到了這個鍊路處理。

ChannelOutboundHandler

ChannelOutboundHandler定義了如下回調方法:

void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void read(ChannelHandlerContext var1) throws Exception;

    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

    void flush(ChannelHandlerContext var1) throws Exception;           

回調方法觸發時機:

bind bind操作執行前觸發;
    
    connect connect 操作執行前觸發;
    
    disconnect disconnect 操作執行前觸發;
    
    close close操作執行前觸發;
    
    deregister deregister操作執行前觸發;
    
    read read操作執行前觸發;
    
    write write操作執行前觸發;
    
    flush flush操作執行前觸發;           

對于ChannelPromise這個參數,可以調用它的addListener注冊監聽,當回調方法所對應的操作完成後,會觸發這個監聽下面的代碼。

同樣添加監聽器的還有ChannelFuture,而ChannelFuture也是ChannelPromise的父接口:

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    ...
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
    ...
}               

例如:

ctx.writeAndFlush(toFullHttpResponse()).addListener(ChannelFutureListener.CLOSE);           

ChannelFutureListener.CLOSE這個監聽器就會在writeAndFlush完成之後被調用來關閉channel:

ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };           

ChannelHandlerContext

當ChannelHandler加入到ChannelPipeline的時候,會建立一個對應的ChannelHandlerContext并綁定,ChannelPipeline實際維護的是和ChannelHandlerContext的關系,例如在DefaultChannelPipeline:

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
}           

DefaultChannelPipeline會儲存第一個ChannelHandlerContext以及最後一個ChannelHandlerContext的引用。

而AbstractChannelHandlerContext中維護了next和prev指針:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    ...
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
}           

這樣ChannelHandlerContext之間形成了雙向連結清單。

ChannelPipeline

在Channel建立的時候,會同時建立ChannelPipeline:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        this.id = this.newId();
        this.unsafe = this.newUnsafe();
        this.pipeline = this.newChannelPipeline();
    }
    
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}           

在ChannelPipeline中也會持有Channel的引用,ChannelPipeline會維護一個ChannelHandlerContext的雙向連結清單,連結清單的頭尾有預設實作:

public class DefaultChannelPipeline implements ChannelPipeline {

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    private final Channel channel;

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
}           

我們添加的自定義ChannelHandler會插入到head和tail之間,以addLast為例:

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return this.addLast((EventExecutorGroup)null, name, handler);
    }

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            ...
            this.addLast0(newCtx);
            ...
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = this.tail.prev;
        newCtx.prev = prev;
        newCtx.next = this.tail;
        prev.next = newCtx;
        this.tail.prev = newCtx;
    }           

如果是ChannelInboundHandler的回調,根據插入的順序從head向tail進行鍊式調用,ChannelOutboundHandler則相反:

ChannelHandlerChannelHandlerChannelInboundHandler和ChannelOutboundHandlerChannelHandlerContextChannelPipeline

值得注意的是,整條鍊路的調用需要通過Channel接口直接觸發,如果使用ChannelContextHandler的接口方法間接觸發,鍊路會從該ChannelContextHandler對應的ChannelHandler開始,而不是從頭或尾開始。

ChannelPipeline入口在NioEventLoop的processSelectedKey():

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            ...
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {...}
    }           

當為OP_READ事件時,調用unsafe.read():

@Override
        public final void read() {
            final ChannelPipeline pipeline = pipeline();
            ...
            try {
                do {
                    ...
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    ....
                } while (allocHandle.continueReading());
            } catch (Throwable t) {...}
        }           

pipeline.fireChannelRead(byteBuf)是入棧入口,實際上是pipeline中ChannelHandlerContext的head節點進行fireChannelRead的同語義操作。

ChannelPipeline相關元素之間的關系如下:

ChannelHandlerChannelHandlerChannelInboundHandler和ChannelOutboundHandlerChannelHandlerContextChannelPipeline
1. 每個Channel都會綁定且隻綁定一個ChannelPipeline,ChannelPipeline中也會持有Channel的引用;

2. ChannelPipeline持有ChannelHandlerContext鍊路;

3. 每個ChannelHandlerContext對應一個ChannelHandler;

4. ChannelHandlerContext同時也會持有ChannelPipeline引用,也就間接持有Channel引用;

5. ChannelHandler鍊路會根據Handler的類型,分為InBound和OutBound兩條鍊路,inbound處理入棧資料,outbound處理出棧資料。           

繼續閱讀