天天看點

Netty源碼分析(二)之NioEventLoopnetty版本NioEventLoop

文章目錄

  • netty版本
  • NioEventLoop
    • NioEventLoop 構造方法分析
      • SingleThreadEventLoop構造方法
      • SingleThreadEventExecutor構造方法
      • openSelector
      • SelectedSelectionKeySet
    • JDK Bug解決方案
    • run方法
      • processSelectedKeys
    • select()方法
      • rebuildSelector

netty版本

  1. 使用的netty版本是

    io.netty:netty-all:4.1.33.Final

NioEventLoop

  1. uml圖
    Netty源碼分析(二)之NioEventLoopnetty版本NioEventLoop
  2. NioEventLoop

    是一個無限循環(

    Loop

    ),在循環中不斷處理接收到的事件

NioEventLoop 構造方法分析

  1. 構造方法,根據

    NioEventLoopGroup

    源碼的分析可知,

    NioEventLoop

    的構造方法調用是在

    newChild()

    (

    NioEventLoopGroup

    實作類的

    newChild

    方法)時。構造方法執行順序
    /**
         * @param parent 該NioEventLoop所在的線程池
         * @param executor
         * @param selectorProvider 用于建立selector
         * @param strategy
         * @param rejectedExecutionHandler
         */
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            //DEFAULT_MAX_PENDING_TASKS 隊列最大待處理任務      
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            //SelectedSelectionKeySetSelector對象
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }	
               

SingleThreadEventLoop構造方法

  1. 調用的構造方法,

    SingleThreadEventLoop

    newTaskQueue

    方法使用的是

    LinkedBlockingQueue

    隊列,子類

    NioEventLoop

    使用的是

    MpsQueue

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
            tailTasks = newTaskQueue(maxPendingTasks);
        }
        //父類SingleThreadEventExecutor
        protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
            return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
        }
    
               
  2. NioEventLoop

    newTaskQueue

    方法
    @Override
        protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
            // This event loop never calls takeTask()
            return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                        : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
        }
    
               

SingleThreadEventExecutor構造方法

  1. 調用的構造方法
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        //存放任務的
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    
               

openSelector

  1. 如果沒有開啟優化屬性,則直接傳回,
    private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            //如果沒有開啟KEYSET優化,Jdk建立的Selector直接傳回
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return new SelectorTuple(unwrappedSelector);
            }
            //AccessController.doPrivileged表示不用做權限檢查
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
    
            if (!(maybeSelectorImplClass instanceof Class) ||
                    // ensure the current selector implementation is what we can instrument.
                    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                if (maybeSelectorImplClass instanceof Throwable) {
                    Throwable t = (Throwable) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                }
                return new SelectorTuple(unwrappedSelector);
            }
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            //建立一個存放SelectionKey的對象,本質上是一個一維數組
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
                        cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
                        //反射強制将SelectorImpl中的selectedKeys替換為優化版的SelectedSelectionKeySet對象
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        //反射強制将SelectorImpl中的publicSelectedKeys替換為優化版的SelectedSelectionKeySet對象
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            }
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
            //SelectorTuple是一個封裝了原始selector對象和封裝後selector對象(即,SelectedSelectionKeySetSelector對象)的類
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
        
        
         private static final class SelectorTuple {
            final Selector unwrappedSelector;
            final Selector selector;
    
            SelectorTuple(Selector unwrappedSelector) {
                this.unwrappedSelector = unwrappedSelector;
                this.selector = unwrappedSelector;
            }
    
            SelectorTuple(Selector unwrappedSelector, Selector selector) {
                this.unwrappedSelector = unwrappedSelector;
                this.selector = selector;
            }
        }
    
               

SelectedSelectionKeySet

  1. 如果開啟優化(

    -Dio.netty.noKeySetOptimization

    ),netty反射強制将

    SelectorImpl

    中的

    selectedKeys

    替換為優化版的

    SelectedSelectionKeySet

    對象,這使

    Selector

    中所有對

    selectedKeys

    publicSelectedKeys

    的操作實際上就是對

    SelectedSelectionKeySet

    的操作。

    SelectedSelectionKeySet

    本質是一個一維動态數組,每次擴充1倍容量
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
    
            return true;
        }
    
        @Override
        public int size() {
            return size;
        }
    
        @Override
        public boolean remove(Object o) {
            return false;
        }
    
        @Override
        public boolean contains(Object o) {
            return false;
        }
    
        @Override
        public Iterator<SelectionKey> iterator() {
            throw new UnsupportedOperationException();
        }
    
        void reset() {
            reset(0);
        }
    
        void reset(int start) {
            Arrays.fill(keys, start, size, null);
            size = 0;
        }
    	//擴大一倍,從1024變為2048
        private void increaseCapacity() {
            SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
            System.arraycopy(keys, 0, newKeys, 0, size);
            keys = newKeys;
        }
    }
    
    
               

JDK Bug解決方案

  1. Selector.open()

    空指針bug:

    sun.nio.ch.Util

    中包含一段非線程安全的代碼,可能會抛出

    NullPointerException

    異常。

    線程1

    執行到⑤,未執行到⑥,

    線程2

    執行了②,此時

    bugLevel

    可能為null,即

    bugLevel

    被重置為null,導緻

    線程1

    執行到⑥時抛出

    NullPointerException

    異常,此bug詳細參見bug庫

    https://bugs.java.com/view_bug.do?bug_id=6427854

    sun.nio.ch.Util contains code which is not thread safe and can throw a NullPointerException:
    	
    	private static String bugLevel = null;
    	
    	static boolean atBugLevel(String bl) {		// package-private
    	 if (bugLevel == null) { ①
    	     if (!sun.misc.VM.isBooted())
    	           return false;
    	     java.security.PrivilegedAction pa =
    	                new GetPropertyAction("sun.nio.ch.bugLevel");
    	// the next line can reset bugLevel to null
    	     bugLevel = (String)AccessController.doPrivileged(pa);②
    	     if (bugLevel == null)③
    	           bugLevel = "";④
    	     }
    	     return (bugLevel != null) ⑤&& bugLevel.equals(bl)⑥;
    	 }
    			
               
  2. 使用

    linux核心2.4

    并且同時使用

    jdk6u4

    或者

    jdk7b12

    以下版本,epoll-bug會導緻JDK中的

    Selector.select()

    在沒有感興趣的事件發生時,無效的被調用。此時本應該阻塞的

    Selector.select()

    确沒有阻塞,而是傳回0,導緻CPU空輪詢,緻使IO線程CPU 100%.bug庫位址

    https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933

  3. netty中的解決方案
    • Selector.open()

      空指針問題:在

      Selector.open()

      之前,如果

      sun.nio.ch.bugLevel

      為空,先将

      sun.nio.ch.bugLevel

      設定為空字元串
    • epoll 空輪詢導緻CPU 100%問題

      :對

      select()

      傳回0的操作計數(JDK bug導緻

      select()

      不阻塞而傳回0),如果次數大于門檻值

      SELECTOR_AUTO_REBUILD_THRESHOLD

      就建立一個

      selector

      ,将注冊到

      老的selector

      上的

      channel

      重新注冊到

      新的selector

      上。通過

      -Dio.netty.selectorAutoRebuildThreshold

      設定系統變量設定值,預設門檻值512,如果設定的門檻值小于

      MIN_PREMATURE_SELECTOR_RETURNS=3

      ,則設定為0
    static {
        /**
         * 解決`Selector.open()`空指針bug,在`Selector.open()`之前,如果`sun.nio.ch.bugLevel`為空,
         * 先将`sun.nio.ch.bugLevel`設定為空字元串
         */
        final String key = "sun.nio.ch.bugLevel";
        final String buglevel = SystemPropertyUtil.get(key);
        if (buglevel == null) {
            try {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    @Override
                    public Void run() {
                        System.setProperty(key, "");
                        return null;
                    }
                });
            } catch (final SecurityException e) {
                logger.debug("Unable to get/set System Property: " + key, e);
            }
        }
    
        /**
         * 對select傳回0的操作計數(JDK bug導緻select()不阻塞而傳回0),如果次數大于門檻值SELECTOR_AUTO_REBUILD_THRESHOLD就建立一個selector,
         * 将注冊到老的selector上的channel重新注冊到新的selector上。通過-Dio.netty.selectorAutoRebuildThreshold設定系統變量設定值
         * 預設門檻值512,如果設定的門檻值小于MIN_PREMATURE_SELECTOR_RETURNS=3,則設定為0
         */
        int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
        if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
            selectorAutoRebuildThreshold = 0;
        }
    
        SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
            logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
        }
    }	
               

run方法

  1. run
    @Override
        protected void run() {
            for (;;) {
                try {
                    try {
                        switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
    
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                        }
                    } catch (IOException e) {
                        // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                        // the selector and retry. https://github.com/netty/netty/issues/8566
                        rebuildSelector0();
                        handleLoopException(e);
                        continue;
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    
               

processSelectedKeys

select()方法

rebuildSelector

  1. rebuildSelector

    的過程
    • 建立一個新的

      Selector

    • 取消

      Channel

      OldSelector

      上的

      SelectionKey

    • OldSelector

      上的所有

      Channel

      重新注冊到新的

      Selector

    • 關閉

      OldSelector

      釋放資源
  2. rebuildSelector

    源碼
    private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
    
        if (oldSelector == null) {
            return;
        }
    
        try {
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
    
        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();
            try {
                //keyFor 擷取目前Channel注冊在指定Selector上的鍵,如果沒有注冊将傳回null
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }
                //取消SelectionKey在OldSelector上的事件注冊
                int interestOps = key.interestOps();
                key.cancel();
                //Channel注冊在新的selector上
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }
    
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }
    
        if (logger.isInfoEnabled()) {
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }