天天看點

Channel的注冊流程分析

AbstractBootStrap

類的

initAndRegister()

方法中,先進行對

Channel

的初始化,在初始化過程結束之後,會執行

Channel

的注冊流程邏輯。如下圖:

Channel的注冊流程分析

上圖所示源碼中,

ChannelFuture regFuture = config().group().register(channel);

執行

Channel

注冊到

EventLoop

上的流程。

1.

config()

方法

先看該行代碼中的第一個方法

config()

Channel的注冊流程分析

該方法是一個抽象方法,傳回的是

AbstractBootstrapConfig

類,它可以擷取到

bootstrap

的目前配置。該抽象方法在

AbstractBootstrapConfig

的子類

Bootstrap

ServerBootstrap

中實作。這裡着重看一下

ServerBootStrap

中的實作:

Channel的注冊流程分析

傳回了一個成員變量

config

,

Channel的注冊流程分析

是一個

ServerBootstrapConfig

類,該類包含了

ServerBootstrap

的所有配置。

2.

group()

方法

Channel的注冊流程分析

該方法傳回一個已經配置的

EventLoopGroup

類,如果還未配置,則傳回

null

3.

register(Channel channel)

方法

Channel的注冊流程分析

在該

EventLoop

方法上注冊一個

Channel

,注冊完成後,傳回的

ChannelFuture

将收到通知。因為

NioEventLoopGroup

繼承了

MultiThreadEventLoopGroup

,是以在使用

NioEventLoopGroup

進行程式設計時,架構内部調用的是

MultiThreadEventLoopGroup

register

方法。

Channel的注冊流程分析

3.1

next()

方法

Channel的注冊流程分析

next()

方法傳回一個

EventLoop

,内部調用了父類的

next()

方法。看一下父類

MultithreadEventExecutorGroup

next()

方法。

Channel的注冊流程分析

又調用了

chooser.next()

chooser

是個啥玩意呢?繼續看源碼:

Channel的注冊流程分析

這玩意是一個工廠類中的内部類:

Channel的注冊流程分析

裡面就一個方法

next()

,用來建立一個新的

EventExecutor

。這個

EventExecutorChooserFactory

有一個實作類

DefaultEventExecutorChooserFactory

,該類主要使用簡單輪詢方法選擇下一個

Executor

,裡面的

newChooser()

方法是這樣實作的:

Channel的注冊流程分析

意思就是如果參數的長度是2的幂,那麼就傳回一個

PowerOfTwoEventExecutorChooser

,否則就傳回一個

GenericEventExecutorChooser

,主要是用于提升性能,這裡也可以看出netty這個架構對于性能的追求已經趨于極緻了,這麼小的細節都考慮到了性能問題。

Channel的注冊流程分析

看一下

GenericEventExecutorChooser.next()

方法的實作,實際上就是在對

Executor[]

的長度取模。找下一個

EventExecutor

3.2

next().register()

在上面我們看到在開始執行注冊流程時調用的是

MultiThreadEventLoopGroup.register()

方法,其内部調用了

next().register(channel)

方法,這裡

next()

傳回一個

EventLoop

,也就是調用了

EventLoop.register

,具體實作方法是

SingleThreadEventLoopGroup.register()

Channel的注冊流程分析

随後執行

SingleThreadEventLoopGroup

重載的一個

register()

方法:

Channel的注冊流程分析

先檢查傳入的參數是否為

null

promise.channel().unsafe()

傳回一個

Unsafe

,執行底層的

register()

方法,該方法實作在

AbstractChannel

中的内部類

AbstractUnsafe

中。

@Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            //1.檢查eventLoop是否為空
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            //2.檢查是否已經注冊,如果已經注冊,抛出異常
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            //3.檢查參數是否是給定類的執行個體或是其子類的執行個體
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
			
            AbstractChannel.this.eventLoop = eventLoop;
			//4.檢查執行的線程是否是目前的線程,如果是則不存在并發問題,
			//如果不是目前線程則則将注冊流程放到任務隊列中執行
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
           

接下來看

register0()

:

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //register的具體實作
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();
				
                safeSetSuccess(promise);
                //調用下一個channelInboundHandler的channelRegistered()方法。
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                //所謂isActive就是指Channel處于活動 狀态,已經連接配接或者綁定并且已經就緒
                if (isActive()) {
                    if (firstRegistration) {
                    //調用下一個channelInbooundHandler的channelActive()方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
           

可以看到,有一個

doRegister()

方法,該方法定義在

AbstractChannel

Channel的注冊流程分析

具體實作在其子類中,這裡着重看

AbstractNioChannel

中的實作:

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
            //1.javaChannel()傳回java nio中的SelectableChannel,
            //eventLoop().unwrappedSelector()傳回jdk中的Selector
            
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
           

這段代碼終于看到Java Nio的身影了.将

channel

注冊到

Selector

上。