天天看點

Netty源碼之ChannelPipeline

這幾天一直在看netty,感覺跟之前看過的tomcat原理多多少少有點類似,看了些書,覺得還是看源碼比較實在,有感覺。

Netty源碼之ChannelPipeline

這個圖是 netty實戰上面扣來的我覺得很實在(它是經過抽象看起來兩條鍊,實際上就一條雙向連結清單,無非類型不符合則不處理),在ChannelPipeLine中,維護了具體處理相關通知的成員handler,組成的雙向連結清單,連結清單成員為ChannelHandlerContext。左邊是連結清單頭,右邊是連結清單尾。

根據請求的方向分為Outboundhandler跟Inboundhandler,fireIN_EVT()即從連結清單頭向尾傳遞,僅僅類型是Inboundhandler才處理,資料的出站運動則從連結清單尾部向頭部傳遞,僅被Outboundhandler處理。出站資料将會達到網絡層,Socket傳出。

每一個Channel都會有一個對應的ChannelPipLine。

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }
           

下面是ChannelPipLine的構造函數

public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;

        //TailHandler是DefaultChannelPipeline的一個預設實作ChannelInboundHandler的一個内部類
        TailHandler tailHandler = new TailHandler();
        tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);


        //HeadHandler是DefaultChannelPipeline的一個預設實作ChannelOutboundHandler的一個内部類
        HeadHandler headHandler = new HeadHandler(channel.unsafe());
        head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);

        head.next = tail;
        tail.prev = head;
    }
           

邏輯很簡單,無非構造兩個頭尾handler跟ChannelHandlerContext,并将headContext跟tailContext構造成雙向連結清單。之後的addLast,addFirst無非向這兩之間的添加context,雙向連結清單的頭尾依舊是以上兩個。

下面看addFirst()

@Override
    public ChannelPipeline addFirst(String name, ChannelHandler handler) {
        return addFirst(null, name, handler);
    }


    @Override
    public ChannelPipeline addFirst(EventExecutorGroup group, final String name, ChannelHandler handler) {
        synchronized (this) {
            //檢查是否已添加同名的handler
            checkDuplicateName(name);
            //根據pipline,group,name,handler建構出一個新的DefaultChannelHandlerContext
            DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
            //添加這個newCtx到pipline:通過操作連結清單,将該handlerContext添加到head的後面
            addFirst0(name, newCtx);
        }

        return this;
    }
           

一口氣貼太多可能看不過來,以上邏輯無非先檢查是否有同名headler,再根據headler,pipline封裝資料得到綁定的Context(其中設定了inbound/outbound屬性),最後調用addFirst0,将context加入雙向連結清單中。

private void checkDuplicateName(String name) {
        if (name2ctx.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
           

name2ctx是一個以handler名字為key,ChannelHandlerContext為值的map;

private void addFirst0(String name, DefaultChannelHandlerContext newCtx) {
        /**
         * 檢查newCtx中的handler是否被重複添加:
         * 對于一個沒有加Shareble的hanlder類,如果每次都是new出來不同的對象,是可以重複添加到
         * 同一個pipline的,但是如果是同一個對象執行個體,是不允許重複添加到同一個pipline的
         */
        checkMultiplicity(newCtx);
        DefaultChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;

        name2ctx.put(name, newCtx);

        callHandlerAdded(newCtx);
    }
           

可以看到,先checkMultiplicity 檢查ctx中的handler是否多次添加的合法性,将新的ctx插入到head節點後。順便将新的存入name2ctx中。并調用handlerAdded觸發的事件。

private static void checkMultiplicity(ChannelHandlerContext ctx) {
        ChannelHandler handler = ctx.handler();
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            /**
             * 1.判斷目前添加的ChannelHandlerContext的handler是否加了注解@Sharable
             * 2.判斷handler是否已經被添加過
             * 如果沒有添加@Sharable注解,又被添加過,那麼将會抛出異常
             * 如果加了@Sharable注解,那麼這個handler可以被多次添加
             */
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                                " is not a @Sharable handler, so can't be added or removed mul
           
@Override
    public ChannelHandlerContext fireChannelActive() {
        //先找到目前ChannelHandlerContext的下一個ChannelHandlerContext
        final DefaultChannelHandlerContext next = findContextInbound();
        //得到執行器
        EventExecutor executor = next.executor();
        /**
         * 1.首先executor.inEventLoop()方法判斷目前線程是不是netty建立的
         */
        //調用上面得到的next的真正的invokeChannelActive
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
        return this;
    }
           

基本上,到這裡,整個連結清單的構造理清了~下面看看雙向連結清單中的事件的傳遞。

當ServerSocket剛剛監聽端口bind的時候,這裡會觸發一次fireChannelActive事件;那我們從這開始跟蹤~

@Override
    public ChannelPipeline fireChannelActive() {
        //從pipline的head開始觸發ChannelActive事件
        head.fireChannelActive();
        //判斷isAutoRead的值,如果為true(預設值為true),則自動調用read
        if (channel.config().isAutoRead()) {
            channel.read();
        }
        return this;
    }
           

我們可以看到,pipline的fireChannelActive是調用head的fireChannel,從head開始,并順着連結清單往後調用,繼續看下面代碼

@Override
    public ChannelHandlerContext fireChannelActive() {
        //先找到目前ChannelHandlerContext的下一個ChannelHandlerContext
        final DefaultChannelHandlerContext next = findContextInbound();
        //得到執行器
        EventExecutor executor = next.executor();
        /**
         * 1.首先executor.inEventLoop()方法判斷目前線程是不是netty建立的
         */
        //調用上面得到的next的真正的invokeChannelActive
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
        return this;
    }
           

這裡很有意思,從這裡也可以看出netty線程模型的優點~慢慢來,先得到連結清單下一個handlercontext

private DefaultChannelHandlerContext findContextInbound() {
        /**
         * inbound時候,從head往tail周遊handle,并找出這些handle裡面所有inbound類型的handle,
         * 一直周遊到TailHandle,TailHandle是netty預設實作的一個inbound類型的handle,這個TailHandle預設實作的
         * ChannelInboundHandler接口都是空方法,是以當調用到tail對應的方法的時候,調用鍊就會終止
         */
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
           

先得到連結清單上下一個inbound屬性為true的ctx并傳回,通過這個其實就已經把雙向連結清單給使用者抽象成兩條連結清單的感覺,如最上面的那張圖。

再繼續回到上一個函數,得到執行器,然後判斷目前線程是否是該執行器綁定Eventloop的指定線程,如果是的話則直接調用事件next.invokeChannelActive();否則将該事件分裝成任務加入該執行器的任務隊列中。(這裡涉及到netty的線程模型和任務排程,在後面将eventLoop時會仔細分析)

private void invokeChannelActive() {
        try {
            ((ChannelInboundHandler) handler).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
           

上面的this為下一個ctx,如果inbound為ture,那麼它一定是ChannelInboundHandler的子類,netty這裡采用了擴充卡模型,我們直接看到對應的adapter的方法

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
           

實作很簡單,調用下一個inbound ctx的firchannelActive,保證了事件在連結清單中傳遞。

總結

看完netty實戰後,書總體上還是不錯,個人覺得僅僅算本源碼指導書,光看書收獲不大,看完書後再看源碼真的舒服得多,pipline模型算是用的恰到好處,了解tomcat内部的pipline後再看有那麼點親切感。

繼續閱讀