Netty Hello World 入門源碼分析
第一節簡單提了什麼是網絡程式設計,Netty 做了什麼,Netty 都有哪些功能元件。這一節就具體進入 Netty 的世界,我們從用 Netty 的功能實作基本的網絡通信開始分析 各個元件的使用。
-
一個簡單的發送接收消息的例子#
話不多說,先來實作一個發送接收消息的例子。本執行個體基于 SpringBoot 工程搭建。
項目類檔案如下:
用戶端和服務端的主要代碼分為3個部分:啟動器,ChannelInitializer,eventHandler。
相關代碼已經上傳 GitHub,請參閱:點我 (๑¯ ³ ¯๑)
Server端:
Copy
package com.rickiyang.learn.helloWorld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
- @author: rickiyang
- @date: 2020/3/15
-
@description: server 端
*/
@Slf4j
public class HwServer {
private int port;
public HwServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server start fail",e);
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
HwServer server = new HwServer(7788);
server.start();
}
}
server initializer:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
- @description:
public class HwServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("server channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
ctx.write("hi, received your msg");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
server handler:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("server channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
ctx.write("hi, received your msg");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
用戶端代碼:
client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.socket.nio.NioSocketChannel;
public class HwClient {
private int port;
private String address;
public HwClient(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().writeAndFlush("Hello world, i'm online");
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("client start fail",e);
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
HwClient client = new HwClient(7788,"127.0.0.1");
client.start();
}
client initializer :
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
- Created by Administrator on 2017/3/11.
public class ClientChannelInitializer extends ChannelInitializer {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 用戶端的邏輯
pipeline.addLast("handler", new HwClientHandler());
}
client handler :
public class HwClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("server say : " + msg.toString());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("client channelActive");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("Client is close");
}
代碼很簡單,主要功能就是啟動 服務端 和 用戶端, 然後實作一個簡單的 handler ,在handler 中擷取消息并列印,本地先啟動服務端 main 函數,再啟動用戶端即可。
-
從 EventLoopGroup 開始說起#
觀看用戶端 和 服務端 的啟動類,我們看到都有相同的特性:
建立 EventLoopGroup 去監聽 channel,然後使用定義的 handler處理對應的事件。
Netty 是一個異步事件驅動的 NIO 架構,所有IO操作都是異步非阻塞的。Netty 實際上是使用 Threads(多線程)處理 I/O 事件。
EventLoopGroup 是個啥東西呢?我們回想一下 Reactor 模型,主要的操作是使用 Selector 監聽 channel 上的事件,Reactor 模型有三種結構,首先是單線程模型:
這種模型顯而易見始終隻有一個 Acceptor 線程在處理用戶端連接配接事件和服務端産生的讀寫事件,好處是始終隻有一個線程在工作不會産生并發帶來的一系列問題。但是不足之處也顯而易見:
一個線程來處理對于現在的多核系統來說有點浪費資源;
雖然是使用異步非阻塞I/O處理,但是面對大并發的請求場景,很有可能會負載過重,堆積事件,這樣用戶端就會有逾時發生,然後重複發送請求,必然會造成系統超載;
單線程如果挂掉了系統就停止了,這種場景如何處理。
是以這種單線程模型對于當今系統的發展是沒有适用場景的。接着又演變出多線程的 Reactor 模型。
在多線程模型下,Acceptor 是一個單獨的線程專門處理 Client 的請求連接配接事件,所有的 I/O 操作都由一個特定的 NIO 線程池負責,每個用戶端都與一個特定的 NIO 線程池綁定,是以這個用戶端連接配接中的所有 I/O 操作都是在同一個線程中完成的。用戶端連接配接是很多的,但是 NIO 線程很少,是以 一個 NIO 線程可以同時綁定到多個用戶端連接配接中。
從上面的模型找缺點的話,很顯然能發現還是有單點的問題:處理用戶端連接配接請求的線程仍舊隻有一個,如果這個線程挂了,整個系統将不可用。是以這種超級并發的情況也要考慮啊,系統不能有單點,接着改:
現在的系統就沒有單點問題了,但是也增加了複雜性。
我們剛才在說 EventLoopGroup ,為啥突然又轉到了 Reactor 模型上去了呢? 前面說過,Netty 是基于 NIO 程式設計的,NIO 又是基于 Reactor 模型的,自然 Netty 的程式設計模型也是 Reactor 。而 EventLoopGroup 其實就是來設定 Reactor 模型的類型根據不同的參數方式。
我們來看 Server 啟動類中的寫法:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
這裡建立了兩個 group,下面 bootstrap set group的時候 set了兩個線程池,即 Acceptor 使用一個線程池,一個 Reactor 線程池。但是可以看到兩個線程池都沒有設定大小,進去 NioEventLoopGroup 的構造方法可以看到 預設值是 0,即初始化為0,不開啟線程,當有事件進來的時候會開啟一個線程來處理。那麼如果将 workGroup 設定為多個線程的時候,上面這種寫法就是 Reactor 的多線程模型。
我們再來看另一種寫法:
ServerBootstrap server = new ServerBootstrap().group(bossGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
這裡的 group() 方法與上面的差別在于隻有一個參數,進入方法内部看看:
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
很明顯它調用了帶兩個 EventLoopGroup 參數的 group方法,即兩個 group 線程池使用的是同一個。這就是 Reactor 的單線程模型。
還有一個 Reactor 的主從多線程模型,這個在 Netty 中是沒有實作的,即你将 bossGroup 的線程設定為大于1,這個不會改變 Acceptor 的時候事件處理方式,因為在服務端啟動的時候 ServerSocketChannel 隻會綁定到 bossGroup 中的一個線程,即使你設定了多個,啟動的時候隻會使用一個。Netty 官方認為處理連接配接請求的時候沒有必要使用多線程的方式。
現在我們了解到 EventLoopGroup 的作用是初始化線程池的,那就一起看看它是怎麼實作的吧。
首先看一下 EventLoopGroup 的類結構圖:
可以看到它繼承了 ScheduledExecutorService,即 EventLoopGroup 有線程池排程的能力。上面在代碼中我們使用的是 EventLoopGroup 的子類 NioEventLoopGroup, 還有一個OioEventLoopGroup也可以使用。繼續看 NioEventLoopGroup 的類結構:
可以看到繼承關系為:NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup。
先看一下我們使用的預設無參構造方法:
public NioEventLoopGroup() {
this(0);
這裡預設設定為 0 ,但是後面的邏輯會判斷如果為 0,那麼會将 線程數設定為目前 CPU * 2。
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
這個構造函數将 線程執行的 Executor 設定為空,後面會判斷為空重新構造一個 Executor。
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
上面第一個構造方法是使用了 JDK 的 NIO 生成一個 Selector,第二個是生成一個 Selector 預設政策。接着進入第三個構造方法,這裡使用了父類 MultithreadEventLoopGroup 的 構造方法,還 set 了一個線程拒絕政策。
跟進父類的構造方法:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
這裡對線程數進行了判斷,如果是 0 則賦預設值,這裡的預設值就是目前核心數 * 2。
接下來又調用了它的父類 MultithreadEventExecutorGroup 的構造器:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
再點選進入,終于能看到一段實質性的代碼了,太不容易:
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//上面我們有個構造函數傳 executor == null。在這裡判斷如果為空,則建立一個新的ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 這裡就是建立指定大小的線程池,線程池中的每一個元素都是一個 EventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//存入了一個 NIOEventLoop 類執行個體
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
//這裡如果建立失敗,首先嘗試優雅停止線程,下面會判斷線程未正常停止的情況繼續判斷
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//執行個體化線程工廠執行器選擇器: 根據children擷取選擇器
chooser = chooserFactory.newChooser(children);
//為每一個 EventLoop 線程添加一個 線程終止的監聽器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 到這裡 eventLoop 就建立完畢,接着做了一個操作:把 eventLoop 添加進一個新的不可變的 set集合中,即聲明隻讀屬性
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
這段代碼的實質就是來建立 EventLoop 執行個體,并 set 線程執行。
首先校驗 executor 是否為空,如果是空,則建立一個 ThreadPerTaskExecutor 對象。這個 executor 是用來執行線程池中的所有的線程,也就是所有的 NioEventLoop,其實從 NioEventLoop 構造器中也可以知道,NioEventLoop 構造器中都傳入了executor這個參數
接着建立了一個指定大小的線程池,這裡的線程池就是用來執行我們的 EventLoop,即監聽事件。
下面就開始往線程池中放東西了,for 循環的開始是一個 newChild(executor, args)方法,這個方法主要實作的功能就是 new 出一個 NioEventLoop 執行個體,具體可以參考 NioEventLoopGroup 中的方法:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
eventLoop 建立完畢,下面的操作就是一個安全性保護,這裡就可以使用了。當有 I/O 事件來,就從線程池中取出一個線程來執行,那麼怎麼取就是根據 chooser 選擇器的政策來執行, 調用選擇器的 next()方法。
這裡線程的初始化就結束了,是以這麼多的轉跳隻做了一個事情:建立 Selector 的線程池。
-
NioEventLoop 做了什麼#
上面分析了 EventLoopGroup 的作用是定義了一個線程池,建立 EventLoop,而EventLoop 的作用不言而喻,按照 Reactor 模型來了解,大概就是兩件事:監聽連接配接請求,将事件分發給 handler 處理。下面我們就詳細分析一下 NioEventLoop 的代碼。
入口就是 newChild()方法,傳回的是 EventLoopGroup 的構造函數。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
//父類構造函數
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
//建立了一個指定大小的隊列
tailTasks = newTaskQueue(maxPendingTasks);
//父類的父類構造函數
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
return e;
} catch (SecurityException e) {
return e;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
if (maybeSelectorImplClass instanceof Exception) {
Exception e = (Exception) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
}
return selector;
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
} else {
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", selector);
}
return selector;
上面是 NioEventLoop 的構造函數,裡面又有調用父類的構造器,一般這種我們看到都很頭痛,寫碼一直爽,看碼火葬場,你需要一直跳進跳出。我們上面啟動 Netty 服務的是時候配置設定了一個 boss 線程池,一個 worker 線程池,boss 線程池啟動隻需要一個線程,主要負責用戶端的連接配接請求;而 worker 線程池就是用來處理 目前這個連接配接上所有事件用的,一個 worker 線程就是一個 EventLoop,一個 channel 隻會被一個 EventLoop 處理,但 一個 EventLoop 可以處理多個 channel。
NioEventLoop 的本質是一個線程,那麼這個線程是在何時被初始化,又是如何處理連接配接事件的監聽的呢?至少目前是沒有看到眉目,我們先看一下類結構圖,從父類身上找找關鍵資訊:
SingleThreadEventLoop 的父類 SingleThreadEventExecutor 的構造函數很有意思了,這是一個隻有一個線程的線程池, 先看看其中的幾個變量:
state:線程池目前的狀态;
taskQueue:存放任務的隊列;
thread:線程池維護的唯一線程;
scheduledTaskQueue:定義在其父類AbstractScheduledEventExecutor中,用以儲存延遲執行的任務。
我們先記住這些變量哈,下面會解釋。
因為 EventLoop 本質就是一個線程,這個線程的初始化在哪呢?往上翻看父類的資訊,不難看出:SingleThreadEventExecutor 類裡面有初始化線程的操作,它的初始化過程在 doStartThread()方法中,往上跟蹤初始化的調用鍊:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
調用的地方是 execute()方法。這個方法是 SingleThreadEventExecutor 對外暴露的唯一接口:
public interface Executor {
void execute(Runnable command);
即所有通過 EventLoop 送出的任務都是通過這一個線程來執行。另外上面的父類構造方法中我們看到有隊列的初始化,不難看出,隊列的作用是當有多個事件同時在一個 EventLoop 中待執行的時候,EventLoop 的做法是将任務包裝成對象存放在隊列中然後按照先後順序執行。那麼是不是肯定有個執行的方法呢?比如一個循環的取出任務的方法,這個是有的,先看 doStartThread()方法的代碼:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
...
...
}
});
代碼比較多,就趕重要的說,看到在 doStartThread() 方法中執行了一個異步線程,而線程中做的事情是調用SingleThreadEventExecutor.this.run()方法。這個 run()是何方神聖呢? SingleThreadEventExecutor 中的 run()隻是一個抽象方法:
protected abstract void run();
具體的實作在子類,我們看 NioEventLoop 裡面的實作:
protected void run() {
for (;;) {
try {
// 判斷接下來是是執行select還是直接處理IO事件和執行隊列中的task
// hasTask判斷目前線程的queue中是否還有待執行的任務
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 說明目前queue中沒有task待執行
select(wakenUp.getAndSet(false));
// 喚醒epoll_wait
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
/* ioRatio調節連接配接事件和内部任務執行事件百分比
* ioRatio越大,連接配接事件處理占用百分比越大 */
final int ioRatio = this.ioRatio;
// 如果比例是100,表示每次都處理完IO事件後,執行所有的task
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
// 處理IO事件
processSelectedKeys();
// 目前時間減去處理IO事件開始的時間就是處理IO事件花費的時間
final long ioTime = System.nanoTime() - ioStartTime;
// 執行task的時間taskTime就是ioTime * (100 - ioRatio) / ioRatio
// 如果taskTime時間到了還有未執行的task,runAllTasks也會傳回
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
// io.netty.channel.DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 如果還有task待執行則先執行selectNow,selectNow是立即傳回的,不是阻塞等待
// 如果沒有待執行的task則執行select,有可能是阻塞等待IO事件
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
// epoll_wait的參數timeout可以指定逾時時間,selectNow傳入的參數是0,也就是不逾時等待立即傳回
return selectNow();
};
上面的 run()方法裡面是一個死循環,在執行select()前有一個hasTasks()的操作,這個hasTasks()方法判斷目前 taskQueue 是否有元素。如果 taskQueue 中有元素,執行 selectNow()方法,最終執行selector.selectNow(),該方法會立即傳回,保證了 EventLoop 在有任務執行時不會因為 I/O 事件遲遲不來造成延後處理,這裡優先處理 I/O 事件,然後再處理任務。
知識點
這裡插入一個知識點,selectNow() 其實暴露的就是 Java 封裝的 epoll 模型的一部分。具體參考:
java.nio.channels.Selector 類:
public abstract class Selector implements Closeable {
protected Selector() { }
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public abstract boolean isOpen();
public abstract SelectorProvider provider();
public abstract Set<SelectionKey> keys();
public abstract Set<SelectionKey> selectedKeys();
public abstract int selectNow() throws IOException;
public abstract int select(long timeout)
throws IOException;
public abstract int select() throws IOException;
public abstract Selector wakeup();
public abstract void close() throws IOException;
上面三個select方法都調用了 lockAndDoSelect,隻是 timeout 參數不同,其實最後就是調用 epoll_wait 參數不同,epoll_wait 有一個timeout參數,表示逾時時間:
-1:阻塞
0:立即傳回,非阻塞
大于0:指定微秒
詳細的分析限于篇幅就不在這裡說了哈。大家可以下去慢慢看。
如果目前 taskQueue 沒有任務時,就會執行select(wakenUp.getAndSet(false))方法,代碼如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
/* delayNanos(currentTimeNanos):計算延遲任務隊列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),預設傳回1s。每個SingleThreadEventExecutor都持有一個延遲執行任務的優先隊列PriorityQueue,啟動線程時,往隊列中加入一個任務。*/
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
/* 如果延遲任務隊列中第一個任務的最晚還能延遲執行的時間小于500000納秒,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執行次數和辨別是否執行過selector.selectNow()),則執行selector.selectNow()方法并立即傳回。*/
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
//如果在wakenUp值為true時送出了任務,則該任務沒有機會調用
// Selector#wakeup。是以,我們需要在執行選擇操作之前再次檢查任務隊列。
//如果不這樣做,則可能會挂起任務,直到選擇操作逾時。
//如果管道中存在IdleStateHandler,則可能要等待直到空閑逾時。
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 逾時阻塞select
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 有事件到來 | 被喚醒 | 有内部任務 | 有定時任務時,會傳回
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
//線程被中斷,是以重置標明的鍵并中斷,這樣我們就不會遇到繁忙的循環。
//由于這很可能是使用者或其用戶端庫的處理程式中的錯誤,是以我們将其記錄下來。
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
//此處的邏輯就是: 目前時間 - 循環開始時間 >= 定時select的時間timeoutMillis,說明已經執行過一次阻塞select()
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 阻塞逾時後沒有事件到來,重置selectCnt
selectCnt = 1;
// 如果空輪詢的次數大于空輪詢次數門檻值 SELECTOR_AUTO_REBUILD_THRESHOLD(512)
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 1.首先建立一個新的Selecor
// 2.将舊的Selector上面的鍵及其一系列的資訊放到新的selector上面。
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
這個方法解決了Nio中臭名昭著的 BUG:selector 的 select 方法導緻空輪詢 cpu100%。對 Selector()方法中的阻塞定時 select(timeMIllinois) 操作的 次數進行統計,每完成一次 select 操作進行一次計數,若在循環周期内 發生 N 次空輪詢,如果 N 值大于 BUG 門檻值(預設為512),就進行空輪詢 BUG 處理。重建 Selector,判斷是否是其他線程發起的重建請求,若不是則将原 SocketChannel 從舊的 Selector 上去除注冊,重新注冊到新的 Selector 上,并将原來的 Selector 關閉。
當java NIO BUG 觸發時,進行 Selector 重建,rebuildSelector 過程如下:
通過方法 openSelector 建立一個新的 selector;
将 old selector 的 selectionKey 執行 cancel;
将 old selector 的 channel 重新注冊到新的 selector 中。
Netty 的連接配接處理就是 I/O 事件的處理,I/O 事件包括 READ 事件、ACCEPT 事件、WRITE 事件和 OP_CONNECT 事件:
ACCEPT 事件:連接配接建立好之後将該連接配接的 channel 注冊到 workGroup 中某個 NIOEventLoop 的 selector中;
READ 事件:從 channel 中讀取資料,存放到 byteBuf 中,觸發後續的 ChannelHandler 來處理資料;
WRITE 事件:正常情況下一般是不會注冊寫事件的,如果 Socket 發送緩沖區中沒有空閑記憶體時,在寫入會導緻阻塞,此時可以注冊寫事件,當有空閑記憶體(或者可用位元組數大于等于其低水位标記)時,再響應寫事件,并觸發對應回調;
CONNECT 事件:該事件是 Client 觸發的,由主動建立連接配接這一側觸發的。
再把目光從 select() 方法拉回到 run() 方法, 這個死循環的終止邏輯是遇到 confirmShutdown() 方法。然後在循環裡會詢問是否有事件,如果沒有,則繼續循環,如果有事件,那麼就開始處理。
往下看代碼中有一個字段:ioRatio,預設值是 50,這個比例是處理 I/O 事件所需的時間和花費在處理 task 時間的比例。即如果花了 5s 去處理 I/O 事件, 那麼也會花 5s 去處理 task 任務。處理 I/O 事件的操作主要是在 processSelectedKeys()方法中:
private void processSelectedKeys() {
//
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
當有了新 I/O 請求進來, JDK 原生的 Selector 将 SelectionKey 放入感興趣的 key 的集合中,而這個集合現在就是 Netty 通過反射的方式強制替換為以數組為資料結構的selectedKeys:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
//數組輸出空項, 進而允許在channel 關閉時對其進行垃圾回收
//數組中目前循環對應的keys置空, 這種感興趣的事件隻處理一次就行
selectedKeys[i] = null;
// 擷取出 attachment,預設情況下就是注冊進Selector時,傳入的第三個參數 this===> NioServerSocketChannel
// 一個Selector中可能被綁定上了成千上萬個Channel, 通過K+attachment 的手段, 精确的取出發生指定事件的channel, 進而擷取channel中的unsafe類進行下一步處理
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//進入這個方法, 傳進入 感興趣的key + NioSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 判斷是否需要再次輪詢
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
selectedKeys 是一個 set,與 selector 綁定,selector 在調用 select() 族方法的時候,如果有 I/O 事件發生,就會往 selectedKeys 中塞相應的 selectionKey。而 selectedKeys 内部維護了兩個 SelectionKey[] 數組,重寫了 set 的 #add 方法,在#add 的時候實際上是往數組裡面塞 SelectionKey。而在周遊時隻用周遊數組而不是周遊set。
處理輪詢到的IO事件也主要是三步:
取出輪詢到的SelectionKey
取出與用戶端互動的channel對象,處理channel
判斷是否需要再次輪詢
上面提到過,一個 EventLoop 是可以處理多個 channel 的,并且保證一個 channel 事件隻會在同一個 EventLoop 中被處理,那麼這裡的如何保證同一個 channel 會被某個曾經處理過他的 EventLoop 識别呢?
關鍵就在 SelectionKey,上面看到 a 對象其實就是一個 NioSocketChannel,在 AbstractNioChannel 中有一個#doRegister 方法,這裡将 JDK 的 channel 注冊到 selector 上去,并且将自身設定到 attachment 上。這樣 JDK 輪詢出某條 SelectableChannel 有 I/O 事件發生時,就可以直接取出 AbstractNioChannel 了。
我們繼續看看 processSelectedKey(k, (AbstractNioChannel) a)是如何處理感興趣的事件的:
//當有新連接配接進來,就會到這裡
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//netty底層對資料的讀寫都是unsafe完成的。這個unsafe也是和Channel進行唯一綁定的對象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
//上面這一串都是在校驗 key 的合法性
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
//處理write事件的flush
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//處理讀和新連接配接的accept事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
首先在讀寫之前都要先調用 finishConnect,來確定與用戶端連接配接上。這個過程最終會傳遞給 channelHandle r的channelActive 方法,是以可以通過 channelActive 來驗證有多少用戶端線上。
接下來是處理 write 事件的 flush,注意,我們的 write 不是在這裡做的,真正的 write 一般是封裝成 task 去執行的。
最後是處理讀和新連接配接的 accept 事件。Netty 将新連接配接的 accept 也當做一次 read。對于 boss NioEventLoop 來說,新連接配接的 accept 事件在 read 的時候通過他的 pipeline 将連接配接扔給一個 worker NioEventLoop 處理;而worker NioEventLoop 處理讀事件,是通過他的 pipeline 将讀取到的位元組流傳遞給每個 channelHandler 來處理。
從這裡也可以看出來 Netty 所有關于 I/O 操作都是通過内部的 Unsafe 來實作的。
還記得我們是在哪裡扯到了 I/O操作的 processSelectedKeys方法嘛!感覺在扯遠的道路上越來越遠了。再把視線回到 NioEventLoop 的 #run()方法, I/O 操作都是processSelectedKeys方法來處理,下面還有個runAllTasks方法,它是用于處理封裝好的事件操作的。可以看到 runAllTasks 有個函數是帶了事件參數的,雖然設定了一個可以運作的時間參數,但是實際上 Netty 并不保證能精确的確定非 I/O 任務隻運作設定的毫秒,下面來看下 runAllTasks 帶時間參數的代碼:
/ timeoutNanos:任務執行花費最長耗時/
protected boolean runAllTasks(long timeoutNanos) {
// 把scheduledTaskQueue中已經超過延遲執行時間的任務移到taskQueue中等待被執行。
fetchFromScheduledTaskQueue();
// 非阻塞方式pollTask
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 執行task
safeExecute(task);
runTasks ++;
// 依次從taskQueue任務task執行,每執行64個任務,進行耗時檢查。
// 如果已執行時間超過預先設定的執行時間,則停止執行非IO任務,避免非IO任務太多,影響IO任務的執行。
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
NioEventLoop 執行 task 的過程,同樣可以分成幾步:
從 scheduledTaskQueue 轉移定時任務到 taskQueue;
計算本次任務循環的截止時間;
執行任務;
執行完任務後的工作。
從上面可以看到 NioEventLoop 中至少有兩種隊列,taskQueue 和 scheduledTaskQueue。
EventLoop 是一個 Executor,是以使用者可以向 EventLoop 送出 task。在 execute 方法中,當 EventLoop 處于循環中或啟動了循環後都會通過 addTask(task)向 EventLoop 送出任務。SingleThreadEventExecutor 内部使用一個 taskQueue 将task 儲存起來。
taskQueue最大的應用場景就是使用者在 channelHandler 中擷取到 channel,然後通過 channel.write() 資料,這裡會把 write 操作封裝成一個 WriteTask,然後通過 eventLoop.execute(task) 執行,實際上是給 EventLoop 送出了一個 task,加入到 taskQueue 隊列中。
同時,EventLoop也是一個ScheduledExecutorService,這意味着使用者可以通過ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)方法向EventLoop送出定時任務。是以,EventLoop内部也維護了一個優先級隊列scheduledTaskQueue來儲存送出的定時任務。
知道了NioEventLoop内部的任務隊列後,再來看執行task的過程。
第一步,是将到期的定時任務轉移到taskQueue中,隻有在目前定時任務的截止時間已經到了,才會取出來。
然後第二步計算本次任務循環的截止時間deadline。
第三步真正去執行任務,先執行task的run方法,然後将runTasks加一,每執行完64(0x3F)個任務,就判斷目前時間是否超過deadline,如果超過,就break,如果沒有超過,就繼續執行。
需要注意的是,這裡如果任務沒執行完break掉了,afterRunningAllTasks後,NioEventLoop就會重新開始一輪新的循環,沒完成的任務仍然在taskQueue中,等待runAllTasks的時候去執行。
最後一步是afterRunningAllTasks,執行完所有任務後需要進行收尾,相當于一個鈎子方法,可以作統計用。
最後總結一下處理任務隊列的task的過程就是:
eventLoop是一個Executor,可以調用execute給eventLoop送出任務,NioEventLoop會在runAllTasks執行。NioEventLoop内部分為普通任務和定時任務,在執行過程中,NioEventLoop會把過期的定時任務從scheduledTaskQueue轉移到taskQueue中,然後執行taskQueue中的任務,同時每隔64個任務檢查是否該退出任務循環。
-
EventLoop 如何綁定 channel#
上面的長篇大論其實隻是分析了 Netty 初始化一個 Reactor 線程是多麼的艱難。考慮了太多的事情。我們一開頭寫了一個Demo,到現在為止都沒有分析到用戶端和服務端啟動的時候是如何将 Reactor 線程和 channel 綁定起來的,即啟動的時候如何将一個 SocketChannel 綁定到 work thread 上。是以我們還是從啟動過程分析一下,走一遍總體流程。
用戶端啟動
再重溫一下用戶端啟動代碼:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer());
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().writeAndFlush("Hello world, i'm online");
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("client start fail",e);
}finally {
group.shutdownGracefully();
EventLoopGroup 就不用解釋了,初始化了一個啟動線程池。下面的 Bootstrap 是Netty 封裝的啟動類,通過一連串的鍊式調用綁定 Selector 線程,啟動指定類型的SocketChannel 和 初始化處理邏輯。
初始化好啟動資訊之後調用 connect()進行連接配接:
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
首先是校驗一下必傳參數是否存在,端口、IP 以及 handler 資訊是否初始化。下面的 doResolveAndConnect方法就是連接配接的主要邏輯:
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//完成channel 的初始化和注冊
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
//注冊成功直接傳回
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
// 如果注冊還在進行中,需要向future對象添加一個監聽器,以便在注冊成功的時候做一些工作,監聽器實際上就是一個回調對象
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Direclty obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 注冊成功後仍然調用doResolveAndConnect0方法完成連接配接建立的過程
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
這個方法其實隻做了兩個事情:
初始化一個 Channel 對象并注冊到 EventLoop 中;
調用 doResolveAndConnect0() 方法完成 tcp 連接配接的建立。
繼續看初始化 channel 的過程:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//使用工廠類ChannelFactory的newChannel通過反射建立Channel執行個體
channel = channelFactory.newChannel();
//調用init方法執行初始化操作
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// 若在建立執行個體和初始化期間抛出異常,建立DefaultChannelPromise執行個體,寫入異常并傳回
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
//調用EventLoopGroup的register方法,完成注冊操作
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
/** 如果程式到這裡,說明promise沒有失敗,可能發生以下情況之一
* 1) 如果嘗試将Channel注冊到EventLoop,且此時注冊已經完成;inEventLoop傳回true,channel已經成功注
* 冊,可以安全調用bind() or connect()
* 2) 如果嘗試注冊到另一個線程,即inEventLoop傳回false,則此時register請求已成功添加到事件循環的任務隊
* 列中,現在同樣可以嘗試bind()或connect(),因為bind()或connect()會被排程在執行register
* Task之後執行, 因為register(),bind()和connect()都被綁定到同一個I/O線程。
*/
return regFuture;
channel 的注冊過程主要就在上面的一句代碼中:
ChannelFuture regFuture = config().group().register(channel);
跟着register()往下走,可以跟到一段代碼:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
這裡就是從 EventLoopGroup 中拿到特定的 EventLoop,如何配置設定的過程上面已經有分析,就是調用 NioEventLoop 的 next()方法。判斷 NioEventLoop 的線程是否已經啟動,如果已經啟動,調用 register0方法;否則調用 eventLoop.execute 方法啟動線程。
再跟一下 register0(promise)的代碼:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//調用JDK去注冊Selector
doRegister();
neverRegistered = false;
registered = true;
//設定注冊成功通知監聽器
safeSetSuccess(promise);
//觸發注冊成功事件
pipeline.fireChannelRegistered();
//如果是第一次則觸發激活成功事件
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
在這裡調用 JDK NIO 去注冊 Selector,設定注冊成功的監聽事件。
這裡是不是就把上面分析的 EventLoop 的 execute 方法和 啟動聯系起來了,通過execute 來執行task。
initAndRegister方法的主要過程就分析完了,其實主要有三點:
建立了一個NioServerSocketChannel對象;
為NioServerSocketChannel對應的ChannelPipeLine增加了一個ServerBootstrapAcceptor處理器,用來處理新的連接配接;
從NioEventLoopGroup中配置設定了一個NioEventLoop,用于監聽NioServerSocketChannel通道上的 I/O 事件。
用戶端的啟動就分析完成,工作量還是不少哈。
Server 端啟動
還是先看一下 Server 啟動的代碼:
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
ChannelFuture future = server.bind(port).sync();
} catch (InterruptedException e) {
log.error("server start fail",e);
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
這裡看到有個差別是:用戶端啟動是通過 Bootstrap 啟動類來實作,調用 connect()方法,服務端啟動是ServerBootstrap啟動類來實作,調用 bind()方法。
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
return doBind(localAddress);
private ChannelFuture doBind(final SocketAddress localAddress) {
if (regFuture.cause() != null) {
return regFuture;
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
bind 方法裡面就沒啥要說的,已經在用戶端啟動的時候解釋過了,建立 channel ,綁定 EventLoop,再看doBind0()方法:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
是不是跟用戶端的 register()方法大同小異呢,也是調用 EventLoop 送出 task。
-
本篇小結#
回顧這一篇,開始寫了一個 Demo, 給出了如何開啟一個 Netty 服務端和用戶端的案例。然後從EventLoopGroup 說起,我們談到了 Reactor 的3種線程模型,接着繼續說 EventLoopGroup 的作用就是一個承載 EventLoop 的線程池,那麼 EventLoop 的作用又是什麼。它是執行連接配接事件和 I/O 事件的基石,本質就是一個Selector 線程。所有的連結都是在這裡初始化和承載,由它交于後面的 handler 進行處理,處理完之後的結果由它傳回給連接配接的用戶端。
最後從用戶端 和 服務端 的啟動代碼一起分析了啟動的流程,将啟動的過程與 EventLoop 的 execute()方法連接配接起來,組成一個整體。
Netty 之是以在啟動的時候代碼寫的這麼複雜,主要原因是要适配它所搭起來的架構,是以調用流程非常的隐晦。這個過程中也做了很多的工作,處理各種異常,以及原生 NIO 的 BUG。這一篇暫時就先到這裡,下一節我們來聊建立起來 channel 和 Selector 的關系之後,後面的 handler 操作又是如何處理的。
作者: rickiyang
出處:
https://www.cnblogs.com/rickiyang/p/12562408.html