這裡寫目錄标題
- Netty 源碼解讀
- bind啟動源碼
- 1.初始化和注冊邏輯
- 1.1建立ServerSocketChannel
- 1.2注冊selector
- 2.端口綁定邏輯
- 2.1端口綁定
- 2.2在pipeline傳播ChannelActive事件
- 關注我日常更新分享Java程式設計技術!希望大家都能早日走上人生巅峰!

Netty 源碼解讀
bind啟動源碼
我們的服務是通過下面代碼進行綁定端口并啟動的,是以我們從bind()方法開始分析
ChannelFuture f = b.bind(port).sync();
一直跟進到io.netty.bootstrap.AbstractBootstrap#doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
//1 初始化和注冊邏輯
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone())
ChannelPromise promise = channel.newPromise();
//2 端口綁定邏輯
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
1.初始化和注冊邏輯
繼續跟進initAndRegister()方法,根絕下面的注釋,你類比一下Java NIO的代碼,你會不會有莫名的親切感?
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//(1)建立ServerSocketChannel
channel = channelFactory.newChannel();
//初始化Channel,不展開說明了
init(channel);
} catch (Throwable t) {
if (channel != null)
channel.unsafe().closeForcibly();
}
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
//(2)注冊selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
1.1建立ServerSocketChannel
回看一下抛棄服務的代碼
.channel(NioServerSocketChannel.class)
這裡就是把這個NioServerSocketChannel執行個體化,見io.netty.channel.ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
try {
//這裡這個clazz指的就是NioServerSocketChannel.class,具體怎麼傳進來的,大家可以自己看代碼,這裡不介紹了。
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
通過反射進行執行個體化我們就直接看NioServerSocketChannel的構造函數即可看看都幹了什麼,見io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
/**
* Create a new instance
*/
public NioServerSocketChannel() {
這裡主要幹了兩件事情:
(1)newSocket(DEFAULT_SELECTOR_PROVIDER),其實這裡就是建立了Java NIO中的ServerSocketChannel
(2)this方法繼續調用構造函數
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
//(2)處會調用這個構造函數
public NioServerSocketChannel(ServerSocketChannel channel) {
//我們主要關心這個調用父類的構造函數幹了什麼,看見SelectionKey.OP_ACCEPT有沒有很熟悉的樣子
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
見io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
//我們仍然繼續往裡跟進
super(parent);
this.ch = ch;
//這個地方把SelectionKey.OP_ACCEPT事件儲存到成員變量中了哦
this.readInterestOp = readInterestOp;
try {
//設定成非阻塞模式,有沒有感覺又很熟悉了?
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
見io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
//這裡把Java NIO的ServerSocketChannel儲存進了成員變量
this.parent = parent;
//給這個Channel一個id
id = newId();
//這個先忽略,以後介紹
unsafe = newUnsafe();
//看看這個地方,是不是生成了一個Pipeline和一個Channel關聯了?
pipeline = newChannelPipeline();
}
我們來心中回顧一下建立一個NioServerSocketChannel都幹了哪些事情:
- 建立了一個Java NIO底層的SeverSocketChannel,作為自己的成員變量;
- 給自己弄了個id;
- 建立了一個Pipeline作為自己的成員變量;
- 設定成了非阻塞模式。
1.2注冊selector
config().group()
拿到的就是我們的bossGroup(具體怎麼區分的,可以自己通過丢棄服務代碼b.group(bossGroup, workerGroup)跟進去看一下),那整個
config().group().register(channel)
;我們就可以了解為在EventLoop上注冊我們的Channel。
接下來我們先忽略Netty如何選擇bossGroup哪個EventLoop直接跳到在EventLoop上注冊的方法,見io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//用NioServerSocketChannel的unsafe注冊,注意這個地方傳參是把目前的EventLoop和NioServerSocketChannel都傳進去了
promise.channel().unsafe().register(this, promise);
return promise;
}
繼續跟進,見io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
//現在咱們再mian主線程裡,是以會走else
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//這個地方涉及到eventLoop的啟用,我們下一章講
eventLoop.execute(new Runnable() {
@Override
public void run() {
//是以會在目前eventLoop進行注冊
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
往register0裡繼續跟,見io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//其他不關心繼續往裡跟這個方法
doRegister();
...
}
跟進doRegister(),見io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//看到這段代碼是不是又有莫名的親切感,讓我們回想一下《從Java NIO到Netty(二)》的内容
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
在上述代碼中,javaChannel()拿到的就是Java NIO的ServerSocketChannel,這個register方法也是Java NIO的那個register方法,把這個ServerSocketChannel注冊到目前eventLoop的selector上面去。
那可能有人問了,ServerSocketChannel應該把OP_ACCEPT事件注冊到selector上面去,這裡怎麼展現的呢?别着急,往下看。
2.端口綁定邏輯
當建立完NioServerSocketChannel後,會調用doBind0把建立好的NioServerSocketChannel傳進來,見io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
//在NioServerSocketChannel的eventLoop上執行bind
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
繼續跟進,見io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
boolean wasActive = isActive();
try {
//(1)端口綁定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
(2)在pipeline傳播ChannelActive事件:主要關心如何把OP_ACCEPT事件注冊到selector上面的
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
2.1端口綁定
繼續往裡跟,見io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//這個地方是不是又感覺很熟悉?就是調用Java NIO的bind進行端口綁定的,見《從Java NIO到Netty(一)》
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
2.2在pipeline傳播ChannelActive事件
往裡跟,見io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//這個地方就是把事件繼續向下一個連結清單節點傳播
ctx.fireChannelActive();
//主要看這個
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//往這裡面走
channel.read();
}
}
@Override
protected void doBeginRead() throws Exception {
//這個官方注釋也表述很清楚了,這兩個方法最終會調用到這個地方
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//還記得我們之前建立NioServerSocketChannel時把SelectionKey.OP_ACCEPT事件指派給了這個readInterestOp嗎?看這個地方就把這個事件加到這個selectionKey中了
selectionKey.interestOps(interestOps | readInterestOp);
}
}
//打開ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//監聽端口9999
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
//設定為非阻塞模式
serverSocketChannel.configureBlocking(false);
//建立Selector
Selector selector = Selector.open();
//将選serverSocketChannel注冊到selector,并在注冊過程中指出該serverSocketChannel可以進行Accept操作
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);