天天看點

自頂向下深入分析Netty(七)--ChannelHandlerContext源碼實作

7.2.2 ChannelHandlerContext

7.2.2.1 AbstractChannelHandlerContext

AbstractChannelHandlerContext的類簽名如下:

abstract class AbstractChannelHandlerContext extends
              DefaultAttributeMap implements ChannelHandlerContext                

該類作為其他ChannelHandlerContext的基類,Netty4.0中繼承自DefaultAttributeMap實作屬性鍵值對的存儲和擷取,由于此種用法與

channel.attr()

存在混淆,故不建議使用。确有需要時,請直接使用

channel.attr()

下面介紹其中的關鍵字段:

// Context形成雙向連結清單,next和prev分别是後繼節點和前驅節點
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    private final boolean inbound;  // 對應處理器為InboudHandler
    private final boolean outbound; // 對應處理器為OutboudHandler
    private final DefaultChannelPipeline pipeline;  
    private final String name;  // Context的名稱
    private final boolean ordered;  // 事件順序标記

    final EventExecutor executor; // 事件執行線程

    private volatile int handlerState = INIT;   // 狀态                

其中

handlerState

的字面意思容易使人誤解,為此列出四種狀态并加以解釋:

// 初始狀态
    private static final int INIT = ; 
    // 對應Handler的handlerAdded方法将要被調用但還未調用
    private static final int ADD_PENDING = ;
    // 對應Handler的handlerAdded方法被調用
    private static final int ADD_COMPLETE = ;
    // 對應Handler的handlerRemoved方法被調用
    private static final int REMOVE_COMPLETE = ;
           

AbstractChannelHandlerContext隻有一個構造方法:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, 
              EventExecutor executor, String name, 
              boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;

        // 隻有執行線程為EventLoop或者标記為OrderedEventExecutor才是順序的
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }                

由于Netty将事件抽象為入站事件和出站事件,AbstractChannelHandlerContext對事件的處理也分為兩類,分别以ChannelRead和read事件為例分析。首先分析ChannelRead事件:

@Override
    public ChannelHandlerContext fireChannelRead(final Object msg){
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }                

回顧之前的分析,事件在ChannelPipeline中不會自動流動,入站事件需要調用

fireXXX

方法将事件傳遞到下一個處理器,出站事件需要調用諸如

write

方法傳遞給下一個處理器處理。查找入站處理器的代碼如下:

private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next; // 入站事件向後查找
        } while (!ctx.inbound);
        return ctx;
    }                

代碼十分簡潔,向雙向連結清單前進方向查找到最近的入站處理器即可;可推知,出站事件則向雙向連結清單後退方向查找最近的出站處理器。這也是出站\入站事件傳播方向不同的原因。

找到事件傳播的下一個處理器後,在處理器中執行處理過程的代碼如下:

static void invokeChannelRead(final AbstractChannelHandlerContext next, 
              final Object msg) {
        ObjectUtil.checkNotNull(msg, "msg");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 使用目前IO線程執行使用者自定義處理
            next.invokeChannelRead(msg);
        } else {
            // 使用使用者定義的線程執行處理過程
            executor.execute(() -> {next.invokeChannelRead(msg);});
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {   
                // 處理器執行使用者自定義的處理過程
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }                

Netty為使用者提供了友善的線程切換處理,特别是在版本4.1後,可以直接使用JDK提供的線程池。最佳實踐:在一個處理器中如果有耗時較大的業務邏輯,可以指定該處理器的處理線程Executor,以避免占用IO線程造成性能損失。

之後再看一下

invokeHandler()

notifyHandlerException(t)

方法,invokeHandler決定是否調用處理器,進行業務邏輯處理:

private boolean invokeHandler() {
        // handlerState為volatile變量,存儲為本地變量,以便減少volatile讀
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }                

可知,當一個處理器還沒有調用HandlerAdded方法時,隻有處理器的執行線程是非順序線程池的執行個體才能執行業務處理邏輯;否則必須等待已調用handlerAdded方法,才能處理業務邏輯。這部分的處理,保證了ChannelPipeline的線程安全性,由此使用者可以随意增加删除Handler。

notifyHandlerException方法對處理過程中出現的異常進行處理:

private void notifyHandlerException(Throwable cause) {
        if (inExceptionCaught(cause)) {
            // 處理異常的過程中出現異常
            logger.warn("...)
            return;
        }
        invokeExceptionCaught(cause);
    }

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                logger.warn("...")
            }
        } else {
            fireExceptionCaught(cause);
        }
    }                

invokeExceptionCaught方法可類比invokeChannelRead方法,其形式一緻。由代碼可知,當一個Handler的處理過程出現異常時,會調用該Handler的

exceptionCaught()

方法進行處理。注意:異常事件的傳播也是由使用者使用fireExceptionCaught方法控制。

至此,ChannelRead入站事件的處理已分析完畢,read出站事件的處理也可類比,不同的是:出站事件需要反向找出站處理器Handler,不再贅述。

7.2.2.2 DefaultChannelHandlerContext

DefaultChannelHandlerContext是使用Netty時經常使用的事實Context類,其中的大部分功能已在AbstractChannelHandlerContext中完成,是以該類十分簡單,其構造方法如下:

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, 
              EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }                

對出站\入站Handler的判别使用粗暴的instanceof:

private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }                

實作中引入了一個新的字段:

private final ChannelHandler handler;
           

回憶ChannelPipeline,pipeline其中形成Context的雙向連結清單,而處理邏輯則在Handler中。設計模式中指出,關聯Handler與Context的方法有兩種:組合和繼承。本例中,使用的是組合,下面分析繼承的方法。

7.2.2.3 HeadContext和TailContext

HeadContext和TailContext使用繼承的方式關聯Handler,作為ChannelPipeline雙向連結清單的頭節點和尾節點。首先分析HeadContext,類簽名如下:

final class HeadContext extends AbstractChannelHandlerContext
                 implements ChannelOutboundHandler, ChannelInboundHandler                

由于既繼承OutboundHandler又繼承InboundHandler,可知HeadContext需要同時處理出站事件和入站事件。以read和ChannelRead事件為例,當使用者調用read出站事件時,是在告訴IO線程:我需要向網絡讀資料做處理;當IO線程讀到資料後,則使用ChannelRead事件通知使用者:已讀取到資料。

read()

方法代碼如下:

public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }
           

其中

unsafe

是Netty内部實作底層IO細節的類,

beginRead()

方法設定底層Selector關心read事件,如果read事件就緒,則會調用

unsafe.read()

方法讀取資料,然後調用

channelPipe.fireChannelRead()

方法通知使用者已讀取到資料,可進行業務處理。 HeadContext的

ChannelRead()

方法代碼如下:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }                

可知HeadContext隻是簡單的将事件傳播到下一個入站處理器。

也許你會有疑問:自己根本沒使用過read出站事件,為什麼資料自動讀取了呢?這是因為預設設定自動讀取

autoRead

。通過HeadContext的

channelActive

代碼分析:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead(); // 自動讀取
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
           

當channel激活後,如果配置了自動讀取,則會調用

channel.read()

,注意這将在ChannelPipeline中傳播出站事件read,最終傳播到HeadContext的

read()

方法,最後調用

unsafe.beginRead()

設定關心底層read事件,進而實作激活後自動讀取資料。而當讀取完一組資料後,在

channelReadComplete()

方法中繼續下一組資料的自動讀取。

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead(); // 自動讀取下一組資料
    }                

分析完HeadContext,再分析TailContext。首先看方法簽名:

final class TailContext extends AbstractChannelHandlerContext 
            implements ChannelInboundHandler                

TailContext隻繼承入站事件處理器,作為雙向連結清單的尾節點,它對入站事件的處理極其簡單,那就是:什麼也不做。但有兩個方法除外:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
            throws Exception {
        onUnhandledInboundException(cause);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }
           

而這也僅僅是做個簡單提示,告知使用者這兩個方法必須在ChannelPipeline中有處理器處理,否則很有可能是代碼出了問題:

protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
              "Discarded inbound message {} that reached at the tail of the pipeline. " +
              "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }                

如果開啟了日志的Debug級别,不難相信,你已經接觸過這樣的提示。