天天看點

【Netty權威指南】09-Netty服務端建立分析1、Netty服務端建立源碼分析2、用戶端接入源碼分析

1、Netty服務端建立源碼分析

當我們直接使用 JDK NIO的類庫開發基于NIO的異步服務端時,需要使用到多路複用器 Selector、 ServerSocketChannel、 SocketChannel、 ByteBuffer、 SelectionKey等,相比于傳統的BIO開發,NIO的開發要複雜很多,開發出穩定、高性能的異步通信架構,一直是個難題,Netty為了向使用者屏蔽NlO通信的底層細節,在和使用者互動的邊界做了封裝,目的就是為了減少使用者開發工作量,降低開發難度。 ServerBootstrap是 Socket服務端的啟動輔助類,使用者通過 ServerBootstrap可以友善地建立Netty的服務端。

1.1、Netty服務端建立時序圖
【Netty權威指南】09-Netty服務端建立分析1、Netty服務端建立源碼分析2、用戶端接入源碼分析

步驟1:建立 ServerBootstrap執行個體。 ServerBootstrap是 Netty服務端的啟動輔助類,它提供了一系列的方法用于設定服務端啟動相關的參數。底層通過門面模式對各種能力進行抽象和封裝,盡量不需要使用者跟過多的底層API打交道,以降低使用者的開發難度。我們在建立 ServerBootstrap執行個體時,會驚訝地發現 ServerBootstrap隻有一個無參的構造函數,作為啟動輔助類這讓人不可思議,因為它需要與多個其他元件或者類互動。ServerBootstrap構造函數沒有參數的根本原因是因為它的參數太多了,而且未來也可能會發生變化,為了解決這個問題,就需要引入 Builder模式。《 Effective java》第二版第2條建議遇到多個構造器參數時要考慮用建構器,關于多個參數構造函數的缺點和使用建構器的優點大家可以查閱《 Effective java》,在此不再詳述。

步驟2:設定并綁定 Reactor線程池。Netty的 Reactor線程池是 EventLoopGroup,它實際就是 EventLoop的數組。 EventLoop的職責是處理所有注冊到本線程多路複用器Selector上的 Channel, Selector的輪詢操作由綁定的 EventLoop線程run方法驅動,在一個循環體内循環執行。值得說明的是, EventLoop的職責不僅僅是處理網絡IO事件,使用者自定義的Task和定時任務Task也統一由 EventLoop負責處理,這樣線程模型就實作了統一。從排程層面看,也不存在從 EventLoop線程中再啟動其他類型的線程用于異步執行另外的任務,這樣就避免了多線程并發操作和鎖競争,提升了IO線程的處理和排程性能。

步驟3:設定并綁定服務端 Channel作為NIO服務端,需要建立 ServerSocketChannel,Netty對原生的NIO類庫進行了封裝,對應實作是 NioServerSocketChannel對于使用者而言,不需要關心服務端 Channel的底層實作細節和工作原理,隻需要指定具體使用哪種服務端

Channel即可。是以,Netty的 ServerBootstrap方法提供了 channel方法用于指定服務端Channel的類型。Netty通過工廠類,利用反射建立 NioServerSocketChannel對象。由于服務端監聽端口往往隻需要在系統啟動時才會調用,是以反射對性能的影響并不大。相關代碼如下。

public ServerBootstrap channel(Class<? extends ServerChannel> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ServerBootstrapChannelFactory<ServerChannel>(channelClass));
}
           

步驟4:鍊路建立的時候建立并初始化 ChannelPipeline。 ChannelPipeline并不是NlO服務端必需的,它本質就是一個負責處理網絡事件的職責鍊,負責管理和執行ChannelHandler。網絡事件以事件流的形式在 ChannelPipeline中流轉,由 ChannelPipeline根據 ChannelHandler的執行政策排程 ChannelHandler的執行。典型的網絡事件如下

(1)鍊路注冊;

(2)鍊路激活

(3)鍊路斷開;

(4)接收到請求消息;

(5)請求消息接收并處理完畢;

(6)發送應答消息

(7)鍊路發生異常;

(8)發生使用者自定義事件。

步驟5:初始化 ChannelPipeline完成之後,添加并設定 ChannelHandler。ChannelHandler是Netty提供給使用者定制和擴充的關鍵接口。利用 ChannelHandler使用者可以完成大多數的功能定制,例如消息編解碼、心跳、安全認證、 TSLISSL認證、流量控制和流量整形等。Netty同時也提供了大量的系統 ChannelHandler供使用者使用,比較實用的系統Channelhandler總結如下。

(1)系統編解碼架構 ByteToMessageCodec;

(2)通用基于長度的半包解碼器—— LengthFieldBasedFrameDecoder;

(3)碼流日志列印 Handler—— Logging Handler

(4)SSL安全認證 Handler—— SsIHandler;

(5)鍊路空閑檢測 Handler—— IdleStateHandler;

(6)流量整形 Handler—— ChannelTrafficShapingHandler;

(7)Base64編解碼——Base64Decoder和Base64Encoder。

建立和添加 ChannelHandler的代碼示例如下。

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch)
            throws Exception {
        ch.pipeline().addLast(new EchoServerHandler());
    }
});
           

步驟6:綁定并啟動監聽端口。在綁定監聽端口之前系統會做一系列的初始化和檢測工作,完成之後,會啟動監聽端口,并将 ServerSocketChannel注冊到 Selector上監聽用戶端連接配接,相關代碼如下。

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}
           

步驟7: Selector輪詢。由 Reactor線程 NioEventLoop負責排程和執行 Selector輪詢操作,選擇準備就緒的 Channel集合,相關代碼如下。

private void select() throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                // Selected something,
                // waken up by user, or
                // the task queue has a pending task.
                break;
            }

            if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = System.nanoTime();
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}
           

步驟8:當輪詢到準備就緒的 Channel之後,就由 Reactor線程 NioEventLoop執行ChannelPipeline的相應方法,最終排程并執行 ChannelHandler,接口如圖所示。

【Netty權威指南】09-Netty服務端建立分析1、Netty服務端建立源碼分析2、用戶端接入源碼分析

步驟9:執行 Netty系統 Channelhandler和使用者添加定制的 Channelhandler。ChannelPipeline根據網絡事件的類型,排程并執行 Channelhandler,相關代碼如下。

@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
    DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
    next.invoker.invokeChannelRead(next, msg);
    return this;
}
           

1.2、Netty服務端建立源碼分析

首先通過構造函數建立 ServerBootstrap執行個體,随後,通常會建立兩個EventLoopGroup(并不是必須要建立兩個不同的 EventLoopGroup,也可以隻建立一個并共享),代碼如下

EventLoopGroup acceptorGroup = new NioEventLoopGroup();

EventLoopGroup IOGroup = new NioEventLoopGroup();

NioEventLoopGroup實際就是Reactor線程池,負責排程和執行用戶端的接入、網絡讀寫事件的處理、使用者自定義任務和定時任務的執行。通過 ServerBootstrap的 group方法将兩個 EventLoopGroup執行個體傳入,代碼如下。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}
           

其中父 NioEventLoopGroup被傳入了父類構造函數中,代碼如下。

public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return (B) this;
}
           

該方法會被用戶端和服務端重用,用于設定工作I/O線程,執行和排程網絡事件的讀寫線程組和線程類型設定完成後,需要設定服務端 Channel用于端口監聽和用戶端鍊路接入。Netty通過 Channel工廠類來建立不同類型的 Channel,對于服務端,需要建立NioServerSocketChannel。是以,通過指定 Channel類型的方式建立 Channel工廠。

ServerBootstrapChannelFactory是 ServerBootstrap的内部靜态類,職責是根據 Channel的類型通過反射建立 Channel的執行個體,服務端需要建立的是 NioServerSocketChannel執行個體,代碼如下。

public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
    try {
        Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
        return constructor.newInstance(eventLoop, childGroup);
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
           

指定 NioServerSocketChannel後,需要設定TCP的一些參數,作為服務端,主要是要設定TCP的 backlog參數,底層C的對應接口定義如下。

int listen(int fd, int backlog);
           

backlog指定了核心為此套接口排隊的最大連接配接個數,對于給定的監聽套接口,核心要維護兩個隊列:未連結隊列和已連接配接隊列,根據TCP三路握手過程中三個分節來分隔這兩個隊列。伺服器處于 listen狀态時,收到用戶端syn分節( connect)時在未完成隊列中建立一個新的條目,然後用三路握手的第二個分節即伺服器的syn響應用戶端,此條目在第三個分節到達前(用戶端對伺服器syn的ack)一直保留在未完成連接配接隊列中,如果三路握手完成,該條目将從未完成連接配接隊列搬到已完成連接配接隊列尾部。當程序調用 accept時,從已完成隊列中的頭部取出一個條目給程序,當已完成隊列為空時程序将睡眠,直到有條目在已完成連接配接隊列中才喚醒。 backlog被規定為兩個隊列總和的最大值,大多數實作預設值為5,但在高并發web伺服器中此值顯然不夠,Lighttpd中此值達到128×8。需要設定此值更大一些的原因是未完成連接配接隊列的長度可能因為用戶端syn的到達及等待三路握手第三個分節的到達延時而增大。Nety預設的 backlog為100,當然,使用者可以修改預設值,這需要根據實際場景和網絡狀況進行靈活設定。

TCP參數設定完成後,使用者可以為啟動輔助類和其父類分别指定 Handler兩類 Handler的用途不同:子類中的 Handler是 NioServerSocketchannel對應的 ChannelPipeline的Handler;父類中的 Handler是用戶端新接入的連接配接 Socketchannel對應的 ChannelPipeline的 Handler。兩者的差別可以通過圖13-3來展示。

【Netty權威指南】09-Netty服務端建立分析1、Netty服務端建立源碼分析2、用戶端接入源碼分析

本質差別就是: ServerBootstrap中的 Handler是 NioServerSocketchannel使用的,所有連接配接該監聽端口的用戶端都會執行它;父類 AbstractBootstrap中的 Handler是個工廠類,它為每個新接入的用戶端都建立一個新的 Handler服務端啟動的最後一步,就是綁定本地端口,啟動服務,下面我們來分析下這部分代碼。

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();//NO.1
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise;
    if (regFuture.isDone()) {//NO.2
        promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
        regFuture.addListener(new ChannelFutureListener() {
            @Override//NO.3
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
    }

    return promise;
}
           

先看下NO.1。首先建立 Channel, createChannel由子類 ServerBootstrap實作,建立新的 NioServerSocketChannel。它有兩個參數:參數1是從父類的NO線程池中順序擷取個 NioEventLoop,它就是服務端用于監聽和接收用戶端連接配接的 Reactor線程;參數2是所謂的 worker Group線程池,它就是處理I/O讀寫的 Reactor線程組,相關代碼如下。

final ChannelFuture initAndRegister() {
    Channel channel;
    try {
        channel = createChannel();
    } catch (Throwable t) {
        return VoidChannel.INSTANCE.newFailedFuture(t);
    }

    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }

    ChannelPromise regFuture = channel.newPromise();
    channel.unsafe().register(regFuture);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
           

NioServerSocketchannel建立成功後,對它進行初始化,初始化工作主要有以下三點。

@Override
void init(Channel channel) throws Exception {
    //(1)設定 Socket參數和 NioServerSocketChannel的附加屬性,代碼如下。
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    //(2)将AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中,代碼如下。
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    //(3)将用于服務端注冊的HandlerServerBootstrapAcceptor添加到ChannelPipeline中,代碼如下。
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
                    currentChildAttrs));
        }
    });
}
           

到此,Netty服務端監聽的相關資源已經初始化完畢,就剩下最後一步一一注冊NioServerSocketChannel到Reactor線程的多路複用器上,然後輪詢用戶端連接配接事件。在分析注冊代碼之前,我們先通過圖13-4看看目前NioServerSocketchannel的ChannelPipeline的組成。

【Netty權威指南】09-Netty服務端建立分析1、Netty服務端建立源碼分析2、用戶端接入源碼分析

最後,我們看下 NioServer SocketChannel的注冊。當 NioServerSocketChannel初始化完成之後,需要将它注冊到 Reactor線程的多路複用器上監聽新用戶端的接入,代碼如下。

@Override
public final void register(final ChannelPromise promise) {
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        }
    }
}
           

首先判斷是否是 Nio EventLoop自身發起的操作。如果是,則不存在并發操作,直接執行 Channel注冊:如果由其他線程發起,則封裝成一個Task放入消息隊列中異步執行。此處,由于是由 Server Bootstrap所線上程執行的注冊操作,是以會将其封裝成Task投遞到

NioEventLoop中執行,代碼如下。

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!ensureOpen(promise)) {
            return;
        }
        doRegister();
        registered = true;
        promise.setSuccess();
        pipeline.fireChannelRegistered();
        if (isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        if (!promise.tryFailure(t)) {
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                            "Swallowing the cause of the registration failure:", t);
        }
    }
}
           

将 NioServerSocketchannel注冊到 NioEventLoop的 Selector上,代碼如下:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
           

大家可能會很詫異,應該注冊 OP_ACCEPT(16)到多路複用器上,怎麼注冊0呢?

0表示隻注冊,不監聽任何網絡操作。這樣做的原因如下。

(1)注冊方法是多态的,它既可以被 NioServerSocketchannel用來監聽用戶端的連接配接接入,也可以注冊 Socketchannel用來監聽網絡讀或者寫操作;

(2)通過 SelectionKey的 interestOps(int ops)方法可以友善地修改監聽操作位。是以,此處注冊需要擷取 SelectionKey并給 AbstractNioChannel的成員變量 selectionKey指派。注冊成功之後,觸發 ChannelRegistered事件,方法如下

promise.setSuccess();
pipeline.fireChannelRegistered();
           

當 ChannelRegistered事件傳遞到 TailHandler後結束, TailHandler也不關心ChannelRegistered事件,是以是空實作,代碼如下

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
           

ChannelRegistered事件傳遞完成後,判斷 Server Socketchannel監聽是否成功,如果成功,需要出發 NioServerSocketChannel的 ChannelActive事件,代碼如下

if (inActive()){
    pipeline. fireChannelActive();
}
           

inActive也是個多态方法。如果是服務端,判斷監聽是否啟動;如果是用戶端,判斷TCP連接配接是否完成。 ChannelActive事件在 ChannelPipeline中傳遞,完成之後根據配置決定是否自動觸發 Channel的讀操作,代碼如下。

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}
           

Abstractchannel的讀操作觸發 ChannelPipeline的讀操作,最終調用到 HeadHandler的讀方法,代碼如下。

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
           

繼續看 AbstractUnsafe的 beginRead方法,代碼如下。

@Override
public void beginRead() {
    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
           

由于不同類型的 Channel對讀操作的準備工作不同,是以, beginRead也是個多态方法,對于NIO通信,無論是用戶端還是服務端,都是要修改網絡監聽操作位為自身感興趣的,對于 NioServer SocketChannel感興趣的操作是 OP_ACCEPT(16),于是重新修改注冊的操作位為 OP_ACCEPT,代碼如下。

@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }

    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
           

在某些場景下,目前監聽的操作類型和 Chanel關心的網絡事件是一緻的,不需要重複注冊,是以增加了&操作的判斷,隻有兩者不一緻,才需要重新注冊操作位。

JDK SelectionKey有4種操作類型,分别為:

(1) OP_READ=1<<0;

(2) OP_WRITE=1<<2;

(3) OP_CONNECT=1<<3:

(4) OP_ACCEPT=1<<4。

由于隻有4種網絡操作類型,是以用4bit就可以表示所有的網絡操作位,由于Java語言沒有bit類型,是以使用了整型來表示,每個操作位代表一種網絡操作類型,分别為:0001、0010、0100、1000,這樣做的好處是可以非常友善地通過位操作來進行網絡操作位的狀态判斷和狀态修改,進而提升操作性能。

由于建立 NioServer SocketChannel将 readInterestOp設定成了 OP_ACCEPT,是以,在服務端鍊路注冊成功之後重新将操作位設定為監聽用戶端的網絡連接配接操作,初始化NioServerSocketchannel的代碼如下。

public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
    super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
    config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
           

2、用戶端接入源碼分析

負責處理網絡讀寫、連接配接和用戶端請求接入的 Reactor線程就是 NioEventLoop,下面我們分析下 NioEventLoop是如何處理新的用戶端連接配接接入的。當多路複用器檢測到新的準備就緒的 Channel時,預設執行 processSelectedKeysOptimized方法,代碼如下

if (selectedKeys != null) {
    processSelectedKeysOptimized(selectedKeys.flip());
} else {
    processSelectedKeysPlain(selector.selectedKeys());
}
           

由于 Channel的 Attachment是 NioServerSocketchannel,是以執行 processSelectedKey方法,根據就緒的操作位,執行不同的操作。此處,由于監聽的是連接配接操作,是以執行unsafe.read()方法。由于不同的 Channel執行不同的操作,是以 NioUnsafe被設計成接口,

由不同的 Channel内部的 NioUnsafe實作類負責具體實作。我們發現 reado方法的實作有兩個,分别是 NioByteUnsafe和 NioMessageUnsafe。對于 NioServerSocketChannel,它使用的是 NioMessageUnsafe,它的read方法代碼如下

@Override
public void read() {
    assert eventLoop().inEventLoop();
    if (!config().isAutoRead()) {
        removeReadOp();
    }

    final ChannelConfig config = config();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final boolean autoRead = config.isAutoRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        for (;;) {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
                break;
            }
        }
    } catch (Throwable t) {
        exception = t;
    }

    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();

    if (exception != null) {
        if (exception instanceof IOException) {
            // ServerChannel should not be closed even on IOException because it can often continue
            // accepting incoming connections. (e.g. too many open files)
            closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
        }

        pipeline.fireExceptionCaught(exception);
    }

    if (closed) {
        if (isOpen()) {
            close(voidPromise());
        }
    }
}
           

對 doReadMessages方法進行分析,發現它實際就是接收新的用戶端連接配接并建立NioSocketChannel,代碼如下。

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
           

接收到新的用戶端連接配接後,觸發 ChannelPipeline的 ChannelRead方法,代碼如下。

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    pipeline.fireChannelRead(readBuf.get(i));
}
           

執行 headChannelHandlerContext的 fireChannelRead方法,事件在 ChannelPipeline中傳遞,執行 ServerBootstrapAcceptor的 channelread方法,代碼如下。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    child.unsafe().register(child.newPromise());
}
           

該方法主要分為如下三個步驟。

第一步:将啟動時傳入的 childhandler加入到用戶端 SocketChannel的ChannelPipeline中。

第二步:設定用戶端 Socketchannel的TCP參數

第三步:注冊 Socketchannel到多路複用器。

以上三個步驟執行完成之後,下面我們展開看下 NioSocketChannel的 register方法,代碼如下所示。

@Override
public final void register(final ChannelPromise promise) {
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!ensureOpen(promise)) {
            return;
        }
        doRegister();
        registered = true;
        promise.setSuccess();
        pipeline.fireChannelRegistered();
        if (isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        if (!promise.tryFailure(t)) {
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                            "Swallowing the cause of the registration failure:", t);
        }
    }
}
           

NioSocketchannel的注冊方法與 ServerSocketChannel的一緻,也是将 Channel注冊到Reactor線程的多路複用器上。由于注冊的操作位是0,是以,此時 NioSocketchannel還不能讀取用戶端發送的消息,那什麼時候修改監聽操作位為 OP_READ呢,别着急,繼續看代碼。

執行完注冊操作之後,緊接着會觸發 ChannelReadComplete事件。我們繼續分析ChannelReadComplete在 ChannelPipeline中的處理流程:Netty的 Header和Tail本身不關注 ChannelReadComplete事件就直接透傳,執行完 ChannelReadComplete後,接着執行Pipeline的 read()方法,最終執行 HeadHandler的 read()方法。

HeadHandler read()方法的代碼已經在之前的小節介紹過,用來将網絡操作位修改為讀操作。建立 NioSocket Channel的時候已經将 AbstractNioChannel的 readInterestOp設定為OP_READ,這樣,執行 selectionKey.interestOps( interestOps | readInterestOp)操作時就會把操作位設定為 OP_READ。代碼如下。

protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {
    super(parent, eventLoop, ch, SelectionKey.OP_READ);
}
           

到此,新接入的用戶端連接配接處理完成,可以進行網絡讀寫等IO操作。

繼續閱讀