Netty is a widely used communication framework, so it is worth understanding it in some depth.
Netty has several notable technical highlights:
1. the Reactor threading model;
2. a safe and effective application of the NIO non-blocking I/O model;
3. a flexible, pipeline-style processing flow;
4. flexible ChannelHandler implementations;
5. many out-of-the-box handlers and codecs;
We can dig into these points to see what makes it stand out.
1. A NettyServer demo
To understand a framework in depth, a demo is usually the best starting point. Below is the construction of a simple Netty server, essentially Netty's quick-start example.
@Slf4j
public class NettyServerHelloApplication {

    /**
     * A sample server
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the EventLoop thread pools up front, split into bossGroup and workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(4);
        try {
            // 2. Create ServerBootstrap, Netty's core server-side entry class
            ServerBootstrap b = new ServerBootstrap();
            // 3. Configure the server's parameters and the application handlers
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100) // set the TCP accept backlog (pending-connection queue)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     // 3.2. Most importantly, bind the channelHandlers into Netty's context (loosely speaking)
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast("encoder", new MessageEncoder());
                     p.addLast("decoder", new MessageDecoder());
                     p.addLast(new EchoServerHandler());
                 }
             });
            // 4. Bind the TCP port and start listening; sync() waits for the operation to complete
            ChannelFuture f = b.bind(ServerConstant.PORT).sync();
            // 5. Wait for the close signal and let the business threads serve the traffic
            f.channel().closeFuture().sync();
        } finally {
            // 6. On the close signal, shut down the server's thread pools gracefully to protect the application
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
That is a minimal Netty server in its entirety, and it is essentially the programming paradigm for every Netty server. The main steps are:
1. Create the EventLoop thread pools up front, split into bossGroup and workerGroup;
2. Create ServerBootstrap, Netty's core server-side entry class;
3. Configure the server's parameters and the application handlers (the essential channelHandler wiring);
4. Bind the TCP port and start listening;
5. Wait for the close signal and let the business threads serve the traffic;
6. On the close signal, shut down the server's thread pools gracefully to protect the application;
In fact, wouldn't a server built directly on the JDK's ServerSocketChannel look much the same? Yes, at least on the surface. But then we would have to handle many error cases ourselves and face an endless variety of business requirements. What then? For contrast, a bare-JDK sketch is shown below.
After all, a framework's success is no accident. Next, let's walk through how Netty handles each of these steps!
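Here is roughly what the raw-JDK version looks like. This is only a minimal sketch with no error handling, no pipeline, and no worker threads; the class name and port are illustrative, not from Netty:
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;

public class PlainNioServer {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(8080));
        server.register(selector, SelectionKey.OP_ACCEPT);
        for (;;) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // read, decode, dispatch... everything Netty's pipeline will do for us
                }
            }
        }
    }
}
It works, until we need idle detection, codecs, backpressure, graceful shutdown and so on; that is where the framework earns its keep.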
2. Creating the EventLoop
EventLoop translates literally as "event loop", but here we can also think of it as a thread pool, since every event is submitted to it for processing. So what kind of loop is it, exactly?
First, let's look at its class hierarchy (class diagram omitted):

As the class diagram shows, an EventLoop is also an executor, or if you prefer a thread-pool implementation, so the two presumably share some design.
// invoked as follows
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory)
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
// io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// the default thread count is CPU cores * 2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// Create an executor that spawns a new thread for every submitted task, i.e. there is no queue at all
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// an array holds all the usable executors, i.e. the "thread pool"
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// create each child with its own thread; newChild() is implemented by the subclass
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// if creation failed, shut down the children created so far
// note, though, that there is no return here to stop the remaining iterations. Why not?
// because when the failure occurred, the exception was already thrown above; this finally block only performs cleanup
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// create the chooser, used to load-balance across the children
// the chooser factory here defaults to DefaultEventExecutorChooserFactory
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
// io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// the argument types here are guaranteed by the caller, so plain casts are safe
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
// io.netty.channel.nio.NioEventLoop#NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// this super constructor does a lot, e.g. creating the task queue, opening the NIO selector...
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
// io.netty.util.concurrent.DefaultEventExecutorChooserFactory#newChooser
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// e.g. lengths 1, 2, 4, 8... all get a PowerOfTwoEventExecutorChooser
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// io.netty.util.concurrent.DefaultPromise#addListener
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
That is the entire creation process of the NioEventLoopGroup. In essence it is just an array of independent single-threaded executors, waiting to be called.
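Since an EventLoopGroup is also a ScheduledExecutorService, it can be exercised directly as a plain executor. A small usage sketch (the printed messages are illustrative; assumes the usual io.netty.channel.nio and java.util.concurrent.TimeUnit imports):
EventLoopGroup group = new NioEventLoopGroup(2);
group.execute(() -> System.out.println("runs on one of the two event-loop threads"));
group.schedule(() -> System.out.println("runs about 1s later, same threads"), 1, TimeUnit.SECONDS);
group.shutdownGracefully();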
3. Creating the ServerBootstrap
ServerBootstrap is Netty's core server-side entry class; with it, a stable Netty service can be stood up very quickly.
The class diagram of ServerBootstrap looks like this (diagram omitted):
Still very tidy! One interesting detail: ServerBootstrap extends AbstractBootstrap, and AbstractBootstrap is a self-referential abstract class: AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>. This way the parent class can return the subtype directly; a minimal sketch of the idiom follows below.
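A minimal sketch of that self-referential generic idiom, using hypothetical classes (not Netty's own):
abstract class AbstractBuilder<B extends AbstractBuilder<B>> {
    @SuppressWarnings("unchecked")
    protected B self() { return (B) this; }
    public B retries(int n) { /* store n */ return self(); } // parent method returns the subtype
}
final class HttpBuilder extends AbstractBuilder<HttpBuilder> {
    public HttpBuilder port(int p) { /* store p */ return self(); }
}
// chaining keeps the concrete type: new HttpBuilder().retries(3).port(8080)
This is exactly why b.group(...).channel(...).option(...) can keep returning ServerBootstrap rather than the abstract parent.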
Its default constructor is empty, so all parameters start out with their default values; they are filled in by the setter calls that follow. Next, let's look at how some of the key parameters are set:
// 1. setting the channel type
// io.netty.bootstrap.AbstractBootstrap#channel
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
// by default the channel is created via constructor reflection
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
// io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
/**
* {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
*/
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
// io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
/**
* @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
*/
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
@SuppressWarnings("unchecked")
private B self() {
return (B) this;
}
// 2. option(): the catch-all entry point for setting all kinds of special configuration items
/**
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
* created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
*/
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
// options is a new LinkedHashMap<ChannelOption<?>, Object>(), i.e. not a thread-safe container, hence the synchronized blocks around every mutation
// a null value removes the option; for an existing key, a later setting overwrites the earlier one
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return self();
}
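Note that option() targets the server channel itself, while its sibling childOption() targets every accepted child channel. A small usage sketch, continuing with the builder b from the demo above:
b.option(ChannelOption.SO_BACKLOG, 128)         // applies to the NioServerSocketChannel
 .childOption(ChannelOption.TCP_NODELAY, true)  // applies to each accepted SocketChannel
 .childOption(ChannelOption.SO_KEEPALIVE, true);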
// 3. childHandler(): adding the channelHandler. This is the most important method of all, shaping how business processing is orchestrated later
// calling it merely records the channelHandler's context; the real addition happens later
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100) // set the TCP accept backlog
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast("encoder", new MessageEncoder());
p.addLast("decoder", new MessageDecoder());
p.addLast(new EchoServerHandler());
}
});
/**
* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
// merely binds the channelHandler into the bootstrap's context
this.childHandler = childHandler;
return this;
}
// 4. how are bossGroup and workerGroup assigned?
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
// parentGroup serves the acceptor, i.e. accepting new socket connections, so one thread is usually enough
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
// childGroup handles the events of the accepted sockets; it usually needs more threads, sized to the workload
this.childGroup = childGroup;
return this;
}
bind() attaches the TCP port and is the step that really triggers server initialization. It does quite a lot of work, so it gets its own section.
4. Initializing the nettyServer
Everything so far was preparation, with nothing visible from the outside; bind() opens the service up, makes it externally visible, and truly starts the server.
// io.netty.bootstrap.AbstractBootstrap#bind(int)
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
// first validate that all parameters are set completely: thread pools, channelHandler...
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// bind the TCP port
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. create the channel and register it with an eventloop, for unified management
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 2. once registration succeeds, start the actual bind, which boils down to channel.bind()
// doBind0() is asynchronous, so a promise carries the result
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
So, overall, bind() proceeds in two big steps: 1. initialize the channel and hook it up to NIO; 2. carry out the binding of the channel to the local port. Let's look at the details:
4.1 Initializing the channel
Initializing the channel and registering it on the selector is in fact a very important operation.
// first, the overall execution skeleton
// io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// reflectively create an instance of the channel class configured earlier
// here that means a ServerSocketChannel gets instantiated
// so to use the JDK NIO implementation, configure NioServerSocketChannel.class; to use a further optimized one such as EpollServerSocketChannel, just change the class
// the channelFactory here is the reflective ReflectiveChannelFactory, which instantiates the channel via its no-args constructor
// so the key work must happen in that no-args constructor; let's look at it next
channel = channelFactory.newChannel();
// initialize the channel's common parameters, effectively inheriting properties, because from here on it no longer depends on the ServerBootstrap and must be self-sufficient
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// register the freshly created channel with an eventLoop
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
// 1. first, the construction of NioServerSocketChannel
// io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
/**
* Create a new instance
*/
public NioServerSocketChannel() {
// newSocket simply creates a local socket, via the API call SelectorProvider.provider().openServerSocketChannel()
// at this point the socket is not yet bound to any port
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
// register interest in OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
// javaChannel() here is just the channel passed in; calling it this way is merely for uniformity
// a new socket is handed to the NioServerSocketChannelConfig
// mainly for RecvByteBufAllocator settings, and for keeping a reference to the channel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// let the parent class initialize the necessary context first
super(parent);
// keep the channel and set the non-blocking flag
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
// initialize the context
this.parent = parent;
// DefaultChannelId
id = newId();
// NioMessageUnsafe
unsafe = newUnsafe();
// new DefaultChannelPipeline(this);
// quite important: this initializes the head and tail nodes
pipeline = newChannelPipeline();
}
// io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// initialize head and tail
tail = new TailContext(this);
head = new HeadContext(this);
// forming a doubly linked list
head.next = tail;
tail.prev = head;
}
// 2. initializing the channel; the most important move is hooking the Acceptor into the pipeline
// io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
// copy the various properties configured earlier over into the channel's config field
// again, options and attrs are not thread-safe containers, so both accesses are locked
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// this pipeline is the one whose head and tail were initialized in NioServerSocketChannel
ChannelPipeline p = channel.pipeline();
// childGroup is simply the external workerGroup
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// now the important part: wiring in the ServerBootstrapAcceptor
// proactively add an initializer, which will act as the first channelInitializer to be invoked
// and a channelInitializer is only ever invoked once
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// add the Acceptor to the pipeline, forming head -> ServerBootstrapAcceptor -> tail
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
// after this operation, the pipeline contains just this one handler
}
...
4.2 How handlers are added
addLast() looks like nothing more than appending an element, and in essence it is just an insertion into a doubly linked list, but it is interesting enough to be worth opening up.
// io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
// io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// several handlers may be added at once
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
// io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// duplicate-addition check, honoring the @Sharable annotation
checkMultiplicity(handler);
// create a new context; filterName() generates a unique name such as ServerBootstrap$1#0
newCtx = newContext(group, filterName(name, handler), handler);
// add the new ctx into the linked list
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
// if not yet registered, go no further for now
callHandlerCallbackLater(newCtx, true);
return this;
}
// if already registered, callHandlerAdded0 is invoked, submitted via the handler's executor when we are not on its thread
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
// the contexts are kept in a doubly linked list
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
// append the ctx to the tail of the pending-callback list
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
// every handler addition produces an event notifying that handler: handlerAdded()
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
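From the user's side, this callback is simply the handlerAdded() hook. A minimal, hypothetical handler showing when it fires:
public class AuditHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // invoked exactly once, right after this handler joins a pipeline
        System.out.println("added as: " + ctx.name());
    }
}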
4.3 Registering the channel, binding the eventloop thread
After the previous two steps, the channel has been created and initialized, but the eventLoop has yet to make an appearance. In fact, all that separates the eventloop from the channel is one registration.
That is the ChannelFuture regFuture = config().group().register(channel); we saw earlier, where the group is the bossGroup.
// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
// next() acts like a load balancer, picking a suitable eventloop; the default is round-robin
return next().register(channel);
}
// io.netty.channel.MultithreadEventLoopGroup#next
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
// io.netty.util.concurrent.MultithreadEventExecutorGroup#next
@Override
public EventExecutor next() {
// delegates to the PowerOfTwoEventExecutorChooser created earlier
// the default implementation is round-robin
return chooser.next();
}
// io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
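The bit trick above works because, for a power-of-two length n, (i & (n - 1)) equals (i % n) for non-negative i while avoiding a division. A quick illustrative check (not Netty code; run with -ea to enable assertions):
int n = 8; // power of two
for (int i = 0; i < 20; i++) {
    assert (i & (n - 1)) == (i % n);
}
For other sizes, the modulo-based GenericEventExecutorChooser is used instead.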
// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
// wrap the channel in a DefaultChannelPromise, then register it with the eventloop
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// NioMessageUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// inEventLoop() checks whether the current thread is the eventLoop's own thread
// the check is a direct identity comparison: Thread.currentThread() == this.thread;
// the core registration method is register0()
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// not on the eventLoop thread, so submit the task to the eventloop asynchronously
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// register0() does the real registration
// io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// the concrete registration logic is implemented by the subclass (here, for NioServerSocketChannel)
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// the extension points: fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive()
// part1: fireHandlerAdded(), which calls back the channelInitializer added above, the one that adds the ServerBootstrapAcceptor
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// part2: fireChannelRegistered()
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
// io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
// the registration here is just the JDK ServerSocketChannel.register() call
// i.e. Netty establishes its link to the socket; note ops=0, meaning no interest set yet
for (;;) {
try {
// keep retrying until registration succeeds
// ops=0 here, i.e. interested in no events at all; so how does the earlier OP_ACCEPT relate to this? (resolved in section 5)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
4.4 A quick look at ServerBootstrapAcceptor
We saw earlier that once register() completes, netty triggers invokeHandlerAddedIfNeeded, which fires handlerAdded(). That first call into ChannelInitializer.initChannel() is what adds the ServerBootstrapAcceptor to the pipeline. The acceptor does little work of its own; most is delegated to its parent classes.
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
//
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
// ServerBootstrapAcceptor is mostly an ordinary InboundHandler, except in channelRead()
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// it hands the child channel over to the childGroup, which is where the childGroup starts to play its part
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
4.5 Binding the port: doBind0
With the channel created and initialized, and the Acceptor added through handlerAdded(), the whole pipeline is now working. Netty then calls back the listeners registered earlier, one of which performs doBind0().
// a recap:
...
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
...
// io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// still an asynchronous step
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// channel.bind(): binding the channel to the port
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
// io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// bind() is treated as an ordinary outbound event and propagated through the pipeline
return pipeline.bind(localAddress, promise);
}
// io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// propagation starts from the tail
return tail.bind(localAddress, promise);
}
// io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// another pipeline-style call; bind() is an outbound event, so look up an outbound handler
// eventually the call lands in DefaultChannelPipeline (its HeadContext)
// netty's pipeline mechanism shows itself here: keep finding the next applicable handler and invoke it, until the chain ends
final AbstractChannelHandlerContext next = findContextOutbound();
// fetch the executor bound to that context
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
// -------------------------------------------------------------------------
// the inbound/outbound handler lookup: very simple, yet effective (implemented in AbstractChannelHandlerContext, shared by all handlers)
// io.netty.channel.AbstractChannelHandlerContext#findContextInbound
private AbstractChannelHandlerContext findContextInbound() {
// search starting from the current node and return the first inbound handler found; if there is none, the pipeline has ended
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
// io.netty.channel.AbstractChannelHandlerContext#findContextOutbound
private AbstractChannelHandlerContext findContextOutbound() {
// search starting from the current node and return the first outbound handler found; if there is none, the pipeline has ended
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
// -------------------------------------------------------------------------
// io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
// the event is finally handed to HeadContext for processing
// io.netty.channel.DefaultChannelPipeline.HeadContext#bind
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// unsafe handles the bind() operation
unsafe.bind(localAddress, promise);
}
// io.netty.channel.AbstractChannel.AbstractUnsafe#bind
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// this calls down into the JDK ServerSocketChannel API to do the real port binding
// from this point on, the service is externally visible
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// if the channel has just become active for the first time, propagate the channelActive event via fireChannelActive()
if (!wasActive && isActive()) {
// 這将會被稍後執行
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// fire the remaining notifications; done
safeSetSuccess(promise);
}
// the final bind() starts the socket listener via the underlying JDK serverSocketChannel
// io.netty.channel.socket.nio.NioServerSocketChannel#doBind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// call serverSocketChannel.bind() to start listening on the socket
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
With that, the bind work is finally complete. To summarize its main steps:
1. Initialize a channel; with our configuration, a NioServerSocketChannel;
2. Hand the existing configuration items over to the channel;
3. Register the channel with an eventloop, adding the ServerBootstrapAcceptor to the pipeline;
4. After registration, notify the existing handlers, triggering the event chain fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive();
5. bind() itself is processed as an outbound event, finally calling the JDK's ServerSocketChannel.bind() to open the port;
One thing to note: throughout this process only the bossGroup has done any work; the workerGroup threads are all still standing by. And the pipeline we have so far is just: head -> Acceptor -> tail.
After all this, it may well feel like we took a very long detour: written directly against NIO, this would barely take ten lines. Awkward!
5. The netty eventloop main loop
The eventloop is a central Netty concept, but so far we have only watched its creation, not how it does its job. That does not mean it has been idle; we simply skipped over it. Whenever a task needs to run, something calls eventloop().execute(...), and that is precisely the eventloop's entry point:
// io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
// as in any thread pool, execute() is the asynchronous task-submission method; the same holds in an eventloop
// most of the time it just enqueues, though, because the eventloop is single-threaded
if (task == null) {
throw new NullPointerException("task");
}
// add the task to the eventLoop's queue
boolean inEventLoop = inEventLoop();
addTask(task);
// if we are already running inside the eventloop, nothing more to do after enqueuing
if (!inEventLoop) {
// if not on the eventLoop thread, try to start a new thread; internally it re-checks whether the thread already exists
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
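As a usage note: any task submitted to a channel's event loop runs on that channel's single thread, so no extra synchronization is needed when touching the channel's state. A tiny sketch, assuming channel is a registered Netty Channel:
channel.eventLoop().execute(() -> {
    // runs on the channel's event-loop thread; safe to touch its state here
});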
// io.netty.util.concurrent.SingleThreadEventExecutor#addTask
/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// taskQueue is an MpscUnsafeUnboundedArrayQueue: a thread-safe queue built on Unsafe and CAS
if (!offerTask(task)) {
// if the offer fails, fall back to the rejection policy
reject(task);
}
}
// startThread() looks like it starts a thread, but not quite
private void startThread() {
// in effect the thread is only ever created once
if (state == ST_NOT_STARTED) {
// only the thread that wins the CAS gets to call start
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
// starting the eventLoop's thread
// io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
assert thread == null;
// it is not a simple thread.start()
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// the core method, implemented by SingleThreadEventExecutor.run()
// or rather by the concrete executor; here that is NioEventLoop.run()
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// pool shutdown: graceful termination
...
}
}
});
}
The core: the main skeleton of the event loop. Being an event loop, by nature it never exits on its own.
// io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
// an endless loop polling for work: this is the eventloop's big gun
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
// run tasks when there are any; otherwise block waiting for network events, or until woken up
case SelectStrategy.SELECT:
// selector.select(), with a timeout cap
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio is the share of time given to I/O versus running tasks, 50:50 by default
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// step 1: process the I/O
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// step 2: run the queued tasks
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// the maximum time tasks may run
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
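The I/O-to-task ratio above is tunable per loop. A hedged usage sketch, assuming a NioEventLoopGroup named workerGroup and an io.netty.util.concurrent.EventExecutor import (the cast is needed because the group iterates as EventExecutor):
for (EventExecutor e : workerGroup) {
    ((NioEventLoop) e).setIoRatio(70); // spend ~70% of each loop iteration on I/O
}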
// select(): what the event loop revolves around
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// with a timeout cap: 1s at most by default, or the next scheduled task's deadline when one exists
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
// if timed out, selectNow and return immediately
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
So that is the overall shape: loop around select, process the I/O events, and execute the tasks.
With this eventloop in place, the whole server can run: whether external requests come in or internal tasks are submitted, the eventloop executes them all.
One point is still unexplained, though: channel.register() earlier passed ops=0, so how does it ever listen for new-connection events?
In fact, after registration and activation complete, an extra read() pass re-adds OP_ACCEPT to the selectionKey. (That's right: down at the bottom there are never that many tricks.)
// io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
// triggers the read() flow, which updates the selectionKey's ops flags
readIfIsAutoRead();
}
...
// io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
// io.netty.channel.nio.AbstractNioMessageChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
// io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// readInterestOp is the OP_ACCEPT configured earlier
selectionKey.interestOps(interestOps | readInterestOp);
}
}
This post has grown long enough, so let's save some for the next one: how are I/O events processed? How are tasks executed?