閱讀須知
- Netty版本:4.1.14.Final
- 代碼中以 /* */ 注釋標記的方法調用,會在後文做深入分析
正文
Netty的EventLoop主要用於處理I/O操作,而NioEventLoop就是EventLoop的主要實作之一。它將Channel註冊到Selector上,所以在事件循環中可以對這些Channel進行多路複用。我們首先來看NioEventLoop的類層次結構(原文此處附有結構圖):
我們來看它的構造方法:
/**
 * Creates an event loop for {@code parent} that runs on {@code executor} and opens its own
 * selector through {@code selectorProvider}.
 *
 * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is null
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
        SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // Parent constructors initialise the task queue and the shared executor state.
    // The third argument (false) is presumably addTaskWakesUp - TODO confirm.
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

    // Validate the collaborators before any field is assigned.
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }

    // openSelector() reads provider, so it has to be set before the selector is opened.
    provider = selectorProvider;

    /* open the multiplexer */
    final SelectorTuple tuple = openSelector();
    // tuple.selector may be a wrapper; tuple.unwrappedSelector is the raw JDK selector.
    selector = tuple.selector;
    unwrappedSelector = tuple.unwrappedSelector;

    selectStrategy = strategy;
}
NioEventLoop:
/**
 * Opens this event loop's JDK selector and, unless {@code DISABLE_KEYSET_OPTIMIZATION} is set,
 * tries to replace the private {@code selectedKeys} / {@code publicSelectedKeys} fields of
 * {@code sun.nio.ch.SelectorImpl} with a {@link SelectedSelectionKeySet} via reflection.
 *
 * <p>Side effect: {@code this.selectedKeys} is set to the new key set on success, and to
 * {@code null} when the reflective field injection fails.
 *
 * @return a tuple holding the raw selector, plus a {@code SelectedSelectionKeySetSelector}
 *         wrapper when the key-set replacement succeeded
 * @throws ChannelException if the selector provider cannot open a selector
 */
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // Optimization switched off: use the JDK selector as-is.
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    // Load (without initialising) sun.nio.ch.SelectorImpl inside doPrivileged. Any failure is
    // returned as a value instead of thrown, so we can fall back to the plain selector.
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    // Fall back when the class is unavailable or this selector is not a SelectorImpl.
    if (!(maybeSelectorImplClass instanceof Class) ||
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    // Swap both key-set fields reflectively. Returns null on success, otherwise the failure cause.
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                // trySetAccessible reports failure by returning the cause; null means success.
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                if (cause != null) {
                    return cause;
                }

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    // Any non-null result is a failure. trySetAccessible is typed to return a Throwable, so test
    // for Throwable rather than Exception. Otherwise a returned Error would be mistaken for
    // success, and the un-instrumented selector would be wrapped as if it were optimized.
    if (maybeException instanceof Throwable) {
        selectedKeys = null;
        Throwable t = (Throwable) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        return new SelectorTuple(unwrappedSelector);
    }

    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    return new SelectorTuple(unwrappedSelector,
            new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
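方法很長:首先調用JDK NIO的API開啟Selector;剩餘的核心邏輯是根據DISABLE_KEYSET_OPTIMIZATION配置判斷是否採用selectedKeys優化。若啟用優化,則通過反射把sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys兩個字段替換為Netty自己基於數組實現的SelectedSelectionKeySet;任一步驟失敗都會退回使用原生的Selector。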
在上面的結構圖中我們看到,NioEventLoop還實作了JDK的Executor接口。熟悉線程池的同學對這個接口一定不陌生,所以NioEventLoop同樣可以執行Runnable任務。在Netty中,NioEventLoop除了處理I/O之外,還負責執行一些系統任務和定時任務,我們來看相關方法的實作:
SingleThreadEventExecutor:
// Submits a task for execution on this event loop's thread.
// From a thread other than the loop thread, the loop thread is started first (startThread()).
// Then the task is queued. If the executor has meanwhile been shut down and the task could
// still be removed from the queue, reject() is called.
// Finally the selector may be woken up, so that a blocked select() notices the new task.
// Throws NullPointerException if task is null.
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// Is the caller the thread bound to this event loop?
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
// Queue the task. NOTE(review): addTask is not shown here; per the article it applies the
// rejection policy when the task cannot be queued.
addTask(task);
} else {
startThread(); /* start the thread */
// Queue the task (same as above).
addTask(task);
if (isShutdown() && removeTask(task)) {
// Shut down in the meantime and the task was still queued: reject it.
reject();
}
}
// NioEventLoop's constructor passes false as the third super() argument, presumably
// addTaskWakesUp. NOTE(review): per the article, wakesUpForTask(task) returns false for
// NonWakeupRunnable tasks; not shown here.
if (!addTaskWakesUp && wakesUpForTask(task)) {
/* wake up the selector */
wakeup(inEventLoop);
}
}
先來看execute末尾調用的wakeup方法的實作,之後再看啟動線程的startThread方法:
NioEventLoop:
/**
 * Interrupts a pending {@link Selector#select} so the loop thread can pick up new work.
 *
 * @param inEventLoop whether the caller is the event loop thread itself; in that case
 *        nothing is done, because the loop thread is executing this code rather than
 *        blocking in select()
 */
protected void wakeup(boolean inEventLoop) {
    if (inEventLoop) {
        return;
    }
    // Only the caller that flips wakenUp from false to true pays for Selector.wakeup(), which
    // makes the first selection operation that has not yet returned return immediately.
    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
SingleThreadEventExecutor:
/**
 * Starts the event loop thread the first time it is needed. The plain state read avoids the
 * CAS in the common already-started case; the CAS guarantees that only one caller moves the
 * state from ST_NOT_STARTED to ST_STARTED and launches the thread.
 */
private void startThread() {
    if (state == ST_NOT_STARTED && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
        /* start the thread */
        doStartThread();
    }
}
SingleThreadEventExecutor:
// Launches the event loop thread by handing a Runnable to the underlying executor.
// That Runnable records the executor thread as this event loop's thread and runs the subclass's
// run() loop. Once run() ends (normally or by throwing), it drives termination:
// - moves state to at least ST_SHUTTING_DOWN;
// - calls confirmShutdown() until it returns true;
// - calls cleanup(), sets ST_TERMINATED and releases threadLock;
// - warns if tasks are still queued, then completes terminationFuture.
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// Bind the executor's thread to this event loop.
thread = Thread.currentThread();
// Replay an interrupt that was requested before the thread was bound.
// NOTE(review): `interrupted` is written outside this view; confirm where.
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
/* the subclass's run() implements the event loop itself */
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// CAS-spin until the state is at least ST_SHUTTING_DOWN; a later state is left untouched.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// run() returned normally but gracefulShutdownStartTime was never set, i.e. confirmShutdown()
// (which sets it) was not called before run() finished.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
for (;;) {
// Run all remaining tasks and shutdown hooks until termination is allowed.
if (confirmShutdown()) {
break;
}
}
} finally {
try {
// Subclass cleanup hook. NOTE(review): per the article, the default is empty and
// NioEventLoop closes its selector here; neither is shown in this view.
cleanup();
} finally {
// Mark the executor as terminated.
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release(); // release the thread lock
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
NioEventLoop:
// The NioEventLoop event loop. Each iteration:
// 1. Asks selectStrategy whether to skip the iteration (CONTINUE), block in select() (SELECT),
//    or go straight to processing.
// 2. Processes selected keys.
// 3. Runs queued tasks; their time is bounded by ioRatio unless ioRatio is 100.
// 4. If shutting down, closes all channels and returns once confirmShutdown() reports true.
// Throwables from either phase go to handleLoopException(), and the loop keeps running.
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
/* select operation */
// Clear wakenUp before selecting, and pass its previous value to select().
select(wakenUp.getAndSet(false));
// A wakeup was requested meanwhile: call Selector.wakeup() so the next select operation does
// not block. Per the JDK, wakeup() with no select in progress makes the next one return at once.
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
// Reset per-iteration bookkeeping before keys are processed.
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio is the desired percentage of loop time spent on I/O. Below 100, tasks get a budget
// of ioTime * (100 - ioRatio) / ioRatio nanoseconds, e.g. the same time as I/O when ioRatio is
// 50. NOTE(review): per the article, 50 is the default; not shown here.
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
/* process selected keys */
processSelectedKeys();
} finally {
/* make sure tasks always run */
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
/* process selected keys */
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
/* make sure tasks always run, with a time budget */
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown, even if processing above threw.
try {
if (isShuttingDown()) {
/* close all channels */
closeAll();
/* confirm shutdown */
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
下面我們就來分析比較關鍵的select操作:
NioEventLoop:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 計算下一個将要觸發的定時任務的逾時時間
// delayNanos方法會擷取到下一個定時任務的觸發時間
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 為逾時時間增加五毫秒的調整值
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000 L) / 1000000 L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
// 如果需要立即執行或者已經逾時,則執行輪詢操作
selector.selectNow();
// select計數器置為1
selectCnt = 1;
}
break;
}
// 如果在wakenUp值為true時送出了任務,則該任務沒有機會調用Selector的wakeup方法
// 是以我們需要在執行select操作之前再次檢查任務隊列
// 如果我們不這樣做,那麼任務可能會被挂起,直到select操作逾時
// 如果IdleStateHandler存在于pipeline中,它可能會被挂起直到idle timeout
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 剩餘的逾時時間作為參數調用Seletor的select方法
int selectedKeys = selector.select(timeoutMillis);
// 自增select計數器
selectCnt++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// 以下情況退出目前循環
// 1、select操作selectedKeys不為0,說明有讀寫事件需要處理
// 2、由使用者喚醒多路複用器或任務隊列具有待處理任務
// 3、定時任務已準備好進行處理
break;
}
if (Thread.interrupted()) {
// 線程被中斷,是以重置selected keys并退出循環,這樣就不會在繁忙的循環中執行
// 由于這很可能是使用者或其用戶端庫的處理程式中的錯誤,我們也會記錄它
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis已過,select沒有任何内容
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// selector連續多次select沒有内容,有可能觸發了JDK NIO epoll死循環導緻CPU 100%的bug
// 重建selector解決
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
/* 重建selector */
rebuildSelector();
selector = this.selector;
// 再次select以填充selectedKeys
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
}
}
NioEventLoop:
/**
 * Replaces the selector with a new one and migrates all registered channels to it. The
 * migration itself always runs on the event loop thread: callers on other threads only
 * schedule it as a task.
 */
public void rebuildSelector() {
    if (inEventLoop()) {
        /* already on the loop thread: rebuild right away */
        rebuildSelector0();
        return;
    }
    // Called from another thread: submit the rebuild to this event loop.
    execute(new Runnable() {
        @Override
        public void run() {
            rebuildSelector0();
        }
    });
}
NioEventLoop:
/**
 * Opens a fresh selector and re-registers every valid channel of the current one on it,
 * carrying over interest ops and attachments. Then it swaps in the new selector and closes
 * the old one.
 *
 * <p>Channels that fail to re-register are handled by attachment type: an
 * {@code AbstractNioChannel} is closed, while a {@code NioTask} is notified through
 * {@code invokeChannelUnregistered}. Does nothing if there is no selector yet, or if a new
 * one cannot be opened. Intended to run on the event loop thread; {@code rebuildSelector()}
 * dispatches it there.
 */
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        // Create the new selector.
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new selector.
    int nChannels = 0;
    for (SelectionKey key : oldSelector.keys()) {
        Object a = key.attachment();
        try {
            // Skip dead keys and channels already registered with the new selector.
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }

            int interestOps = key.interestOps();
            // Cancel the old registration before registering the channel with the new selector.
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // The channel caches its selection key; point it at the new one.
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // Close the old selector; every channel is now registered with the new one.
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    // Parameterized, like the other log calls, so the message is only built when INFO is enabled.
    logger.info("Migrated {} channel(s) to the new Selector.", nChannels);
}
這樣,select方法就分析完成了,接下來就是處理select出來的SelectionKey:
NioEventLoop:
/**
 * Handles the keys chosen by the last select, using the optimized array-backed key set when
 * openSelector() installed one (selectedKeys non-null), and the JDK selector's own
 * selected-key set otherwise.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        // No instrumented key set: iterate the JDK's selected-key set.
        processSelectedKeysPlain(selector.selectedKeys());
    } else {
        /* optimized select key processing */
        processSelectedKeysOptimized();
    }
}
兩個分支處理select key的邏輯是比較相似的,我們以開啟優化的方式為例:
NioEventLoop:
// Processes the keys collected in the optimized key set, i.e. selectedKeys.keys[0 .. size).
// Each slot is nulled out as it is consumed. If processing sets needsToSelectAgain, the method
// calls selectedKeys.reset(i + 1) and selectAgain(), then restarts iteration from index 0.
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// Null out the consumed slot so the array no longer references the key, which lets it (and
// its channel) be garbage-collected once the channel is closed.
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// Anything that is not an AbstractNioChannel is treated as a NioTask.
if (a instanceof AbstractNioChannel) {
/* process the selected key */
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// needsToSelectAgain is reset to false in run() before keys are processed; it is set
// elsewhere (not shown here) when the key set has to be refreshed.
if (needsToSelectAgain) {
// Presumably clears the entries from index i + 1 on, also to help GC. TODO confirm.
selectedKeys.reset(i + 1);
// Select again to refresh selectedKeys. NOTE(review): per the article this calls
// selector.selectNow(); not shown here.
selectAgain();
// Restart from the first slot: the loop's ++i turns -1 into 0.
i = -1;
}
}
}
NioEventLoop:
/**
 * Dispatches the ready operations of one selected key to its channel's unsafe.
 *
 * <p>An invalid key closes the channel, but only if the channel is still registered with this
 * event loop. Otherwise, ready operations are handled in this order: OP_CONNECT
 * (finishConnect), then OP_WRITE (forceFlush), then OP_READ / OP_ACCEPT, or readyOps == 0 (read).
 *
 * @param k  the selected key
 * @param ch the channel attached to {@code k}
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws because it has no event loop, ignore it. We
            // only want to know whether ch is registered to this event loop, and therefore
            // whether we have the authority to close it.
            return;
        }
        // Only close ch if it is still registered to this EventLoop. ch may have been
        // deregistered from the event loop, cancelling the key as part of that, while the
        // channel itself is still healthy and must not be closed. A null eventLoop is never
        // == this, so this single identity check also covers null.
        if (eventLoop != this) {
            return;
        }
        // The key is invalid and ch still belongs to this loop: close the channel.
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // finishConnect() must be called before attempting read or write; otherwise the JDK
        // channel implementation may throw NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // Remove OP_CONNECT from the interest set, otherwise Selector.select(..) would keep
            // returning immediately without blocking.
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first: writing out queued buffers may free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // NOTE(review): per the article, forceFlush also clears OP_WRITE once there is
            // nothing left to write; not shown here.
            ch.unsafe().forceFlush();
        }

        // Also read when readyOps == 0, as a workaround for a JDK bug that could otherwise
        // lead to a spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        // The key was cancelled while its operations were being handled: close the channel.
        unsafe.close(unsafe.voidPromise());
    }
}
這裡對Channel的各種操作,如close、read等,我們在分析ServerSocketChannel的相關源碼時已經分析過,這裡不再重複。處理完SelectionKey之後,run方法會在finally塊中調用runAllTasks,保證任務隊列中的任務總能得到執行:
SingleThreadEventExecutor:
// Runs queued tasks for at most roughly timeoutNanos.
// Due scheduled tasks are first moved into the task queue. Tasks are then polled and run one by
// one; the deadline is only checked every 64 tasks, so the budget can be slightly exceeded.
// afterRunningAllTasks() is called on both exit paths.
// Returns true if at least one task ran, false if the queue was empty.
protected boolean runAllTasks(long timeoutNanos) {
/* move due scheduled tasks into the task queue */
fetchFromScheduledTaskQueue();
// Poll the first task (pollTask is not shown here).
Runnable task = pollTask();
if (task == null) {
// Nothing to run, but still invoke the post-run hook. NOTE(review): per the article,
// SingleThreadEventLoop overrides it to run its tailTasks queue.
afterRunningAllTasks();
return false;
}
// Absolute deadline for this batch.
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// Run the task. NOTE(review): per the article, safeExecute catches and logs exceptions.
safeExecute(task);
runTasks++;
// Check the deadline only every 64 tasks (0x3F mask), since nanoTime() is relatively expensive.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
// Time budget used up: stop.
if (lastExecutionTime >= deadline) {
break;
}
}
// Fetch the next task.
task = pollTask();
if (task == null) {
// Queue drained: record the finish time and stop.
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// Post-run hook, as above.
afterRunningAllTasks();
// Read by confirmShutdown()'s quiet-period check.
this.lastExecutionTime = lastExecutionTime;
return true;
}
SingleThreadEventExecutor:
/**
 * Moves scheduled tasks that are due into the regular task queue, using a single time
 * snapshot for the whole pass.
 *
 * @return true if every due task was moved; false if the task queue refused one, in which
 *         case that task is put back into the scheduled queue for a later pass
 */
private boolean fetchFromScheduledTaskQueue() {
    final long now = AbstractScheduledEventExecutor.nanoTime();
    // NOTE(review): pollScheduledTask presumably returns the next task due at or before `now`.
    for (Runnable due = pollScheduledTask(now); due != null; due = pollScheduledTask(now)) {
        if (!taskQueue.offer(due)) {
            // No room left in the task queue: hand the task back so it can be fetched again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) due);
            return false;
        }
    }
    return true;
}
回到run方法,接下來就是shutdown流程了:
NioEventLoop:
/**
 * Closes everything registered with this event loop's selector. Keys whose attachment is not
 * an AbstractNioChannel are cancelled and reported to their NioTask immediately. Netty
 * channels are first collected and only closed after the key set has been fully iterated.
 */
private void closeAll() {
    selectAgain();
    final Set<SelectionKey> registered = selector.keys();
    final Collection<AbstractNioChannel> toClose = new ArrayList<AbstractNioChannel>(registered.size());
    for (SelectionKey key : registered) {
        final Object attachment = key.attachment();
        if (!(attachment instanceof AbstractNioChannel)) {
            // Not a Netty channel: cancel the key and notify the task right away.
            key.cancel();
            @SuppressWarnings("unchecked")
            final NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            invokeChannelUnregistered(task, key, null);
            continue;
        }
        toClose.add((AbstractNioChannel) attachment);
    }

    // Close the collected channels outside the key iteration.
    for (AbstractNioChannel channel : toClose) {
        channel.unsafe().close(channel.unsafe().voidPromise());
    }
}
SingleThreadEventExecutor:
// Decides, on each call during shutdown, whether the executor may terminate now.
// Returns false if not shutting down. Otherwise it cancels scheduled tasks, records the graceful
// shutdown start time on the first call, then runs the remaining queued tasks. Only if no task
// ran does it also run the shutdown hooks, because of the || short-circuit.
// - If a task or hook ran: returns true when isShutdown() or the quiet period is 0; otherwise
//   returns false.
// - If nothing ran: returns true when isShutdown(), gracefulShutdownTimeout has elapsed since the
//   start, or no task ran during the last quiet period; otherwise sleeps 100 ms and returns false.
// Throws IllegalStateException when called from outside the event loop thread.
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
/* cancel scheduled tasks */
cancelScheduledTasks();
// gracefulShutdownStartTime: when the graceful shutdown began; recorded only once.
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// Run all remaining queued tasks; if none ran, run the shutdown hooks instead.
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// isShutdown(): no further waiting.
return true;
}
// gracefulShutdownQuietPeriod: there was work in the queue, so wait a little until no task is
// queued for the quiet period, or terminate right away if the quiet period is 0.
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
// On NioEventLoop this is a no-op: its wakeup(boolean) override only acts when inEventLoop is false.
wakeup(true);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
// Hard limit: stop waiting once gracefulShutdownTimeout has elapsed since the shutdown began.
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
// A task ran within the last quiet period: keep waiting.
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check every 100 ms whether any tasks were added to the queue.
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Deliberately ignored. NOTE(review): the interrupt status is not restored; confirm this is intended.
}
return false;
}
// No tasks were added during the last quiet period; hopefully it is safe to shut down.
return true;
}
到這裡,NioEventLoop的源碼解析就完成了。