天天看點

Netty為啥可靠(二)Selector空輪詢處理防止線程跑飛記憶體保護

Selector空輪詢處理

在NIO中通過Selector的輪詢目前是否有IO事件,根據JDK NIO api描述,Selector的select方法會一直阻塞,直到IO事件達到或逾時,但是在Linux平台上這裡有時會出現問題,在某些場景下select方法會直接傳回,即使沒有逾時并且也沒有IO事件到達,這就是著名的epoll bug,這是一個比較嚴重的bug,它會導緻線程陷入死循環,會讓CPU飙到100%,極大地影響系統的可靠性,到目前為止,JDK都沒有完全解決這個問題。

但是Netty有效的規避了這個問題,經過實踐證明,epoll bug已Netty架構解決,Netty的處理方式是這樣的:

記錄select空轉的次數,定義一個閥值,這個閥值預設是512,可以在應用層通過設定系統屬性io.netty.selectorAutoRebuildThreshold傳入,當空轉的次數超過了這個閥值,重新建構新Selector,将老Selector上注冊的Channel轉移到建立的Selector上,關閉老Selector,用新的Selector代替老Selector,詳細實作可以檢視NioEventLoop中的selector和rebuildSelector方法:

for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }

    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;

    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
        // Selected something,
        // waken up by user, or
        // the task queue has a pending task.
        break;
    }
    if (selectedKeys == 0 && Thread.interrupted()) {
        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
        // As this is most likely a bug in the handler of the user or it's client library we will
        // also log it.
        //
        // See https://github.com/netty/netty/issues/2426
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                    "Thread.currentThread().interrupt() was called. Use " +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        selectCnt = 1;
        break;
    }
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn(
                "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                selectCnt);

        rebuildSelector();
        selector = this.selector;

        // Select again to populate selectedKeys.
        selector.selectNow();
        selectCnt = 1;
        break;
    }

    currentTimeNanos = System.nanoTime();
}
           
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }

    final Selector oldSelector = selector;
    final Selector newSelector;

    if (oldSelector == null) {
        return;
    }

    try {
        newSelector = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                        continue;
                    }

                    int interestOps = key.interestOps();
                    key.cancel();
                    key.channel().register(newSelector, interestOps, a);
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }

        break;
    }

    selector = newSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
           

防止線程跑飛

線程是多路複用器的核心,所有IO事件執行的載體,一旦線程出現異常線程跑飛(run方法執行結束),那麼可能會導緻整個多路複用器不可用,導緻挂載在多路複用器上的連接配接不可用,進而大量的業務請求失敗。由于Netty中的同時處理IO事件和非IO事件邏輯,是以線程不僅僅要處理IO異常,業務測觸發的異常也需要被正确的處理,一旦處理不當,會導緻線程跑飛。Netty的處理是在run方法中catch所有的Throwable即所有的Exception和Error,不做任何處理,休眠1s繼續執行循環,休眠1s的目的是為了防止捕獲異常之後繼續執行再次進入該異常形成死循環。實作代碼在NioEventLoop的run方法中:

@Override
protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            ...
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
           

記憶體保護

ByteBuf記憶體洩露保護

為了提升記憶體的使用率,Netty提供了記憶體池和對象池,記憶體洩露保護主要是針對Netty中的記憶體池的,Netty要求在使用完記憶體池中的記憶體之後要顯示的歸還,以免記憶體中的對象存在額外的引用造成記憶體洩露,Netty提供了SimpleChannelInboundHandler,該處理器會自動釋放記憶體,使用者可以直接繼承該處理器,它的channelRead方法的finally塊中調用了釋放記憶體的方法,另外記憶體洩露監控處理可以參考ResourceLeakDetector類中的代碼:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}
           

ByteBuf的記憶體溢出保護

為了防止一些超長的惡意流量耗盡伺服器記憶體壓垮伺服器,有必要會緩存區設定上限,Netty做了如下處理:

  • 在記憶體配置設定的時候指定緩沖區長度上限(io.netty.buffer.ByteBufAllocator.buffer(int, int))。
  • 在對緩沖區進行寫入操作的時候,如果緩沖區容量不足需要擴充,首先對最大容量進行判斷,如果擴充後的容量超過上限,則拒絕擴充(io.netty.buffer.ByteBuf.ensureWritable(int)方法中處理)。
  • 在解碼的時候,對消息長度進行判斷,如果超過最大容量上限,則抛出解碼異常,拒絕配置設定記憶體(io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(ChannelHandlerContext, ByteBuf)方法中處理,在fail方法中抛出TooLongFrameException異常)。