天天看點

Netty源碼解析之NioEventLoop

閱讀須知

  • Netty版本:4.1.14.Final
  • 文章中以 /* */ 注釋標記的方法會在後文做深入分析

正文

Netty的EventLoop主要用於處理I/O操作,而NioEventLoop就是EventLoop的主要實現之一。它將Channel註冊到Selector上,所以在事件循環中可以對這些Channel進行多路複用。我們首先來看NioEventLoop的類層次結構:

(圖:NioEventLoop的類層次結構)

我們來看它的構造方法:

/**
 * Creates an event loop owned by {@code parent} whose thread is supplied by {@code executor}.
 *
 * @param parent                   the owning event loop group
 * @param executor                 executor that supplies the event-loop thread
 * @param selectorProvider         provider used to open the selector; must not be null
 * @param strategy                 strategy deciding whether to block in select; must not be null
 * @param rejectedExecutionHandler handler used when a task cannot be queued
 * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is null
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
        SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // Initialise the task queue and base state; 'false' is addTaskWakesUp, so execute() wakes
    // the selector explicitly after queuing a task.
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selectStrategy = strategy;
    // openSelector() reads 'provider', so it has to be assigned before this call.
    final SelectorTuple tuple = openSelector();
    unwrappedSelector = tuple.unwrappedSelector;
    selector = tuple.selector;
}
           

NioEventLoop:

/**
 * Opens a new JDK selector and, unless the optimization is disabled, swaps the selector's
 * internal {@code selectedKeys} / {@code publicSelectedKeys} fields for a
 * {@code SelectedSelectionKeySet} via reflection.
 *
 * <p>The plain selector is returned, and the optimization skipped, in each of these cases
 * (failures are logged at trace level):
 * <ul>
 *   <li>{@code DISABLE_KEYSET_OPTIMIZATION} is set;</li>
 *   <li>{@code sun.nio.ch.SelectorImpl} cannot be loaded or is not a supertype of the opened
 *       selector;</li>
 *   <li>the reflective field swap fails.</li>
 * </ul>
 * On success {@code selectedKeys} refers to the instrumented set, and the tuple also carries a
 * {@code SelectedSelectionKeySetSelector} wrapping the raw selector.
 *
 * @return a tuple holding the raw selector and, when instrumented, its wrapping selector
 * @throws ChannelException if the underlying selector cannot be opened
 */
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // Load the JDK-internal selector base class. The result is either the Class itself or the
    // Throwable raised while loading it.
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });
    // Give up if the class is unavailable or this selector is not an instance of it.
    if (!(maybeSelectorImplClass instanceof Class) ||
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        return new SelectorTuple(unwrappedSelector);
    }
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    // Replace both key-set fields. Returns null on success, otherwise the failure cause.
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                if (cause != null) {
                    return cause;
                }
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });
    // trySetAccessible reports failure as a Throwable, so test for Throwable rather than only
    // Exception. Otherwise a non-Exception failure would be mistaken for success, and
    // selectedKeys would point at a set the selector never fills.
    if (maybeException instanceof Throwable) {
        selectedKeys = null;
        Throwable t = (Throwable) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    return new SelectorTuple(unwrappedSelector,
        new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
           

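這個方法很長。它首先調用JDK NIO的API開啟Selector,其餘的核心邏輯是根據配置(DISABLE_KEYSET_OPTIMIZATION)判斷是否採用selectedKeys優化。所謂優化,就是通過反射把sun.nio.ch.SelectorImpl內部的selectedKeys和publicSelectedKeys兩個字段替換為Netty基於數組實現的SelectedSelectionKeySet,使就緒的key可以直接按數組下標遍歷(見後文的processSelectedKeysOptimized)。如果反射替換失敗,則退回使用未經優化的Selector。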

從上面的結構圖中可以看到,NioEventLoop還實現了JDK的Executor接口。熟悉線程池的同學對這個接口一定不陌生,所以NioEventLoop同樣可以執行Runnable任務。在Netty中,NioEventLoop除了處理I/O事件,還負責執行一些系統任務和定時任務。我們來看相關方法的實現:

SingleThreadEventExecutor:

/**
 * Queues {@code task} for execution on this executor's thread.
 *
 * <p>When called from outside the event loop, the loop thread is started first (if needed).
 * If the executor has already shut down and the task can be withdrawn from the queue again,
 * the task is rejected via {@code reject()}. Unless adding a task wakes the loop by itself
 * ({@code addTaskWakesUp}), the selector is woken for tasks that ask for it.
 *
 * @param task the task to run; must not be null
 * @throws NullPointerException if {@code task} is null
 */
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    final boolean calledFromLoop = inEventLoop();
    if (!calledFromLoop) {
        startThread();
    }
    // Enqueue; a full queue is handled by the rejection policy inside addTask.
    addTask(task);
    if (!calledFromLoop && isShutdown() && removeTask(task)) {
        reject();
    }
    // addTaskWakesUp is false for NioEventLoop; wakesUpForTask filters out tasks that must not
    // wake the selector.
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(calledFromLoop);
    }
}
           

先來看wakeup方法的實作:

NioEventLoop:

/**
 * Wakes up a blocked {@code select} so newly queued work is noticed promptly.
 *
 * <p>Does nothing when called from the event loop itself. Otherwise only the caller that flips
 * {@code wakenUp} from false to true issues {@code selector.wakeup()}, so concurrent callers
 * do not each pay for a wakeup.
 *
 * @param inEventLoop whether the caller is the event-loop thread
 */
protected void wakeup(boolean inEventLoop) {
    if (inEventLoop) {
        return;
    }
    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
           

SingleThreadEventExecutor:

/**
 * Starts the event-loop thread if it has not been started yet.
 *
 * <p>A plain read of {@code state} avoids the CAS once the loop is running. The CAS from
 * ST_NOT_STARTED to ST_STARTED guarantees that {@code doStartThread()} runs at most once.
 */
private void startThread() {
    if (state != ST_NOT_STARTED) {
        return;
    }
    if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
        doStartThread();
    }
}
           

SingleThreadEventExecutor:

/**
 * Submits the event-loop body to {@code executor}; the thread that runs it becomes this
 * executor's {@code thread}.
 *
 * <p>After {@code run()} returns or throws, the submitted runnable performs these steps in order:
 * <ol>
 *   <li>moves {@code state} to at least ST_SHUTTING_DOWN;</li>
 *   <li>repeatedly calls {@code confirmShutdown()} until it returns true;</li>
 *   <li>calls {@code cleanup()};</li>
 *   <li>sets ST_TERMINATED, releases {@code threadLock} and completes
 *       {@code terminationFuture}.</li>
 * </ol>
 * The nested finally blocks ensure termination is signalled even if cleanup throws.
 */
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            // Re-apply a pending interrupt recorded in 'interrupted' (presumably requested before
            // the thread was assigned — confirm against interruptThread()).
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                /* Event-loop body implemented by the subclass (e.g. NioEventLoop.run()). */
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                // CAS-loop the state up to ST_SHUTTING_DOWN, unless it is already at or past it.
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // gracefulShutdownStartTime is set by confirmShutdown(). If run() returned normally
                // without ever calling it, the subclass implementation is broken.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                        SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                        "before run() implementation terminates.");
                }
                try {
                    for (;;) {
                        // Run all remaining tasks and shutdown hooks until shutdown is confirmed.
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        // Subclass cleanup hook (no-op by default; NioEventLoop closes its selector).
                        cleanup();
                    } finally {
                        // Mark the executor as terminated.
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release(); // release the thread lock
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                "An event executor terminated with " +
                                "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
           

NioEventLoop:

/**
 * Event-loop body: repeatedly selects for I/O, processes the selected keys and runs queued tasks
 * until shutdown is confirmed.
 *
 * <p>Each iteration asks {@code selectStrategy} what to do:
 * <ul>
 *   <li>CONTINUE skips the iteration;</li>
 *   <li>SELECT performs a (possibly blocking) select and then falls through to processing;</li>
 *   <li>any other value goes straight to processing.</li>
 * </ul>
 * Task time is bounded by {@code ioRatio}. Exceptions from either the I/O phase or the shutdown
 * check are handed to {@code handleLoopException}, so the loop keeps running.
 */
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    /* select: wakenUp is reset to false and its previous value is passed in */
                    select(wakenUp.getAndSet(false));
                    // If a wakeup was requested meanwhile, call selector.wakeup() again. With no
                    // select in progress this makes the next select return immediately
                    // (JDK Selector.wakeup contract).
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // ioRatio is the desired percentage of loop time spent on I/O. Per the original
            // notes the default is 50, i.e. roughly equal time for I/O and non-I/O tasks.
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    /* process the selected keys */
                    processSelectedKeys();
                } finally {
                    /* always run all queued tasks, without a time limit */
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    /* process the selected keys */
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    /* always run queued tasks, limited to ioTime * (100 - ioRatio) / ioRatio */
                    // NOTE(review): assumes ioRatio is validated to (0, 100] elsewhere;
                    // ioRatio == 0 would divide by zero here.
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown, even if the loop processing above threw.
        try {
            if (isShuttingDown()) {
                /* close all registered channels */
                closeAll();
                /* confirm shutdown; leave the loop once it is confirmed */
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
           

下面我們就來分析比較關鍵的select操作:

NioEventLoop:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // 計算下一個将要觸發的定時任務的逾時時間
        // delayNanos方法會擷取到下一個定時任務的觸發時間
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            // 為逾時時間增加五毫秒的調整值
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000 L) / 1000000 L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    // 如果需要立即執行或者已經逾時,則執行輪詢操作
                    selector.selectNow();
                    // select計數器置為1
                    selectCnt = 1;
                }
                break;
            }
            // 如果在wakenUp值為true時送出了任務,則該任務沒有機會調用Selector的wakeup方法
            // 是以我們需要在執行select操作之前再次檢查任務隊列
            // 如果我們不這樣做,那麼任務可能會被挂起,直到select操作逾時
            // 如果IdleStateHandler存在于pipeline中,它可能會被挂起直到idle timeout
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            // 剩餘的逾時時間作為參數調用Seletor的select方法
            int selectedKeys = selector.select(timeoutMillis);
            // 自增select計數器
            selectCnt++;
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // 以下情況退出目前循環
                // 1、select操作selectedKeys不為0,說明有讀寫事件需要處理
                // 2、由使用者喚醒多路複用器或任務隊列具有待處理任務
                // 3、定時任務已準備好進行處理
                break;
            }
            if (Thread.interrupted()) {
                // 線程被中斷,是以重置selected keys并退出循環,這樣就不會在繁忙的循環中執行
                // 由于這很可能是使用者或其用戶端庫的處理程式中的錯誤,我們也會記錄它
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                        "Thread.currentThread().interrupt() was called. Use " +
                        "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis已過,select沒有任何内容
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // selector連續多次select沒有内容,有可能觸發了JDK NIO epoll死循環導緻CPU 100%的bug
                // 重建selector解決
                logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
                /* 重建selector */
                rebuildSelector();
                selector = this.selector;
                // 再次select以填充selectedKeys
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                    selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                selector, e);
        }
    }
}
           

NioEventLoop:

/**
 * Replaces the current selector with a newly opened one and migrates all registered channels.
 *
 * <p>The work must happen on the event loop. When called from another thread, it is submitted
 * to this loop via {@code execute} instead of running inline.
 */
public void rebuildSelector() {
    if (inEventLoop()) {
        rebuildSelector0();
        return;
    }
    execute(new Runnable() {
        @Override
        public void run() {
            rebuildSelector0();
        }
    });
}
           

NioEventLoop:

/**
 * Opens a new selector, moves every valid key's channel from the old selector to it, then closes
 * the old selector. Must be called on the event loop.
 *
 * <p>Does nothing if there is no current selector or if a new selector cannot be opened (the
 * failure is logged). A channel that cannot be re-registered is closed if it is an
 * {@code AbstractNioChannel}; otherwise its {@code NioTask} is told the channel was unregistered.
 */
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;
    if (oldSelector == null) {
        return;
    }
    try {
        // Open the replacement selector.
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Re-register every channel with the new selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            // Skip keys that are already invalid or whose channel is already on the new selector.
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            // Cancel the old key (deregistering the channel from the old selector) ...
            key.cancel();
            // ... then register the channel with the new selector, keeping interest ops and attachment.
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Keep the channel's cached key in sync with the new registration.
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    try {
        // Every channel has been moved, so the old selector can be closed.
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
           

這樣,select方法就分析完成了,接下來肯定就是select key的處理了:

NioEventLoop:

/**
 * Processes the keys chosen by the last select.
 *
 * <p>When {@code openSelector()} instrumented the selector, {@code selectedKeys} is non-null and
 * the array-based fast path is used. Otherwise the selector's own {@code selectedKeys()} set is
 * iterated.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    processSelectedKeysOptimized();
}
           

兩個分支處理select key的邏輯是比較相似的,我們以開啟優化的方式為例:

NioEventLoop:

/**
 * Processes the keys stored in the array-backed {@code selectedKeys} set.
 *
 * <p>Each slot is set to null as soon as it is read, so the key can be garbage-collected once
 * its channel is closed. If handling a key sets {@code needsToSelectAgain}, the following steps
 * run and scanning restarts at slot 0:
 * <ul>
 *   <li>{@code selectedKeys.reset(idx + 1)} is called (per the original notes, also to help
 *       GC);</li>
 *   <li>{@code selectAgain()} polls the selector again.</li>
 * </ul>
 */
private void processSelectedKeysOptimized() {
    int idx = 0;
    while (idx < selectedKeys.size) {
        final SelectionKey key = selectedKeys.keys[idx];
        selectedKeys.keys[idx] = null;

        final Object attachment = key.attachment();
        if (attachment instanceof AbstractNioChannel) {
            processSelectedKey(key, (AbstractNioChannel) attachment);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            processSelectedKey(key, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(idx + 1);
            selectAgain();
            idx = 0;
        } else {
            idx++;
        }
    }
}
           

NioEventLoop:

/**
 * Dispatches the ready operations of {@code k} to the channel's {@code NioUnsafe}.
 *
 * <p>An invalid key closes the channel, but only if the channel is still registered to this
 * event loop. For a valid key the ready operations are handled in this order:
 * <ol>
 *   <li>OP_CONNECT: removed from the interest set, then the connect is finished;</li>
 *   <li>OP_WRITE: forces a flush;</li>
 *   <li>OP_READ / OP_ACCEPT, or an empty ready set: triggers a read.</li>
 * </ol>
 * A {@code CancelledKeyException} during dispatch closes the channel.
 *
 * @param k  the selected key
 * @param ch the channel attached to {@code k}
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // The channel implementation may throw when it has no event loop. Ignore that: we only
            // wanted to know whether the channel is registered to this loop and is ours to close.
            return;
        }
        // Only close the channel if it is still registered to this event loop. A channel can be
        // deregistered from the loop, cancelling its key as part of deregistration, while the
        // channel itself is still healthy and must not be closed. A null eventLoop also fails
        // this check, so no separate null test is needed.
        if (eventLoop != this) {
            return;
        }
        // The key is invalid and the channel belongs to this loop: close it.
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        final int readyOps = k.readyOps();
        // finishConnect must run before any read or write; otherwise the JDK channel may throw
        // NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // Remove OP_CONNECT from the interest set, or Selector.select would keep returning
            // immediately instead of blocking.
            k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT);
            unsafe.finishConnect();
        }
        // Handle OP_WRITE first: writing queued buffers may free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // forceFlush also clears OP_WRITE once there is nothing left to write.
            unsafe.forceFlush();
        }
        // Also read when readyOps == 0, to work around a JDK bug that could cause a spin loop.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        // The key was cancelled while dispatching: close the channel.
        unsafe.close(unsafe.voidPromise());
    }
}
           

這裡對Channel的各種操作,如close、read等,我們在分析ServerSocketChannel的相關源碼時已經分析過,這裡不再重複。處理完select key之後,run方法會在finally塊中調用runAllTasks,保證任務隊列中的任務始終能得到執行:

SingleThreadEventExecutor:

/**
 * Moves due scheduled tasks into the task queue, then runs queued tasks until the queue is
 * empty or roughly {@code timeoutNanos} have elapsed.
 *
 * <p>The clock is sampled only every 64 tasks, because nanoTime is comparatively expensive, so
 * the limit can be overshot by up to 63 tasks. {@code afterRunningAllTasks()} is always called.
 * When at least one task ran, {@code lastExecutionTime} is updated.
 *
 * @param timeoutNanos soft time budget for running tasks, in nanoseconds
 * @return {@code true} if at least one task was run
 */
protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable next = pollTask();
    if (next == null) {
        // Nothing queued; still give the subclass its post-run hook (tailTasks in SingleThreadEventLoop).
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long executed = 0;
    long finishedAt;
    while (true) {
        // safeExecute shields the loop from exceptions thrown by the task.
        safeExecute(next);
        executed++;

        if ((executed & 0x3F) == 0) {
            finishedAt = ScheduledFutureTask.nanoTime();
            if (finishedAt >= deadline) {
                break;
            }
        }

        next = pollTask();
        if (next == null) {
            finishedAt = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = finishedAt;
    return true;
}
           

SingleThreadEventExecutor:

/**
 * Transfers every scheduled task that is due now into {@code taskQueue}.
 *
 * <p>If {@code taskQueue} refuses a task because it is full, that task is put back into the
 * scheduled queue so it can be fetched again later, and transfer stops.
 *
 * @return {@code true} if all due tasks were transferred, {@code false} if the queue was full
 */
private boolean fetchFromScheduledTaskQueue() {
    final long now = AbstractScheduledEventExecutor.nanoTime();
    for (Runnable due = pollScheduledTask(now); due != null; due = pollScheduledTask(now)) {
        if (!taskQueue.offer(due)) {
            scheduledTaskQueue().add((ScheduledFutureTask<?>) due);
            return false;
        }
    }
    return true;
}
           

回到run方法,接下來就是shutdown流程了:

NioEventLoop:

/**
 * Closes every channel registered with the selector, as part of shutting the loop down.
 *
 * <p>Keys whose attachment is not an {@code AbstractNioChannel} are cancelled immediately, and
 * their {@code NioTask} is notified. Channels are first collected and closed only after the key
 * set has been fully iterated, presumably so that closing does not disturb the iteration.
 */
private void closeAll() {
    selectAgain();
    final Set<SelectionKey> registered = selector.keys();
    final Collection<AbstractNioChannel> toClose = new ArrayList<AbstractNioChannel>(registered.size());
    for (SelectionKey key : registered) {
        final Object attachment = key.attachment();
        if (!(attachment instanceof AbstractNioChannel)) {
            key.cancel();
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
            invokeChannelUnregistered(task, key, null);
            continue;
        }
        toClose.add((AbstractNioChannel) attachment);
    }
    for (AbstractNioChannel channel : toClose) {
        channel.unsafe().close(channel.unsafe().voidPromise());
    }
}
           

SingleThreadEventExecutor:

/**
 * Decides whether the executor may finish shutting down. It is called repeatedly from the event
 * loop while shutting down.
 *
 * <p>Cancels scheduled tasks, records the graceful-shutdown start time on the first call, and
 * runs remaining tasks and shutdown hooks. It returns {@code true} (shutdown may complete) when:
 * <ul>
 *   <li>the executor is already shut down;</li>
 *   <li>tasks or hooks ran and the quiet period is 0;</li>
 *   <li>the graceful-shutdown timeout has elapsed;</li>
 *   <li>no task ran during the last quiet period.</li>
 * </ul>
 * Otherwise it returns {@code false}, sleeping about 100 ms first when still inside the quiet
 * period.
 *
 * @return whether shutdown can be completed now
 * @throws IllegalStateException if called from outside the event loop while shutting down
 */
protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }
    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }
    /* cancel all scheduled tasks */
    cancelScheduledTasks();
    // gracefulShutdownStartTime is the moment graceful shutdown began (set once).
    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }
    // Run all remaining tasks, then the shutdown hooks.
    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
            return true;
        }
        // gracefulShutdownQuietPeriod is the quiet period. Tasks were still queued, so wait until
        // no task has been queued for a whole quiet period, or terminate now if it is 0.
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        // Wake the selector (inEventLoop = true), as in wakeup(boolean).
        wakeup(true);
        return false;
    }
    final long nanoTime = ScheduledFutureTask.nanoTime();
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Check every 100 ms whether any task was added to the queue.
        wakeup(true);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt is swallowed here. Re-interrupting would make every
            // later sleep throw immediately while the caller loops on confirmShutdown(), so
            // ignoring looks deliberate — confirm before changing.
        }
        return false;
    }
    // No task was added during the last quiet period; it is hopefully safe to shut down.
    return true;
}
           

到這裡,NioEventLoop的源碼解析就完成了。

繼續閱讀