天天看點

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

目錄

源碼之下無秘密 ── 做最好的 Netty 源碼分析教程

  • Netty 源碼分析之 番外篇 Java NIO 的前生今世
    • Java NIO 的前生今世 之一 簡介
    • Java NIO 的前生今世 之二 NIO Channel 小結
    • Java NIO 的前生今世 之三 NIO Buffer 詳解
    • Java NIO 的前生今世 之四 NIO Selector 詳解
  • Netty 源碼分析之 零 磨刀不誤砍柴工 源碼分析環境搭建
  • Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭
    • Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (用戶端)
    • Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (伺服器端)
  • Netty 源碼分析之 二 貫穿 Netty 的大動脈 ── ChannelPipeline (一)

此文章已同步發送到我的 github 上

前言

這篇是 Netty 源碼分析 的第二篇, 在這篇文章中, 我會為讀者詳細地分析 Netty 中的 ChannelPipeline 機制.

Channel 與 ChannelPipeline

相信大家都知道了, 在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應, 它們的組成關系如下:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

通過上圖我們可以看到, 一個 Channel 包含了一個 ChannelPipeline, 而 ChannelPipeline 中又維護了一個由 ChannelHandlerContext 組成的雙向連結清單. 這個連結清單的頭是 HeadContext, 連結清單的尾是 TailContext, 并且每個 ChannelHandlerContext 中又關聯着一個 ChannelHandler.

上面的圖示給了我們一個對 ChannelPipeline 的直覺認識, 但是實際上 Netty 實作的 Channel 是否真的是這樣的呢? 我們繼續用源碼說話.

在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 中, 我們已經知道了一個 Channel 的初始化的基本過程, 下面我們再回顧一下.

下面的代碼是 AbstractChannel 構造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}
           

AbstractChannel 有一個 pipeline 字段, 在構造器中會初始化它為 DefaultChannelPipeline的執行個體. 這裡的代碼就印證了一點:

每個 Channel 都有一個 ChannelPipeline

.

接着我們跟蹤一下 DefaultChannelPipeline 的初始化過程.

首先進入到 DefaultChannelPipeline 構造器中:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
           

在 DefaultChannelPipeline 構造器中, 首先将與之關聯的 Channel 儲存到字段 channel 中, 然後執行個體化兩個 ChannelHandlerContext, 一個是 HeadContext 執行個體 head, 另一個是 TailContext 執行個體 tail. 接着将 head 和 tail 互相指向, 構成一個雙向連結清單.

特别注意到, 我們在開始的示意圖中, head 和 tail 并沒有包含 ChannelHandler

, 這是因為 HeadContext 和 TailContext 繼承于 AbstractChannelHandlerContext 的同時也實作了 ChannelHandler 接口了, 是以它們有 Context 和 Handler 的雙重屬性.

ChannelPipeline 的初始化再探

在第一章的時候, 我們已經對 ChannelPipeline 的初始化有了一個大緻的了解, 不過當時重點畢竟不在 ChannelPipeline 這裡, 是以沒有深入地分析它的初始化過程. 那麼下面我們就來看一下具體的 ChannelPipeline 的初始化都做了哪些工作吧.

ChannelPipeline 執行個體化過程

我們再來回顧一下, 在執行個體化一個 Channel 時, 會伴随着一個 ChannelPipeline 的執行個體化, 并且此 Channel 會與這個 ChannelPipeline 互相關聯, 這一點可以通過NioSocketChannel 的父類 AbstractChannel 的構造器予以佐證:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}
           

當執行個體化一個 Channel(這裡以 EchoClient 為例, 那麼 Channel 就是 NioSocketChannel), 其 pipeline 字段就是我們新建立的 DefaultChannelPipeline 對象, 那麼我們就來看一下 DefaultChannelPipeline 的構造方法吧:

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
           

可以看到, 在 DefaultChannelPipeline 的構造方法中, 将傳入的 channel 指派給字段 this.channel, 接着又執行個體化了兩個特殊的字段: tail 與 head. 這兩個字段是一個雙向連結清單的頭和尾. 其實在 DefaultChannelPipeline 中, 維護了一個以 AbstractChannelHandlerContext 為節點的雙向連結清單, 這個連結清單是 Netty 實作 Pipeline 機制的關鍵.

再回顧一下 head 和 tail 的類層次結構:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)
Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

從類層次結構圖中可以很清楚地看到, head 實作了 ChannelInboundHandler, 而 tail 實作了 ChannelOutboundHandler 接口, 并且它們都實作了 ChannelHandlerContext 接口, 是以可以說 head 和 tail 即是一個 ChannelHandler, 又是一個 ChannelHandlerContext.

接着看一下 HeadContext 的構造器:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}
           

它調用了父類 AbstractChannelHandlerContext 的構造器, 并傳入參數 inbound = false, outbound = true.

TailContext 的構造器與 HeadContext 的相反, 它調用了父類 AbstractChannelHandlerContext 的構造器, 并傳入參數 inbound = true, outbound = false.

即 header 是一個 outboundHandler, 而 tail 是一個inboundHandler, 關于這一點, 大家要特别注意, 因為在後面的分析中, 我們會反複用到 inbound 和 outbound 這兩個屬性.

ChannelInitializer 的添加

上面一小節中, 我們已經分析了 Channel 的組成, 其中我們了解到, 最開始的時候 ChannelPipeline 中含有兩個 ChannelHandlerContext(同時也是 ChannelHandler), 但是這個 Pipeline并不能實作什麼特殊的功能, 因為我們還沒有給它添加自定義的 ChannelHandler.

通常來說, 我們在初始化 Bootstrap, 會添加我們自定義的 ChannelHandler, 就以我們熟悉的 EchoClient 來舉例吧:

Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });
           

上面代碼的初始化過程, 相信大家都不陌生. 在調用 handler 時, 傳入了 ChannelInitializer 對象, 它提供了一個 initChannel 方法供我們初始化 ChannelHandler. 那麼這個初始化過程是怎樣的呢? 下面我們就來揭開它的神秘面紗.

ChannelInitializer 實作了 ChannelHandler, 那麼它是在什麼時候添加到 ChannelPipeline 中的呢? 進行了一番搜尋後, 我們發現它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.

其代碼如下:

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());
    ...
}
           

上面的代碼将 handler() 傳回的 ChannelHandler 添加到 Pipeline 中, 而 handler() 傳回的是handler 其實就是我們在初始化 Bootstrap 調用 handler 設定的 ChannelInitializer 執行個體, 是以這裡就是将 ChannelInitializer 插入到了 Pipeline 的末端.

此時 Pipeline 的結構如下圖所示:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

有朋友可能就有疑惑了, 我明明插入的是一個 ChannelInitializer 執行個體, 為什麼在 ChannelPipeline 中的雙向連結清單中的元素卻是一個 ChannelHandlerContext? 為了解答這個問題, 我們繼續在代碼中尋找答案吧.

我們剛才提到, 在 Bootstrap.init 中會調用 p.addLast() 方法, 将 ChannelInitializer 插入到連結清單末端:

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name); // 檢查此 handler 是否有重複的名字

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}
           

addLast 有很多重載的方法, 我們關注這個比較重要的方法就可以了.

上面的 addLast 方法中, 首先檢查這個 ChannelHandler 的名字是否是重複的, 如果不重複的話, 則為這個 Handler 建立一個對應的 DefaultChannelHandlerContext 執行個體, 并與之關聯起來(Context 中有一個 handler 屬性儲存着對應的 Handler 執行個體). 判斷此 Handler 是否重名的方法很簡單: Netty 中有一個 name2ctx Map 字段, key 是 handler 的名字, 而 value 則是 handler 本身. 是以通過如下代碼就可以判斷一個 handler 是否重名了:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}
           

為了添加一個 handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext. 是以在上面的代碼中我們可以看到新執行個體化了一個 newCtx 對象, 并将 handler 作為參數傳遞到構造方法中. 那麼我們來看一下執行個體化的 DefaultChannelHandlerContext 到底有什麼玄機吧.

首先看它的構造器:

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
    super(pipeline, group, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}
           

DefaultChannelHandlerContext 的構造器中, 調用了兩個很有意思的方法: isInbound 與 isOutbound, 這兩個方法是做什麼的呢?

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}
           

從源碼中可以看到, 當一個 handler 實作了 ChannelInboundHandler 接口, 則 isInbound 傳回真; 相似地, 當一個 handler 實作了 ChannelOutboundHandler 接口, 則 isOutbound 就傳回真.

而這兩個 boolean 變量會傳遞到父類 AbstractChannelHandlerContext 中, 并初始化父類的兩個字段: inbound 與 outbound.

那麼這裡的 ChannelInitializer 所對應的 DefaultChannelHandlerContext 的 inbound 與 inbound 字段分别是什麼呢? 那就看一下 ChannelInitializer 到底實作了哪個接口不就行了? 如下是 ChannelInitializer 的類層次結構圖:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

可以清楚地看到, ChannelInitializer 僅僅實作了 ChannelInboundHandler 接口, 是以這裡執行個體化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false.

不就是 inbound 和 outbound 兩個字段嘛, 為什麼需要這麼大費周章地分析一番? 其實這兩個字段關系到 pipeline 的事件的流向與分類, 是以是十分關鍵的, 不過我在這裡先賣個關子, 後面我們再來詳細分析這兩個字段所起的作用. 在這裡, 讀者隻需要記住, ChannelInitializer 所對應的 DefaultChannelHandlerContext 的 inbound = true, outbound = false 即可.

當建立好 Context 後, 就将這個 Context 插入到 Pipeline 的雙向連結清單中:

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}
           

顯然, 這個代碼就是典型的雙向連結清單的插入操作了. 當調用了 addLast 方法後, Netty 就會将此 handler 添加到雙向連結清單中 tail 元素之前的位置.

自定義 ChannelHandler 的添加過程

在上一小節中, 我們已經分析了一個 ChannelInitializer 如何插入到 Pipeline 中的, 接下來就來探讨一下 ChannelInitializer 在哪裡被調用, ChannelInitializer 的作用, 以及我們自定義的 ChannelHandler 是如何插入到 Pipeline 中的.

在 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (用戶端) 一章的 channel 的注冊過程 小節中, 我們已經分析過 Channel 的注冊過程了, 這裡我們再簡單地複習一下:

  • 首先在 AbstractBootstrap.initAndRegister中, 通過 group().register(channel), 調用 MultithreadEventLoopGroup.register 方法
  • 在MultithreadEventLoopGroup.register 中, 通過 next() 擷取一個可用的 SingleThreadEventLoop, 然後調用它的 register
  • 在 SingleThreadEventLoop.register 中, 通過 channel.unsafe().register(this, promise) 來擷取 channel 的 unsafe() 底層操作對象, 然後調用它的 register.
  • 在 AbstractUnsafe.register 方法中, 調用 register0 方法注冊 Channel
  • 在 AbstractUnsafe.register0 中, 調用 AbstractNioChannel#doRegister 方法
  • AbstractNioChannel.doRegister 方法通過 javaChannel().register(eventLoop().selector, 0, this) 将 Channel 對應的 Java NIO SockerChannel 注冊到一個 eventLoop 的 Selector 中, 并且将目前 Channel 作為 attachment.

而我們自定義 ChannelHandler 的添加過程, 發生在 AbstractUnsafe.register0 中, 在這個方法中調用了 pipeline.fireChannelRegistered() 方法, 其實作如下:

@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}
           

上面的代碼很簡單, 就是調用了 head.fireChannelRegistered() 方法而已.

關于上面代碼的 head.fireXXX 的調用形式, 是 Netty 中 Pipeline 傳遞事件的常用方式, 我們以後會經常看到.

還記得 head 的 類層次結構圖不, head 是一個 AbstractChannelHandlerContext 執行個體, 并且它沒有重寫 fireChannelRegistered 方法, 是以 head.fireChannelRegistered 其實是調用的 AbstractChannelHandlerContext.fireChannelRegistered:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}
           

這個方法的第一句是調用 findContextInbound 擷取一個 Context, 那麼它傳回的 Context 到底是什麼呢? 我們跟進代碼中看一下:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
           

很顯然, 這個代碼會從 head 開始周遊 Pipeline 的雙向連結清單, 然後找到第一個屬性 inbound 為 true 的 ChannelHandlerContext 執行個體. 想起來了沒? 我們在前面分析 ChannelInitializer 時, 花了大量的筆墨來分析了 inbound 和 outbound 屬性, 你看現在這裡就用上了. 回想一下, ChannelInitializer 實作了 ChannelInboudHandler, 是以它所對應的 ChannelHandlerContext 的 inbound 屬性就是 true, 是以這裡傳回就是 ChannelInitializer 執行個體所對應的 ChannelHandlerContext. 即:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

當擷取到 inbound 的 Context 後, 就調用它的 invokeChannelRegistered 方法:

private void invokeChannelRegistered() {
    try {
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
           

我們已經強調過了, 每個 ChannelHandler 都與一個 ChannelHandlerContext 關聯, 我們可以通過 ChannelHandlerContext 擷取到對應的 ChannelHandler. 是以很顯然了, 這裡 handler() 傳回的, 其實就是一開始我們執行個體化的 ChannelInitializer 對象, 并接着調用了 ChannelInitializer.channelRegistered 方法. 看到這裡, 讀者朋友是否會覺得有點眼熟呢? ChannelInitializer.channelRegistered 這個方法我們在第一章的時候已經大量地接觸了, 但是我們并沒有深入地分析這個方法的調用過程, 那麼在這裡讀者朋友應該對它的調用有了更加深入的了解了吧.

那麼這個方法中又有什麼玄機呢? 繼續看代碼:

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}
           

initChannel 這個方法我們很熟悉了吧, 它就是我們在初始化 Bootstrap 時, 調用 handler 方法傳入的匿名内部類所實作的方法:

.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });
           

是以當調用了這個方法後, 我們自定義的 ChannelHandler 就插入到 Pipeline 了, 此時的 Pipeline 如下圖所示:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

當添加了自定義的 ChannelHandler 後, 會删除 ChannelInitializer 這個 ChannelHandler, 即 "ctx.pipeline().remove(this)", 是以最後的 Pipeline 如下:

Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

好了, 到了這裡, 我們的 自定義 ChannelHandler 的添加過程 也分析的查不多了.

下一小節 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二)

本文由 yongshun 發表于個人部落格, 采用 署名-相同方式共享 3.0 中國大陸許可協定.

Email: [email protected]

本文标題為: Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (一)

本文連結為: https://segmentfault.com/a/1190000007308934