天天看點

Netty之EventLoop和EventLoopGroup

https://blog.csdn.net/twt936457991/article/details/89854851

https://www.cnblogs.com/lovezmc/p/11547912.html

1.Netty線程模型

Netty線程模型并不是一成不變的,它實際取決于使用者的啟動參數配置。通過設定不同的啟動參數,Netty可以同時支援Reactor單線程模型、多線程模型和主從Reactor多線程模型。

Netty之EventLoop和EventLoopGroup

可以通過下圖Netty服務端啟動代碼來了解它的線程模型

Netty之EventLoop和EventLoopGroup

服務端啟動的時候,建立了兩個NioEventLoopGroup,它們實際上是兩個獨立的Reactor線程池。一個用于接收用戶端的TCP連接配接,另一個用于處理I/O相關的讀寫操作,或者執行系統Task、定時任務Task等。

Netty用于接收用戶端請求的線程池職責如下:

  • 接收用戶端TCP連接配接,初始化Channel參數;
  • 将鍊路狀态變更事件通知給ChannelPipeline;

Netty處理I/O操作的Reactor線程池職責如下:

  • 異步讀取通信對端的資料報,發送讀事件到ChannelPipeline;
  • 異步發送消息到通信對端,調用ChannelPipeline的消息發送接口;
  • 執行系統調用Task;
  • 執行定時任務Task,例如鍊路空閑狀态監測定時任務。

通過調整線程池的線程個數、是否共享線程池等方式,Netty的Reactor線程模型可以在單線程、多線程和主從多線程間切換,這種靈活的配置方式可以最大程度的滿足不同使用者的個性化定制。

為了盡可能提升性能,Netty在很多地方進行了無鎖化的設計,例如在I/O線程内部進行串行操作,避免多線程競争導緻的性能下降問題。表面上看,串行化設計似乎CPU使用率不高,并發程度不夠。但是,通過調整NIO線程池的線程參數,可以同時啟動多個串行化的線程并行運作,這種局部無鎖化的串行線程設計相比一個隊列—多個工作線程的模型性能更優。

它的設計原理如圖:

Netty之EventLoop和EventLoopGroup

Netty的NioEventLoop讀取到消息之後,直接調用ChannelPipeline的fireChannelRead(Object msg)。隻要使用者不主動切換線程,一直都是由NioEventLoop調用使用者的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程操作導緻的鎖的競争,從性能角度看是最優的。

1.1Netty的多線程程式設計最佳實踐

(1)建立兩個NioEventLoopGroup,用于邏輯隔離NIOAcceptor和NIOI/O線程。

(2)盡量不要在ChannelHandler中啟動使用者線程(解碼後用于将POJO消息派發到後端業務線程的除外)。

(3)解碼要放在NIO線程調用的解碼Handler中進行,不要切換到使用者線程中完成消息的解碼。

(4)如果業務邏輯操作非常簡單,沒有複雜的業務邏輯計算,沒有可能

可能會導緻線程被阻塞的磁盤操作、資料庫操作、網路操作等,可以直接在NIO線程上完成業務邏輯編排,不需要切換到使用者線程。

(5)如果業務邏輯處理複雜,不要在NIO線程上完成,建議将解碼後的POJO消息封裝成Task,派發到業務線程池中由業務線程執行,以保證NIO線程盡快被釋放,處理其他的I/O操作。

推薦的線程數量計算公式有以下兩種:

  • 公式一:線程數量=(線程總時間/瓶頸資源時間)*瓶頸資源的線程并行數;
  • 公式二:QPS=1000/線程總時間*線程數;

由于使用者場景的不同,對于一些複雜的系統,實際上很難計算出最優線程配置,指南根據測試資料和使用者場景,結合公式給出一個相對合理的範圍,然後對範圍内的資料進行性能測試,選擇相對最優值。

2.NioEventLoop源碼分析

2.1 NioEventLoop設計原理

Netty的NioEventLoop并不是一個純粹的I/O線程,它除了負責I/O的讀寫之外,還兼顧處理以下兩類任務:

  • 系統Task:通過調用NioEventLoop的execute(Runnable task)方法實作,Netty有很多系統Task,建立它們的主要原因是:當I/O線程和使用者線程同時操作網絡資源時,為了防止并發操作導緻的鎖競争,将使用者線程的操作封裝成Task放入消息隊列中,由I/O線程負責執行,這樣就實作了局部無鎖化;
  • 定時任務:通過調用NioEventLoop的schedule(Runnable command,long delay,TimeUnit unit)方法實作。

2.2NioEventLoop繼承關系類圖

Netty之EventLoop和EventLoopGroup

2.3 NioEventLoop

作為NIO架構的Reactor線程,NioEventLoop需要處理網絡I/O線程讀寫事件,是以它必須聚合一個多路複用器對象。它的Selector定義如下

Netty之EventLoop和EventLoopGroup

Selector初始化非常簡單,直接調用Selector.open()方法九年建立并打開一個新的Selector。Netty對Selector的selectedKeys進行了優化,使用者可以通過io.netty.noKeySet Optimization開關決定是否啟用該優化項。預設不打卡selectedKeys優化功能。

Selector初始化如下

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
           

如果沒有開啟selectedKeys優化開關,通過provider.openSelector()建立并打開多路複用器之後就立即傳回。如果開啟了優化開關,需要通過反射的方式從Selector執行個體中擷取selectedKeys和publicSelectedKeys,将上述兩個成員變量設定為可寫,通過反射的方式使用Netty構造的selectedKeys包裝類selectedKeySet将原JDK的selectedKeys替換掉。

下面重點看NioEventLoop的run方法實作

@Override
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                //ioRatio 變量表示的就是此線程配置設定給 IO 操作所占的時間比(即 運作processSelectedKeys 耗時在整個循環中所占用的時間)
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
           

所有的邏輯操作都在for循環體内進行,隻有當NioEventLoop接收到退出指令的時候,才退出循環,否則一直執行下去,這也是通用的NIO線程實作方式。

首先需要将wakenUp還原為false,并将之前的wake up專題太儲存到oldWakenUp變量中。通過hasTasks()方法判斷目前的消息隊列中是否有消息尚未處理,如果有則調用selectNow()方法立即進行依次select操作,看是否有準備就緒的Channel需要處理,實作如下

int selectNow() throws IOException {
        try {
            return selector.selectNow();
        } finally {
            // restore wakeup state if needed
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }
    }

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
            if (nextWakeupTime != normalizedDeadlineNanos) {
                nextWakeupTime = normalizedDeadlineNanos;
            }

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
           

Selector的selectNow()方法會立即觸發Selector的選擇操作,如果有準備就緒的Channel,則傳回就緒Channel的集合,否則傳回0.選擇完成之後,再次判斷使用者是否調用了Selector的wakeup方法,如果調用,則執行selector.wakeup()操作。

select方法中,調用delayNanos()方法計算獲得NioEventLoop中定時任務的觸發時間。計算下一個将要觸發的定時任務的剩餘逾時時間,将他轉換為毫秒,為逾時時間增加0.5毫秒的調整至。對剩餘的逾時時間進行判斷,如果需要立即執行或已經逾時,則調用selector.selectNow()進行輪詢操作,将selectCnt設定為1,并退出目前循環。

将定時任務剩餘的逾時時間作為參數進行select操作,每完成一次select操作,對select計數器selectCnt加1。Select操作完成之後,需要對結果進行判斷,如果存在下列任意一種情況,則退出目前循環。

  • 有Channel處于就緒狀态,selectedKeys不為0,說明有讀寫事件需要處理;
  • oldWakenUp為true;
  • 系統或使用者調用了wakeup操作,喚醒目前的多路複用器;
  • 消息隊列中有新的任務需要處理。

    如果本次Selector的輪詢結果為空,也沒有wakeup操作或是新的消息需要處理,則說明是個空輪詢,有可能觸發了JDK的epollbug,它會導緻Selector的空輪詢,使I/O線程一直處于100%狀态。該問題的修複政策如下:

    (1) 對Selector的select操作周期進行統計;

    (2)每完成一次空的select操作進行一次計數;

    (3)在某個周期内如果連續發生N次空輪詢,說明出發了JDK NIO的epoll()死循環。

    監測到Selector處于死循環後,需要通過重建Selector的方式讓系統恢複正常:

/**
     * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
     * around the infamous epoll 100% CPU bug.
     */
    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
    }


private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
           

首先通過inEventLoop()方法判斷是否是其他線程發起的rebuildSelector,如果由其他線程發起,為了避免多線程并發操作Selector和其他資源,需要将rebuildSelector封裝成Task,放到NioEventLoop的消息隊列中,由NioEventLoop線程負責調用,這樣就避免了多線程并發操作導緻的線程安全問題。調用openSelector方法建立并打開新的Selector,通過循環,将原Selector上注冊的SocketChannel從舊的Selector上去注冊,重新注冊到新的Selector上,并将老的Selector關閉。

3.NioEventLoopGroup

public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        super(nThreads, executor, selectorProvider);
    }
    
    // DEFAULT_EVENT_LOOP_THREADS = CPU個數*2
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    
    // EventLoop數組
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
    }
           

NioEventLoop的配置設定

  當一個新的Channel連接配接時,NioEventLoopGroup需要拿出一個NioEventLoop讓Channel綁定,這個Channel之後的IO操作都在這個NioEventLoop上操作。

public EventExecutor next() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
           

4.EventLoop和EventLoopGroup總結

為了解決系統在運作過程中,如果頻繁的進行線程上下文切換,會帶來額外的性能損耗問題,Netty采用了串行化設計理念,從消息的讀取、編碼以及後續 ChannelHandler 的執行,始終都由 IO 線程 EventLoop 負責,這就意外着整個流程不會進行線程上下文的切換,資料也不會面臨被并發修改的風險。

EventLoopGroup 是一組 EventLoop 的抽象,一個 EventLoopGroup 當中會包含一個或多個 EventLoop,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 裡面按照一定規則擷取其中一個 EventLoop 來處理任務。

在 Netty 伺服器端程式設計中我們需要 BossEventLoopGroup 和WorkerEventLoopGroup 兩個 EventLoopGroup 來進行工作。

BossEventLoopGroup 通常是一個單線程的 EventLoop,EventLoop 維護着一個注冊了 ServerSocketChannel 的 Selector 執行個體,EventLoop 的實作涵蓋 IO 事件的分離,和分發(Dispatcher),EventLoop 的實作充當 Reactor 模式中的分發(Dispatcher)的角色。

是以通常可以将 BossEventLoopGroup 的線程數參數為 1。

BossEventLoop 隻負責處理連接配接,故開銷非常小,連接配接到來,馬上按照政策将SocketChannel 轉發給 WorkerEventLoopGroup,WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop 來将這 個SocketChannel 注冊到其維護的 Selector 并對其後續的 IO 事件進行處理。

ChannelPipeline 中的每一個 ChannelHandler 都是通過它的 EventLoop(I/O 線程)來處理傳遞給它的事件的。是以至關重要的是不要阻塞這個線程,因為這會對整體的 I/O 處理産生嚴重的負面影響。但有時可能需要與那些使用阻塞 API 的遺留代碼進行互動。

對于這種情況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add() 方法。如果一個事件被傳遞給一個自定義的 EventExecutorGroup, DefaultEventExecutorGroup 的預設實作。

static final EventExecutor group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline p = ch.pipeline();
pipeline.addLast(group, "handler", new MyChannelHandler());

           

綜述如下:

  • NioEventLoopGroup 實際上就是個線程池,一個 EventLoopGroup 包含一個或者多個 EventLoop;
  • 一個 EventLoop 在它的生命周期内隻和一個 Thread 綁定;
  • 所有有 EnventLoop 處理的 I/O 事件都将在它專有的 Thread 上被處理;
  • 一個 Channel 在它的生命周期内隻注冊于一個 EventLoop;
  • 每一個 EventLoop 負責處理一個或多個 Channel;

《Netty權威指南》

部分綜述參考: netty中的EventLoopGroup 和 EventLoop