在
AbstractBootStrap
類的
initAndRegister()
方法中,先進行對
Channel
的初始化,在初始化過程結束之後,會執行
Channel
的注冊流程邏輯。如下圖:
上圖所示源碼中,
ChannelFuture regFuture = config().group().register(channel);
執行
Channel
注冊到
EventLoop
上的流程。
1. config()
方法
config()
先看該行代碼中的第一個方法
config()
:
該方法是一個抽象方法,傳回的是
AbstractBootstrapConfig
類,它可以擷取到
bootstrap
的目前配置。該抽象方法在
AbstractBootstrapConfig
的子類
Bootstrap
和
ServerBootstrap
中實作。這裡着重看一下
ServerBootStrap
中的實作:
傳回了一個成員變量
config
,
是一個
ServerBootstrapConfig
類,該類包含了
ServerBootstrap
的所有配置。
2. group()
方法
group()
該方法傳回一個已經配置的
EventLoopGroup
類,如果還未配置,則傳回
null
。
3. register(Channel channel)
方法
register(Channel channel)
在該
EventLoop
方法上注冊一個
Channel
,注冊完成後,傳回的
ChannelFuture
将收到通知。因為
NioEventLoopGroup
繼承了
MultiThreadEventLoopGroup
,是以在使用
NioEventLoopGroup
進行程式設計時,架構内部調用的是
MultiThreadEventLoopGroup
的
register
方法。
3.1 next()
方法
next()
next()
方法傳回一個
EventLoop
,内部調用了父類的
next()
方法。看一下父類
MultithreadEventExecutorGroup
的
next()
方法。
又調用了
chooser.next()
,
chooser
是個啥玩意呢?繼續看源碼:
這玩意是一個工廠類中的内部類:
裡面就一個方法
next()
,用來建立一個新的
EventExecutor
。這個
EventExecutorChooserFactory
有一個實作類
DefaultEventExecutorChooserFactory
,該類主要使用簡單輪詢方法選擇下一個
Executor
,裡面的
newChooser()
方法是這樣實作的:
意思就是如果參數的長度是2的幂,那麼就傳回一個
PowerOfTwoEventExecutorChooser
,否則就傳回一個
GenericEventExecutorChooser
,主要是用于提升性能,這裡也可以看出netty這個架構對于性能的追求已經趨于極緻了,這麼小的細節都考慮到了性能問題。
看一下
GenericEventExecutorChooser.next()
方法的實作,實際上就是在對
Executor[]
的長度取模。找下一個
EventExecutor
3.2 next().register()
next().register()
在上面我們看到在開始執行注冊流程時調用的是
MultiThreadEventLoopGroup.register()
方法,其内部調用了
next().register(channel)
方法,這裡
next()
傳回一個
EventLoop
,也就是調用了
EventLoop.register
,具體實作方法是
SingleThreadEventLoopGroup.register()
,
随後執行
SingleThreadEventLoopGroup
重載的一個
register()
方法:
先檢查傳入的參數是否為
null
,
promise.channel().unsafe()
傳回一個
Unsafe
,執行底層的
register()
方法,該方法實作在
AbstractChannel
中的内部類
AbstractUnsafe
中。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//1.檢查eventLoop是否為空
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//2.檢查是否已經注冊,如果已經注冊,抛出異常
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//3.檢查參數是否是給定類的執行個體或是其子類的執行個體
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
//4.檢查執行的線程是否是目前的線程,如果是則不存在并發問題,
//如果不是目前線程則則将注冊流程放到任務隊列中執行
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
接下來看
register0()
:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//register的具體實作
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//調用下一個channelInboundHandler的channelRegistered()方法。
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
//所謂isActive就是指Channel處于活動 狀态,已經連接配接或者綁定并且已經就緒
if (isActive()) {
if (firstRegistration) {
//調用下一個channelInbooundHandler的channelActive()方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
可以看到,有一個
doRegister()
方法,該方法定義在
AbstractChannel
中
具體實作在其子類中,這裡着重看
AbstractNioChannel
中的實作:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//1.javaChannel()傳回java nio中的SelectableChannel,
//eventLoop().unwrappedSelector()傳回jdk中的Selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
這段代碼終于看到Java Nio的身影了.将
channel
注冊到
Selector
上。