文章目錄
- 1. 開篇
- 2. 類繼承關系以及重要的成員變量
-
- 2.1 類繼承關系
- 2.2 重要的成員變量
- 2.3 構造函數
-
- 2.3.1 在構造函數中,會建立任務隊列和tailTask隊列
- 2.3.2 多路複用器Selector
- 2.4 NioEventLoop執行流程
-
- 2.4.1 添加一個線程
- 2.4.2 run方法解析
-
- 2.4.2.1 calculateStrategy
- 2.4.2. 阻塞的select
- 2.4.2.3 processSelectedKeys()
- 總結
系類文章:
《Netty服務端啟動源碼分析(一)整體流程》
《Netty服務端啟動源碼分析(二)服務端Channel的端口綁定》
《Netty核心元件之NioEventLoop(一)建立》
《Netty核心元件之NioEventLoop(二)處理消息》
1. 開篇
- NioEventLoop是Netty架構的Reactor線程;
-
NioEventLoop負責處理注冊在其上面的所有Channel的IO事件,通常情況下一個NioEventLoop會下挂多個Channel,但一個Channel隻和唯一的NioEventLoop對應
例如NioEventLoop A B 2個,A和c1、c2對應,B和c3、c4對應,不會交叉。
一個NioEventLoop對應一個selector多路複用器,為了負載均衡,新的Channel會平攤到每個NioEventLoop上,一旦确定後,就不會更改。
- NioEventLoop同時會負責通過execute方法送出的任務,以及通過schedule方法送出的定時任務;
在接下來幾篇文章,我會通過Netty的源碼深入講解NioEventLoop的實作機制。
特别說明:基于4.1.52版本的源碼(很新的版本,和之前4.xx版本還是有些不同的)
2. 類繼承關系以及重要的成員變量
先來看下NioEventLoop的類關系圖和重要的屬性,對其有一個整體的感覺,便于後面詳細分析。
2.1 類繼承關系
可以看到
NioEventLoop
的繼承關系非常複雜,最上層是JDK的
Executor
接口,說明它歸根到底是一個執行器,是用來執行任務的。另外,它實作了
EventLoop
接口、
EventExecutorGroup
接口和
ScheduledExecutorService
接口,繼承了
SingleThreadEventExecutor
類,這些接口和類為這個執行器添加了十分繁複的功能特性,要搞清楚NioEventLoop的具體實作機制就要不停的在這些父類和接口中來回跳轉。
-
接口表示是一個定時任務接口,EventLoop可以接受定時任務。ScheduledExecutorService
-
接口: 一旦Channel注冊了,就處理該Channel對應的所有I/O操作。EventLoop
-
表示這是一個單個線程的線程池。SingleThreadEventExecutor
-
是一個單例的線程池,裡面含有一個死循環的線程不斷地做着三件事情:監聽端口,處理端口事件,處理隊列事件。每個Eventloop都可以綁定多個Channel,而每個Channel始終隻能由一個EventLoop來處理EventLoop
也就是說,建立了NioEventLoop後,擁有一個線程池類型的成員變量,緊接着會往該線程池加入一個線程,該線程就負責阻塞監聽IO事件等操作。添加線程由SingleThreadEventExecutor的doStartThread()完成,線程具體的幹的事情由NioEventLoop的run()實作
2.2 重要的成員變量
private Selector selector;
private SelectedSelectionKeySet selectedKeys;
private volatile Thread thread;
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
private final Queue<Runnable> tailTasks;
-
:作為NIO架構的Reactor線程,NioEventLoop需要處理網絡IO事件,是以它需要有一個多路複用器,即Java NIO的Selector對象;selector
-
:每次select操作選出來的有事件就緒的SelectionKey集合,在NioEventLoop的run方法中會處理這些事件;selectedKeys
-
:即每個NioEventLoop綁定的線程,它們是一對一的關系,一旦綁定,在整個生命周期内都不會改變;thread
-
:即目前的NioEventLoop所屬的EventExecutorGroup;parent
-
:NioEventLoop中三大隊列之一,用于儲存需要被執行的任務。taskQueue
-
:NioEventLoop中三大隊列之一,是一個優先級隊列(内部其實是一個按照任務的下次執行時間排序的小頂堆),用于儲存定時任務,當檢測到定時任務需要被執行時,會将任務從scheduledTaskQueue中取出,放入taskQueue;scheduledTaskQueue
-
:NioEventLoop中三大隊列之一,用于存儲目前或下一次事件循環結束後需要執行的任務;tailTasks
2.3 構造函數
在調用構造函數建立執行個體之前,有必要回顧下NioEventLoop是何時被建立的,是在建立NioEventLoopGroup的時候就已經建立了!詳情參見《Netty服務端啟動源碼分析(一)》 中"1.1.2 構造函數幹了什麼"章節。也就是說NioEventLoop是被預建立的,發生在通道的建立和與通道的綁定之前。
首先來看NioEventLoop的構造函數:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// 設定parent、executor、addTaskWakesUp(添加任務時是否喚醒select)、建立taskQueue和tailTask隊
// 列
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// selector初始化
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
2.3.1 在構造函數中,會建立任務隊列和tailTask隊列
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
預設情況下,會建立MPSC,即多生産者單消費者的隊列,這裡最終會用到JCTools庫,這裡不過多介紹,感興趣的可以自己去了解。
2.3.2 多路複用器Selector
Netty的實作是基于Java原生的NIO的,其對原生的NIO做了很多優化,避免了某些bug,也提升了很多性能。但是底層對于網絡IO事件的監聽和處理也是離不開多路複用器Selector的。在NioEventLoop的構造方法中進行了Selector的初始化:
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
構造函數中還會初始化selector和根據配置對selectedKeys進行優化
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果優化選項沒有開啟,則直接傳回
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
-
調用openSelector();,初始化一個epoll,并持有這個selector
等價于 原生nio文法:Selector selector = Selector.open();
更多的nio知識,可以參見:《bio、nio、aio–Netty筆記》中<3.2 NIO引入多路複用器代碼示例>的架構示意圖部分
如果設定了優化開關(預設優化選項是開啟的),則通過反射的方式從Selector中擷取selectedKeys和publicSelectedKeys,将這兩個成員設定為可寫,通過反射,使用Netty構造的selectedKeySet将原生JDK的selectedKeys替換掉。
我們知道使用Java原生NIO接口時,需要先調Selector的select方法,再調selectedKeys方法才可以獲得有IO事件準備好的SelectionKey集合。這裡優化過後,隻通過一步select調用,就可以從selectedKeySet獲得需要的SelectionKey集合。
另外,原生Java的SelectionKey集合是一個HashSet,這裡優化過後的SelectedSelectionKeySet底層是一個數組,效率更高。
2.4 NioEventLoop執行流程
也就是說,建立了NioEventLoop後,擁有一個線程池類型的成員變量,緊接着會往該線程池加入一個線程,該線程就負責阻塞監聽IO事件等操作。添加線程由SingleThreadEventExecutor的doStartThread()完成,線程具體的幹的事情由NioEventLoop的run()實作。
2.4.1 添加一個線程
我們先來看下添加線程的代碼,定義在父類中的,為了友善,我進行了簡化,僅列出重要代碼:
public abstract class SingleThreadEventExecutor {
private void doStartThread() {
//添加一個Runnable作為線程任務到executor(線程池)中
executor.execute(new Runnable() {
@Override
public void run() {
try {
//Runnable匿名内部類調用外部類SingleThreadEventExecutor的run(),實際會調用子類NioEventLoop對象執行個體的run()
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
}
});
}
代碼比較簡單,但是有個不容易了解的地方,即 SingleThreadEventExecutor.this.run();
,這個是匿名表達式引用外部類方法的文法,即Runnable的run()方法想引用外部類的run()方法,詳情參見《Lambda表達式裡的“陷阱“ 匿名表達式 qualified this》
2.4.2 run方法解析
添加線程後,線程池就會調用該線程,那麼線程具體幹了哪些事情呢?
EventLoop的職責可以用下面這張圖形象的表示:
EventLoop的run方法在一個for死循環中,周而複始的做着三件事:
1、從已注冊的Channel監聽IO事件;對應select()阻塞監聽,當然為了避免一直阻塞,一般帶逾時時間
2、處理IO事件;
3、從任務隊列取任務執行。
public final class NioEventLoop extends SingleThreadEventLoop {
protected void run() {
int selectCnt = 0;
for (;;) {
int strategy;
try {
// 計算本次循環的執行政策
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 調用Java NIO的多路複用器,檢查注冊在NioEventLoop上的Channel的IO狀态
strategy = select(curDeadlineNanos);
}
} catch (IOException e) {
}
// 處理IO事件
processSelectedKeys();
// 處理任務隊列中的任務
ranTasks = runAllTasks();
...
}
整個run()方法被包裹在一個for循環中,唯一能夠結束循環的條件是狀态state為SHUTDOWN或者TERMINATED,NioEventLoop繼承了SingleThreadEventExecutor,isShuttingDown()和confirmShutdown()都是SingleThreadEventExecutor中的方法。
可以看到,除去異常處理和一些分支流程,整個run()方法不是特别複雜,重點在與select()和selectNow()方法,run()方法流程如下圖所示:
下面詳細解析
2.4.2.1 calculateStrategy
calculateStrategy決定了調用selectNow()還是select()。
先來看calculateStrategy:
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
//成員變量selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty() || !tailTasks.isEmpty();
}
- 每次循環,都會檢測任務隊列和IO事件,如果任務隊列中沒有任務,則直接傳回SelectStrategy.SELECT;
因為任務清單為空,則允許調用select阻塞等待新的消息的到來
- 如果任務隊列中有任務,則會調用非阻塞的selectNow檢測是否有IO事件準備好的Channel數。
因為任務清單不為空,要選用非阻塞的方法,這樣原有的任務可以繼續執行
2.4.2. 阻塞的select
當任務隊列中沒有任務時,直接進入select分支
case SelectStrategy.SELECT:
// 找到下一個将要執行的定時任務的截止時間
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 阻塞調用select
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
阻塞調用select:
private int select(long deadlineNanos) throws IOException {
// 如果沒有定時任務,直接調Java NIO的select,進入阻塞
if (deadlineNanos == NONE) {
return selector.select();
}
// 如果截止時間小于0.5ms,則timeoutMillis 為0,直接調非阻塞的selectNow()方法
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
在調用select之前,再次調用hasTasks()判斷從上次調用該方法到目前為止是否有任務加入,多做了一層防護,因為調用select時,可能會阻塞,這時,如果任務隊列中有任務就會長時間得不到執行,是以須小心謹慎。
如果任務隊列中還是沒有任務,則會調用select方法。在這個方法中會根據入參deadlineNanos來選擇調用NIO的哪個select方法:
如果deadlineNanos為NONE,即沒有定時任務時,直接調用NIO的無參select方法,進入永久阻塞,除非檢測到Channel的IO事件或者被wakeup;
如果存在定時任務,且定時任務的截止時間小于0.5ms,則timeoutMillis 為0,直接調非阻塞的selectNow方法,也就是說馬上有定時任務需要執行了,不要再進入阻塞了;
其他情況,調用select(timeout),進入有逾時時間的阻塞。
2.4.2.3 processSelectedKeys()
參見 《Netty核心元件之NioEventLoop(二)處理消息》
總結
在本文中,對Netty的NioEventLoop進行了深入的解讀,并且詳細講解了它的三大職責之一:檢測Channel的IO事件的機制。
NioEventLoop是Netty最核心的概念,内部運作機制很複雜,在接下來的兩篇文章中會繼續分析。
參考:
《Netty核心元件之NioEventLoop(一)》