文章目錄
- netty版本
- NioEventLoop
-
- NioEventLoop 構造方法分析
-
- SingleThreadEventLoop構造方法
- SingleThreadEventExecutor構造方法
- openSelector
- SelectedSelectionKeySet
- JDK Bug解決方案
- run方法
-
- processSelectedKeys
- select()方法
-
- rebuildSelector
netty版本
- 使用的netty版本是
io.netty:netty-all:4.1.33.Final
NioEventLoop
- uml圖
[圖片:Netty源碼分析(二)之NioEventLoop]
- `NioEventLoop`是一個無限循環(`Loop`),在循環中不斷處理接收到的事件
NioEventLoop 構造方法分析
- 構造方法,根據`NioEventLoopGroup`源碼的分析可知,`NioEventLoop`的構造方法調用是在`newChild()`(`NioEventLoopGroup`實作類的`newChild`方法)時。構造方法執行順序:
/** * @param parent 該NioEventLoop所在的線程池 * @param executor * @param selectorProvider 用于建立selector * @param strategy * @param rejectedExecutionHandler */ NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { //DEFAULT_MAX_PENDING_TASKS 隊列最大待處理任務 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); //SelectedSelectionKeySetSelector對象 selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
SingleThreadEventLoop構造方法
- 調用的構造方法,`SingleThreadEventLoop`中`newTaskQueue`方法使用的是`LinkedBlockingQueue`隊列,子類`NioEventLoop`使用的是`MpscQueue`
/**
 * Forwards all arguments to the superclass constructor and then creates the
 * {@code tailTasks} queue through {@code newTaskQueue(maxPendingTasks)}.
 *
 * @param parent                   forwarded to the superclass constructor
 * @param executor                 forwarded to the superclass constructor
 * @param addTaskWakesUp           forwarded to the superclass constructor
 * @param maxPendingTasks          forwarded to the superclass constructor and used as the
 *                                 argument for the {@code tailTasks} queue
 * @param rejectedExecutionHandler forwarded to the superclass constructor
 */
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

// The method below is declared in the parent class SingleThreadEventExecutor.

/**
 * Default task-queue factory: a {@link LinkedBlockingQueue} bounded to {@code maxPendingTasks}.
 *
 * @param maxPendingTasks capacity of the returned queue
 * @return a new, empty bounded queue
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
- `NioEventLoop`的`newTaskQueue`方法
@Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
SingleThreadEventExecutor構造方法
- 調用的構造方法
/**
 * Initialises the executor's fields and creates its task queue.
 *
 * @param parent          forwarded to the superclass constructor
 * @param executor        stored as {@code executor}; must not be {@code null}
 * @param addTaskWakesUp  stored as-is
 * @param maxPendingTasks requested queue capacity; values below 16 are raised to 16
 * @param rejectedHandler stored as {@code rejectedExecutionHandler}; must not be {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // Queue that holds the submitted tasks; sized with the clamped value, not the raw argument.
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
openSelector
- 如果沒有開啟KEYSET優化,則直接傳回JDK建立的原生`Selector`;否則通過反射把`SelectorImpl`内部的key集合替換為優化版的`SelectedSelectionKeySet`
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //如果沒有開啟KEYSET優化,Jdk建立的Selector直接傳回 if (DISABLE_KEYSET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } //AccessController.doPrivileged表示不用做權限檢查 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; //建立一個存放SelectionKey的對象,本質上是一個一維數組 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } //反射強制将SelectorImpl中的selectedKeys替換為優化版的SelectedSelectionKeySet對象 selectedKeysField.set(unwrappedSelector, selectedKeySet); 
//反射強制将SelectorImpl中的publicSelectedKeys替換為優化版的SelectedSelectionKeySet對象 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); //SelectorTuple是一個封裝了原始selector對象和封裝後selector對象(即,SelectedSelectionKeySetSelector對象)的類 return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } private static final class SelectorTuple { final Selector unwrappedSelector; final Selector selector; SelectorTuple(Selector unwrappedSelector) { this.unwrappedSelector = unwrappedSelector; this.selector = unwrappedSelector; } SelectorTuple(Selector unwrappedSelector, Selector selector) { this.unwrappedSelector = unwrappedSelector; this.selector = selector; } }
SelectedSelectionKeySet
- 如果開啟優化(即未通過`-Dio.netty.noKeySetOptimization`關閉該優化),netty反射強制将`SelectorImpl`中的`selectedKeys`替換為優化版的`SelectedSelectionKeySet`對象,這使`Selector`中所有對`selectedKeys`、`publicSelectedKeys`的操作實際上就是對`SelectedSelectionKeySet`的操作。`SelectedSelectionKeySet`本質是一個一維動态數組,每次擴充1倍容量
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } @Override public boolean add(SelectionKey o) { if (o == null) { return false; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true; } @Override public int size() { return size; } @Override public boolean remove(Object o) { return false; } @Override public boolean contains(Object o) { return false; } @Override public Iterator<SelectionKey> iterator() { throw new UnsupportedOperationException(); } void reset() { reset(0); } void reset(int start) { Arrays.fill(keys, start, size, null); size = 0; } //擴大一倍,從1024變為2048 private void increaseCapacity() { SelectionKey[] newKeys = new SelectionKey[keys.length << 1]; System.arraycopy(keys, 0, newKeys, 0, size); keys = newKeys; } }
JDK Bug解決方案
- `Selector.open()`空指針bug:`sun.nio.ch.Util`中包含一段非線程安全的代碼,可能會抛出`NullPointerException`異常。`線程1`執行到⑤,未執行到⑥,`線程2`執行了②,此時`bugLevel`可能為null,即`bugLevel`被重置為null,導緻`線程1`執行到⑥時抛出`NullPointerException`異常,此bug詳細參見bug庫
https://bugs.java.com/view_bug.do?bug_id=6427854
sun.nio.ch.Util contains code which is not thread safe and can throw a NullPointerException:

private static String bugLevel = null;

static boolean atBugLevel(String bl) {              // package-private
    if (bugLevel == null) {                         // ①
        if (!sun.misc.VM.isBooted())
            return false;
        java.security.PrivilegedAction pa = new GetPropertyAction("sun.nio.ch.bugLevel");
        // the next line can reset bugLevel to null
        bugLevel = (String)AccessController.doPrivileged(pa);   // ②
        if (bugLevel == null)                       // ③
            bugLevel = "";                          // ④
    }
    return (bugLevel != null) /* ⑤ */ && bugLevel.equals(bl) /* ⑥ */;
}
- 使用`linux核心2.4`并且同時使用`jdk6u4`或者`jdk7b12`以下版本,epoll-bug會導緻JDK中的`Selector.select()`在沒有感興趣的事件發生時,無效的被調用。此時本應該阻塞的`Selector.select()`卻沒有阻塞,而是傳回0,導緻CPU空輪詢,緻使IO線程CPU 100%。bug庫位址
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933
- netty中的解決方案
- `Selector.open()`空指針問題:在`Selector.open()`之前,如果`sun.nio.ch.bugLevel`為空,先将`sun.nio.ch.bugLevel`設定為空字元串
- `epoll 空輪詢導緻CPU 100%問題`:對`select()`傳回0的操作計數(JDK bug導緻`select()`不阻塞而傳回0),如果次數大于門檻值`SELECTOR_AUTO_REBUILD_THRESHOLD`就建立一個`selector`,将注冊到`老的selector`上的`channel`重新注冊到`新的selector`上。通過`-Dio.netty.selectorAutoRebuildThreshold`設定系統變量設定值,預設門檻值512,如果設定的門檻值小于`MIN_PREMATURE_SELECTOR_RETURNS=3`,則設定為0
static {
    /*
     * Workaround for the Selector.open() NullPointerException bug: before Selector.open()
     * is ever called, set the system property "sun.nio.ch.bugLevel" to the empty string
     * if it has no value yet.
     */
    final String key = "sun.nio.ch.bugLevel";
    final String buglevel = SystemPropertyUtil.get(key);
    if (buglevel == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(key, "");
                    return null;
                }
            });
        } catch (final SecurityException e) {
            // Best effort only: without permission the property is simply left unset.
            logger.debug("Unable to get/set System Property: " + key, e);
        }
    }

    /*
     * Workaround for the epoll busy-spin bug (select() returning 0 without blocking):
     * such returns are counted, and once the count exceeds
     * SELECTOR_AUTO_REBUILD_THRESHOLD a new selector is created and the channels
     * registered with the old selector are re-registered with the new one.
     * The threshold comes from -Dio.netty.selectorAutoRebuildThreshold (default 512);
     * a value below MIN_PREMATURE_SELECTOR_RETURNS is replaced by 0.
     */
    int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
        selectorAutoRebuildThreshold = 0;
    }

    SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
-
run方法
- run
@Override protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. 
https://github.com/netty/netty/issues/8566 rebuildSelector0(); handleLoopException(e); continue; } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
processSelectedKeys
select()方法
rebuildSelector
- `rebuildSelector`的過程
- 建立一個新的`Selector`
- 取消`Channel`在`OldSelector`上的`SelectionKey`
- 将`OldSelector`上的所有`Channel`重新注冊到新的`Selector`上
- 關閉`OldSelector`釋放資源
- `rebuildSelector`源碼
private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return; } try { newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { //keyFor 擷取目前Channel注冊在指定Selector上的鍵,如果沒有注冊将傳回null if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } //取消SelectionKey在OldSelector上的事件注冊 int interestOps = key.interestOps(); key.cancel(); //Channel注冊在新的selector上 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }