
Writing my own Netty source code analysis: NioEventLoop

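Contents

Introduction

Step 1: Creating the NioEventLoop

new ThreadPerTaskExecutor()  [thread creator]

A new thread entity is created each time a task is executed (FastThreadLocalThread, a wrapped thread)

Thread naming: nioEventLoop-1-xx

for(){newChild()}  [constructs the NioEventLoops]

Create a selector

Store the thread executor (ThreadPerTaskExecutor)

Create an MpscQueue

chooserFactory.newChooser()  [thread chooser]

Step 2: Starting the NioEventLoop

Server startup binds the port

A new connection is bound to a NioEventLoop through the chooser

Step 3: NioEventLoop execution logic

SingleThreadEventExecutor.this.run()

select()  [checks for I/O events]

processSelectedKeys()  [handles I/O events]

runAllTasks()  [drains the asynchronous task queue, i.e. tasks submitted by external threads]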

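Introduction

Three questions

  1. By default, how many threads does a Netty server start, and when are they started?
  2. How does Netty work around the JDK empty-polling bug?
  3. How does Netty guarantee serial, lock-free execution?

Contents

  1. Creating the NioEventLoop
  2. Starting the NioEventLoop
  3. NioEventLoop execution logic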

Step 1: Creating the NioEventLoop

new NioEventLoopGroup()

  • new ThreadPerTaskExecutor()  [thread creator]
  • for(){newChild()}  [constructs the NioEventLoops]
  • chooserFactory.newChooser()  [thread chooser]

Code entry point

EventLoopGroup group = new NioEventLoopGroup();
           

Follow the constructor chain all the way down:

public NioEventLoopGroup() {  
    this(0);  // a thread count of 0 means "use the default"
}

Entering this(0):
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
Entering this(...):
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
Entering this(...):
public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
Entering this(...):
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider             
        selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, 
        RejectedExecutionHandlers.reject());
}
Entering super(...):
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
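Entering super(...). (The overload printed next is the ThreadFactory variant; because the argument here is typed as Executor, the call binds directly to the Executor overload that follows it.)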
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, 
                                                        Object... args) {
    this(nThreads, threadFactory == null ? null : 
                                  new ThreadPerTaskExecutor(threadFactory), args);
}
Entering this(...):
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
           

Following this(...) once more leads to the final constructor:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // argument validation
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        // creation step 1: thread creator
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // creation step 2: construct the NioEventLoops
        children = new EventExecutor[nThreads];
        
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // creation step 3: thread chooser
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
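
The nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads expression in MultithreadEventLoopGroup is what decides the default thread count asked about in the first question. To my understanding, Netty 4.1 derives that constant as max(1, the io.netty.eventLoopThreads system property, falling back to twice the number of available processors). The class below is a stand-alone sketch of that rule under this assumption, not Netty code; the names DefaultThreadCount and resolve are hypothetical.

```java
final class DefaultThreadCount {
    /**
     * Hypothetical helper: returns the requested count, or the assumed default
     * max(1, override or 2 x processors) when the request is 0.
     */
    static int resolve(int requested, int availableProcessors, Integer override) {
        if (requested != 0) {
            return requested;                       // an explicit value always wins
        }
        int fallback = override != null ? override : availableProcessors * 2;
        return Math.max(1, fallback);               // never fewer than one thread
    }

    public static void main(String[] args) {
        // Property name as assumed above; null when the property is not set.
        Integer override = Integer.getInteger("io.netty.eventLoopThreads");
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("default event loop threads: " + resolve(0, cores, override));
    }
}
```

Note that this only fixes the size of the children array; as Step 2 shows, the underlying threads are started lazily.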
           

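new ThreadPerTaskExecutor()  [thread creator]

  • A new thread entity is created each time a task is executed
  • NioEventLoop threads are named nioEventLoop-1-xx

A new thread entity is created each time a task is executed (FastThreadLocalThread, a wrapped thread)

Calling execute() on a ThreadPerTaskExecutor uses its threadFactory to create and start a new thread. The constructor argument is the factory instance produced by newDefaultThreadFactory().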

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
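
To make the "one thread per execute() call" behaviour concrete, here is a small sketch that uses only the JDK. The lambda executor has the same shape as ThreadPerTaskExecutor.execute(); Executors.defaultThreadFactory() stands in for Netty's DefaultThreadFactory, and the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class ThreadPerTaskDemo {
    /** Submits the given number of tasks and returns the names of the threads that ran them. */
    static Set<String> runTasks(int tasks) {
        ThreadFactory factory = Executors.defaultThreadFactory();
        // One brand-new thread per submitted task, nothing is pooled or reused.
        Executor executor = command -> factory.newThread(command).start();

        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();                           // wait until every task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3));            // three distinct thread names
    }
}
```

Because every call costs a thread, each NioEventLoop calls executor.execute() only once, inside doStartThread().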
           

The factory instance produced by newDefaultThreadFactory()

protected ThreadFactory newDefaultThreadFactory() {
     return new DefaultThreadFactory(getClass());
}
Entering the constructor:
public DefaultThreadFactory(Class<?> poolType) {
     this(poolType, false, Thread.NORM_PRIORITY);
}
Entering this(...):
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
    this(toPoolName(poolType), daemon, priority);
}
Entering this(...):
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
    this(poolName, daemon, priority, System.getSecurityManager() == null ?
          Thread.currentThread().getThreadGroup() 
                                      :System.getSecurityManager().getThreadGroup());
}
Entering this(...):
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup         
    threadGroup) {
        if (poolName == null) {
            throw new NullPointerException("poolName");
        }
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }

        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }
           

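Where the nioEventLoop-1-xx style of name comes from: toPoolName() lower-cases the first letter of the simple class name. For the class passed in here (getClass(), i.e. NioEventLoopGroup) that yields nioEventLoopGroup, so the prefix built above has the form nioEventLoopGroup-<poolId>-.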

public static String toPoolName(Class<?> poolType) {
        if (poolType == null) {
            throw new NullPointerException("poolType");
        }

        String poolName = StringUtil.simpleClassName(poolType);
        switch (poolName.length()) {
            case 0:
                return "unknown";
            case 1:
                return poolName.toLowerCase(Locale.US);
            default:
                if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
                    return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
                } else {
                    return poolName;
                }
        }
}
           

The factory's newThread() method, which produces a FastThreadLocalThread:

public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
}
           
protected Thread newThread(Runnable r, String name) {
    return new FastThreadLocalThread(threadGroup, r, name);
}
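
Putting toPoolName(), the prefix and the per-factory counter together, the naming scheme can be tried out in isolation. This is a simplified re-implementation for illustration (the class ThreadNamingSketch is hypothetical and takes the simple class name as a plain string); it follows the logic of the listings above but is not the Netty class itself.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadNamingSketch {
    private static final AtomicInteger POOL_ID = new AtomicInteger(); // one number per factory
    private final AtomicInteger nextId = new AtomicInteger();         // one number per thread
    private final String prefix;

    ThreadNamingSketch(Class<?> poolType) {
        prefix = toPoolName(poolType.getSimpleName()) + '-' + POOL_ID.incrementAndGet() + '-';
    }

    /** Lower-cases the first letter only when it is followed by a lower-case letter. */
    static String toPoolName(String simpleName) {
        switch (simpleName.length()) {
            case 0:
                return "unknown";
            case 1:
                return simpleName.toLowerCase(Locale.US);
            default:
                if (Character.isUpperCase(simpleName.charAt(0))
                        && Character.isLowerCase(simpleName.charAt(1))) {
                    return Character.toLowerCase(simpleName.charAt(0)) + simpleName.substring(1);
                }
                return simpleName;
        }
    }

    String nextThreadName() {
        return prefix + nextId.incrementAndGet();
    }

    public static void main(String[] args) {
        System.out.println(toPoolName("NioEventLoopGroup"));
        ThreadNamingSketch factory = new ThreadNamingSketch(StringBuilder.class);
        System.out.println(factory.nextThreadName());
        System.out.println(factory.nextThreadName());
    }
}
```

The middle number identifies the factory (and therefore the group), the last one the thread within it.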
           

for(){newChild()}  [constructs the NioEventLoops]

  • Store the thread executor (ThreadPerTaskExecutor)
  • Create an MpscQueue
  • Create a selector

The entry point is children[i] = newChild(executor, args); the implementation is in NioEventLoopGroup:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
           

This leads to the NioEventLoop constructor:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;   // create a selector
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
}
           

Create a selector

SelectorTuple is an internal wrapper class holding two fields, final Selector unwrappedSelector and final Selector selector. The selector itself is created in openSelector().

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        ......
}
           

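Next, follow the super(...) call in the NioEventLoop constructor. (The two listings below are the ThreadFactory overloads; NioEventLoop passes an Executor, so on this path the corresponding Executor overloads are taken.)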

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
}
Entering super(...):
protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory,
            boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
           

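The this(...) call above wraps the ThreadFactory in a new ThreadPerTaskExecutor. When an Executor is supplied instead, as NioEventLoop does with the executor it received from the group, it is handed over unchanged. Either way the following constructor is reached.

Store the thread executor (ThreadPerTaskExecutor)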

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        // store the executor that was passed in
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
           

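Entering newTaskQueue() and locating the override in NioEventLoop shows that the task queue is created with newMpscQueue(), a multi-producer, single-consumer queue.

Create an MpscQueue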

protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
           

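chooserFactory.newChooser()  [thread chooser]

Binds each new connection to a NioEventLoop.

  • isPowerOfTwo(): checks whether the array length is a power of two
  • Yes: use PowerOfTwoEventExecutorChooser (optimized); index++ & (length - 1)
  • No: use GenericEventExecutorChooser (general case); abs(index++ % length)

How a new connection is bound to a NioEventLoop: each new connection takes the next element of the NioEventLoop[] array, and after the last element the selection wraps around to the first (round-robin).

Entry point: chooser = chooserFactory.newChooser(children); where children is the NioEventLoop array created earlier.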

public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
}
           
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
}
           

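Explanation of idx.getAndIncrement() & executors.length - 1:

Since - binds tighter than &, the expression means idx & (length - 1). Suppose idx is 111010 in binary and the NioEventLoop array has 16 elements. 16 - 1 is 1111, and ANDing the two gives 1010, which is directly the position in the array (this works only because the length is a power of two). As idx keeps growing, the high bits have no effect and the index simply cycles through the low four bits.

The modulo operator % requires an integer division, which is slower than a single bitwise AND.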
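
The two index computations can be compared directly. The sketch below is a minimal stand-alone version; the class name is hypothetical, and the isPowerOfTwo test (val & -val) == val is an assumption about how the check is written, since the article does not show it.

```java
final class ChooserIndexSketch {
    /** Assumed form of the power-of-two test. */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Index used by the power-of-two chooser: keep only the low bits. */
    static int maskIndex(int idx, int length) {
        return idx & (length - 1);
    }

    /** Index used by the generic chooser: remainder, made non-negative. */
    static int moduloIndex(int idx, int length) {
        return Math.abs(idx % length);
    }

    public static void main(String[] args) {
        int idx = 0b111010;                                   // the example value from the text
        System.out.println(Integer.toBinaryString(maskIndex(idx, 16)));
        for (int i = 0; i < 6; i++) {
            // With 3 executors the generic chooser cycles through indices 0, 1, 2.
            System.out.print(moduloIndex(i, 3) + " ");
        }
        System.out.println();
    }
}
```

For non-negative counters and a power-of-two length both methods return the same index; the mask version just avoids the division.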

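Step 2: Starting the NioEventLoop

  • Server startup binds the port
  • A new connection is bound to a NioEventLoop through the chooser

Server startup binds the port

bind() --> execute(task) [entry point]

     startThread() --> doStartThread() [creates the thread]

            ThreadPerTaskExecutor.execute()

                   thread = Thread.currentThread()

                   NioEventLoop.run()

During server startup the main thread calls bind(). That method eventually wraps the actual bind logic in a task and passes it to execute() on the server Channel's event loop. Netty then finds that the thread calling execute() is not the NioEventLoop's own thread, so startThread() calls doStartThread() to create one. This goes through ThreadPerTaskExecutor.execute(), which starts a new thread; on that thread the NioEventLoop first records it with thread = Thread.currentThread() and then calls NioEventLoop.run().

The walkthrough starts at doBind0(regFuture, channel, localAddress, promise); which bind() reaches through doBind():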

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
}
           

The execute() call lands in SingleThreadEventExecutor. There, inEventLoop() checks whether the current thread is the NioEventLoop's thread.

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();  // is the calling thread this NioEventLoop's thread?
        addTask(task);
        if (!inEventLoop) {  // if not, make sure the loop thread has been started
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
}
Entering inEventLoop():
public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
}
Entering inEventLoop(Thread):
public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
}
           

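If the caller is not the NioEventLoop thread, startThread() calls doStartThread() to create the thread.

It first checks whether the loop is still in the not-started state; if so, it uses a CAS to claim the transition to started, so that only one caller actually starts the thread.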

private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
}
           

The executor used here is the one passed into NioEventLoop as the first argument of newChild().

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                if (logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
}
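
The start-up pattern of execute(), startThread() and doStartThread() also explains the third question: any thread may enqueue a task, but only the loop's single thread ever runs it, so handlers need no locks. Below is a heavily simplified sketch of that pattern using JDK classes only. LazyLoopSketch is hypothetical; a LinkedBlockingQueue stands in for the MpscQueue, an AtomicBoolean for the state field, and shutdown handling is omitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class LazyLoopSketch {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;                 // set by the loop thread itself

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);                            // any thread may enqueue
        // Only an outside caller can trigger the start, and the CAS lets exactly one win.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "lazy-loop");
            t.setDaemon(true);                      // do not keep the JVM alive in this sketch
            t.start();
        }
    }

    private void runLoop() {
        thread = Thread.currentThread();            // same idea as thread = Thread.currentThread()
        try {
            for (;;) {
                tasks.take().run();                 // tasks run one at a time, on this thread only
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs two tasks, the second submitted from inside the loop, and reports where they ran. */
    static String demo() {
        LazyLoopSketch loop = new LazyLoopSketch();
        StringBuilder log = new StringBuilder();    // written only by the loop thread
        CountDownLatch done = new CountDownLatch(2);
        loop.execute(() -> {
            log.append(Thread.currentThread().getName()).append(':').append(loop.inEventLoop());
            loop.execute(() -> {                    // queued only, no second thread is started
                log.append(',').append(Thread.currentThread().getName());
                done.countDown();
            });
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());                 // prints "lazy-loop:true,lazy-loop"
    }
}
```

This also answers the "when" part of the first question: the thread behind a NioEventLoop does not exist until the first task is submitted to it from outside.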
           

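A new connection is bound to a NioEventLoop through the chooser

To be continued.

Step 3: NioEventLoop execution logic

SingleThreadEventExecutor.this.run()

Entering NioEventLoop.run() --> for(;;)

  • select()  [checks for I/O events]
  • processSelectedKeys()  [handles I/O events]
  • runAllTasks()  [drains the asynchronous task queue, i.e. tasks submitted by external threads]
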
protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false)); // reset wakenUp to false, then select

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;  
                // ioRatio defaults to 50, so I/O handling and task handling get equal time
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
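
The time budget handed to runAllTasks() follows from the expression ioTime * (100 - ioRatio) / ioRatio in the loop above. A short worked sketch (the class and method names are hypothetical, and the range check is my own addition for the example):

```java
final class IoRatioBudget {
    /** Nanoseconds granted to task processing after ioTimeNanos were spent on I/O. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // ioRatio == 100 takes the separate branch in run() that applies no time limit.
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L;                                // 8 ms spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50));         // equal share
        System.out.println(taskBudgetNanos(ioTime, 80));         // I/O gets 80%, tasks 20%
    }
}
```

With ioRatio = 80 and 8 ms of I/O work, tasks get 8 ms * 20 / 80 = 2 ms, so the I/O to task split is 80:20 as intended.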
           

select()  [checks for I/O events]

  • Deadline handling and interleaving with pending tasks
  • Blocking select
  • Working around the JDK empty-polling bug

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();  // current time
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            // delayNanos(currentTimeNanos): time left until the first task in the scheduled-task queue is due
            // selectDeadLineNanos: this select operation must not run past this point

            for (;;) {
                // first work out whether the deadline has already passed
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) { 
                    if (selectCnt == 0) {  // deadline passed without a single select: do one non-blocking select
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                // not timed out: if tasks are queued, mark the loop as woken up and do a non-blocking select
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                // deadline not reached and the task queue is empty: do a blocking select
                int selectedKeys = selector.select(timeoutMillis); // timeoutMillis: the longest this call may block
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }
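                // work around the JDK empty-polling bug
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {  // the blocking select really waited for timeoutMillis
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                // Otherwise less than timeoutMillis has passed since the start, so select() returned without
                // blocking: an empty poll. Once the number of such returns reaches the threshold (512 by default),
                // selectRebuildSelector(selectCnt) re-registers the old selector's keys on a new selector,
                // whose blocking select is then unlikely to spin in the same way.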
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {  
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }
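
Two pieces of arithmetic in select() are easy to misread: the + 500000L, which rounds the remaining time to the nearest millisecond, and the comparison that decides whether select() came back early. The sketch below isolates both and replays the counter logic against simulated select durations. All names are hypothetical and no real Selector is involved.

```java
import java.util.concurrent.TimeUnit;

final class SelectMathSketch {
    /** Remaining time until the deadline, rounded to the nearest millisecond. */
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    /** True when less than timeoutMillis passed between the start and now. */
    static boolean returnedEarly(long startNanos, long nowNanos, long timeoutMillis) {
        return nowNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < startNanos;
    }

    /**
     * Replays the selectCnt bookkeeping. elapsedNanos[i] is how long the i-th simulated
     * select() took. Returns the index of the call that would trigger a selector rebuild,
     * or -1 if the threshold is never reached.
     */
    static int rebuildAt(long[] elapsedNanos, long timeoutMillis, int threshold) {
        int selectCnt = 0;
        long start = 0;
        for (int i = 0; i < elapsedNanos.length; i++) {
            long now = start + elapsedNanos[i];
            selectCnt++;
            if (!returnedEarly(start, now, timeoutMillis)) {
                selectCnt = 1;                      // a genuine timeout resets the counter
            } else if (selectCnt >= threshold) {
                return i;                           // too many empty polls in a row
            }
            start = now;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(1_400_000L, 0L));                  // 1.4 ms rounds down
        System.out.println(timeoutMillis(1_500_000L, 0L));                  // 1.5 ms rounds up
        System.out.println(rebuildAt(new long[] {0, 0, 0}, 1000L, 3));      // third empty poll
    }
}
```

A single select that really blocks for the full timeout is enough to reset the counter, so only an unbroken run of early returns leads to a rebuild.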
           

hasTasks()

protected boolean hasTasks() {
    return super.hasTasks() || !tailTasks.isEmpty();
}

tailTasks = newTaskQueue(maxPendingTasks);
           

selectRebuildSelector(selectCnt); note that attachment() in rebuildSelector0() returns the Netty-wrapped channel:

private Selector selectRebuildSelector(int selectCnt) throws IOException {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn(
                "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                selectCnt, selector);

        rebuildSelector();
        Selector selector = this.selector;

        // Select again to populate selectedKeys.
        selector.selectNow();
        return selector;
}
Entering rebuildSelector():
public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector0();
                }
            });
            return;
        }
        rebuildSelector0();
}
Entering rebuildSelector0():
private void rebuildSelector0() {
        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;

        if (oldSelector == null) {
            return;
        }

        try {
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
            Object a = key.attachment();  // the Netty-wrapped channel
            try {
                if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                    continue;
                }

                int interestOps = key.interestOps();
                key.cancel();
                SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                if (a instanceof AbstractNioChannel) {
                    // Update SelectionKey
                    ((AbstractNioChannel) a).selectionKey = newKey;
                }
                nChannels ++;
            } catch (Exception e) {
                logger.warn("Failed to re-register a Channel to the new Selector.", e);
                if (a instanceof AbstractNioChannel) {
                    AbstractNioChannel ch = (AbstractNioChannel) a;
                    ch.unsafe().close(ch.unsafe().voidPromise());
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, key, e);
                }
            }
        }

        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;

        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
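
Stripped of Netty's error handling and its wrapped selector, the core of rebuildSelector0() is a handful of java.nio calls: open a new Selector, move every valid key over with the same interest ops and attachment, then close the old one. The following sketch shows just that, under the assumption that it runs on the thread that owns the selector. RebuildSketch and its methods are hypothetical, and a Pipe is used only to have a selectable channel without touching the network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class RebuildSketch {
    /** Moves all valid registrations from oldSelector to a fresh Selector and closes the old one. */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue;                           // already cancelled or already moved
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();                           // drop the registration on the old selector
            key.channel().register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }

    /** Registers one channel, rebuilds, and returns the number of keys on the new selector. */
    static int demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            Selector old = Selector.open();
            pipe.source().register(old, SelectionKey.OP_READ, "attachment");

            Selector fresh = rebuild(old);
            SelectionKey moved = pipe.source().keyFor(fresh);
            boolean intact = !old.isOpen()
                    && moved != null
                    && moved.interestOps() == SelectionKey.OP_READ
                    && "attachment".equals(moved.attachment());
            int keys = intact ? fresh.keys().size() : -1;

            fresh.close();
            pipe.source().close();
            pipe.sink().close();
            return keys;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("keys on the new selector: " + demo());
    }
}
```

Netty additionally updates the selectionKey field of each AbstractNioChannel, as the listing above shows, because the channel caches its key.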
           

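processSelectedKeys()  [handles I/O events]

  • Selected-key-set optimization (enabled by default): every select operation adds the ready keys to a HashSet inside the JDK selector. Netty uses reflection to replace that HashSet with an array-backed implementation, since it never needs the set's remove() or contains(). add() then becomes a plain array append, O(1) amortized with no hashing or collision handling, which is cheaper than HashSet.add().
  • processSelectedKeysOptimized(): does the actual I/O event handling

The key-set optimization is carried out inside openSelector(), which is called when a NioEventLoop is created.

The replacement set is backed by an array; the important part is the add() method.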

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        return new Iterator<SelectionKey>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public SelectionKey next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }
}
           

The optimization inside openSelector()

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }
        // The optimization is applied by default
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // Look up the two key-set fields via reflection
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    // Replace the selector's two original key-set fields with the array-backed selectedKeySet
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
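
The field swap that openSelector() performs can be sketched on a toy class. This is only an illustration under stated assumptions: ToySelector, ArrayKeySet and instrument() are hypothetical names, strings stand in for SelectionKey objects, and plain reflection is used instead of the Unsafe path shown above.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

final class KeySetSwapDemo {
    // Hypothetical stand-in for the JDK selector: fills a private Set on every select.
    static final class ToySelector {
        private Set<String> selectedKeys = new HashSet<>();

        void select(String... readyKeys) {
            for (String k : readyKeys) {
                selectedKeys.add(k);
            }
        }
    }

    // Array-backed set: add() is an array write; remove()/contains() are never needed.
    static final class ArrayKeySet extends AbstractSet<String> {
        String[] keys = new String[4];
        int size;

        @Override
        public boolean add(String k) {
            if (k == null) {
                return false;
            }
            keys[size++] = k;
            if (size == keys.length) {
                keys = Arrays.copyOf(keys, keys.length << 1); // double the capacity
            }
            return true;
        }

        @Override
        public int size() {
            return size;
        }

        @Override
        public Iterator<String> iterator() {
            return Arrays.asList(keys).subList(0, size).iterator();
        }
    }

    // Swaps the private HashSet for the array-backed set and returns the new set.
    static ArrayKeySet instrument(ToySelector selector) {
        ArrayKeySet replacement = new ArrayKeySet();
        try {
            Field field = ToySelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not swap the key set", e);
        }
        return replacement;
    }

    public static void main(String[] args) {
        ToySelector selector = new ToySelector();
        ArrayKeySet keys = instrument(selector);
        selector.select("a", "b", "c", "d", "e");
        // The caller keeps its own reference and reads the array directly.
        for (int i = 0; i < keys.size; i++) {
            System.out.println(keys.keys[i]);
        }
    }
}
```

After the swap the selector writes into the array without knowing it, and the owner reads the array by index, which is the same arrangement NioEventLoop has with its selectedKeys field.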
           

processSelectedKeysOptimized()

private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
}
           
private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
}
           

Stepping into processSelectedKey():

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {  // the key is no longer valid
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();  // bit mask of the ready IO events: OP_CONNECT, OP_WRITE, OP_READ, OP_ACCEPT
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
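
The bit tests in processSelectedKey() can be tried in isolation. The sketch below is an illustration only: ReadyOpsDemo, actions() and withoutConnect() are hypothetical names, and the returned strings merely label which branch of the method above would run. The SelectionKey constants are the real JDK ones.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsDemo {
    // Lists the branches taken for a readyOps mask, in the order they are checked above.
    static List<String> actions(int readyOps) {
        List<String> out = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            out.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            out.add("forceFlush");
        }
        // readyOps == 0 is also treated as "read", mirroring the workaround in the original code.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            out.add("read");
        }
        return out;
    }

    // Clears OP_CONNECT from an interest set and leaves every other bit untouched.
    static int withoutConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(actions(SelectionKey.OP_CONNECT | SelectionKey.OP_READ));
        System.out.println(actions(SelectionKey.OP_WRITE));
        System.out.println(actions(0));
        System.out.println(withoutConnect(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE);
    }
}
```

Because the checks are independent if statements rather than an if/else chain, a single key can trigger several branches in one pass.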
           

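runAllTasks()  [process the asynchronous task queue, i.e. tasks submitted by external threads]

  • Classifying and adding tasks
  • Aggregating tasks
  • Executing tasks

Classifying and adding tasks

There are two task queues: a normal task queue and a scheduled task queue. The normal task queue is created together with the NioEventLoop; the scheduled task queue is created lazily on first use, as the scheduledTaskQueue() method further down shows.

Normal task queue

Entry point: the NioEventLoop constructor calls super() and enters the SingleThreadEventLoop constructor, which in turn calls super() and enters the SingleThreadEventExecutor constructor, where taskQueue is created.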

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

protected Queue<Runnable> newTaskQueue() {
        return newTaskQueue(maxPendingTasks);
}
           

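When execute() is called on a NioEventLoop, it checks whether the caller is the NioEventLoop thread or an external thread. In both cases the task is queued through addTask(), which calls offerTask() to put the task on the queue. If the caller is an external thread, startThread() is called as well, which starts the event loop thread if it is not running yet.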

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
}
           
protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
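
The "queue first, then start the thread on demand" behaviour of execute() can be sketched with JDK classes only. This is a simplified illustration, not Netty's implementation: MiniLoop and all of its members are hypothetical, a LinkedBlockingQueue stands in for the MPSC queue, and shutdown handling and wakeup() are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class MiniLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        taskQueue.add(task); // the task is queued no matter who the caller is
        // Only an external caller may need to start the loop thread, and only once.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-loop");
            t.setDaemon(true);
            thread = t;
            t.start();
        }
    }

    private void run() {
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                try {
                    task.run();
                } catch (Throwable t) {
                    System.err.println("task failed: " + t);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits one task from the calling thread and reports which thread ran it.
    static String runAndGetThreadName(MiniLoop loop) {
        CountDownLatch done = new CountDownLatch(1);
        String[] name = new String[1];
        loop.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        MiniLoop loop = new MiniLoop();
        System.out.println(loop.isStarted());          // false: no thread before the first task
        System.out.println(runAndGetThreadName(loop)); // mini-loop
        System.out.println(loop.isStarted());          // true
    }
}
```

This is also why an event loop costs no thread until something is submitted to it: the thread only comes into existence on the first execute() from outside.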
           

Scheduled task queue

schedule() wraps the Callable in a scheduled task (ScheduledFutureTask).

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
        validateScheduled0(delay, unit);

        return schedule(new ScheduledFutureTask<V>(
                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }
           

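schedule(ScheduledFutureTask) first checks whether it was called from the NioEventLoop thread or from an external thread. Inside the event loop the task is added directly. From an external thread the add is wrapped in a Runnable and submitted through execute(). The reason is that the queue returned by scheduledTaskQueue() is not thread-safe; it is just an ordinary priority queue. By turning "add a scheduled task" into a normal task, every operation on scheduledTaskQueue runs on the NioEventLoop thread, and that is what keeps it safe.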

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }
           
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    // Use same initial capacity as java.util.PriorityQueue
                    11);
        }
        return scheduledTaskQueue;
    }
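
The confinement trick can be reproduced with a plain java.util.PriorityQueue. The sketch below is an assumption-laden illustration: ConfinedScheduleDemo and its methods are hypothetical, a JDK single-thread executor plays the role of the event loop, and deadlines are bare long values rather than ScheduledFutureTask objects.

```java
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConfinedScheduleDemo {
    // Not thread-safe: only the loop thread is allowed to touch it.
    private final PriorityQueue<Long> scheduledTaskQueue = new PriorityQueue<>();
    private volatile Thread loopThread;
    private final ExecutorService loop;

    ConfinedScheduleDemo() {
        loop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void schedule(long deadlineNanos) {
        if (inEventLoop()) {
            scheduledTaskQueue.add(deadlineNanos);
        } else {
            // External caller: hand the add over to the loop thread as a normal task.
            loop.execute(() -> scheduledTaskQueue.add(deadlineNanos));
        }
    }

    // Reads the size on the loop thread as well, so it never races with an add.
    int pendingCount() {
        Callable<Integer> size = scheduledTaskQueue::size;
        try {
            return loop.submit(size).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Schedules from several external threads at once and returns the final queue size.
    static int scheduleFromManyThreads(int threads, int perThread) {
        ConfinedScheduleDemo demo = new ConfinedScheduleDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.schedule(j);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.pendingCount();
    }

    public static void main(String[] args) {
        System.out.println(scheduleFromManyThreads(4, 250)); // 1000
    }
}
```

No lock guards the priority queue; correctness comes entirely from the fact that a single thread performs every add and every read.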
           

Aggregating tasks

Entry point: NioEventLoop.run() --> runAllTasks(ioTime * (100 - ioRatio) / ioRatio);  // the time spent running tasks must not exceed the value passed in.

protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();  // aggregate tasks
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
        ......
    }
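
The time budget handed to runAllTasks() is plain integer arithmetic, so it is easy to check by hand. In the sketch below only the formula comes from the call site quoted above; the class name, the method name and the sample values are made up for illustration.

```java
final class IoRatioDemo {
    // Task budget in nanoseconds for a given IO time and ioRatio (0 < ioRatio < 100).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // assume IO processing took 1 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000000: tasks get as long as IO took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250000:  tasks get a quarter of the IO time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 4000000: tasks get four times the IO time
    }
}
```

A higher ioRatio therefore shrinks the share of each loop iteration that queued tasks may use, relative to the time just spent on IO.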
           

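fetchFromScheduledTaskQueue(): it first takes the head task from the scheduled task queue. That queue is a priority queue ordered by deadline; tasks with the same deadline are ordered by id. Each task obtained through pollScheduledTask() is then offered to the normal task queue. If the offer fails because the normal queue is full, the task is put back into the scheduled task queue (otherwise it would simply be lost) and the method returns false. If the offer succeeds, the loop moves on to the next task. When the loop finishes, every scheduled task whose deadline has been reached has been moved into the normal task queue.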

private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }
Inside the ScheduledFutureTask class:
public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }

        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else if (id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }
           

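pollScheduledTask(): it looks at the first task in the scheduled task queue and returns null if there is none. If that task's deadline is less than or equal to the time passed in, the task is removed from the scheduled task queue and returned. Otherwise the method returns null: since the queue is ordered by deadline, no task in it is due yet, so there is nothing to take out and run.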

protected final Runnable pollScheduledTask(long nanoTime) {
        assert inEventLoop();

        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        if (scheduledTask == null) {
            return null;
        }

        if (scheduledTask.deadlineNanos() <= nanoTime) {
            scheduledTaskQueue.remove();
            return scheduledTask;
        }
        return null;
    }
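
The aggregation step (take due tasks in deadline order, offer them to the normal queue, put a task back if the queue is full) can be sketched with JDK collections. Everything here is hypothetical illustration: Timed, fetchDue() and demo() are invented names, and java.util.PriorityQueue and ArrayBlockingQueue stand in for Netty's own queue types.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

final class AggregateDemo {
    static final class Timed {
        final long id;
        final long deadlineNanos;
        final String name;

        Timed(long id, long deadlineNanos, String name) {
            this.id = id;
            this.deadlineNanos = deadlineNanos;
            this.name = name;
        }
    }

    // Earliest deadline first; ties are broken by the smaller id.
    static final Comparator<Timed> ORDER =
            Comparator.<Timed>comparingLong(t -> t.deadlineNanos).thenComparingLong(t -> t.id);

    // Moves every due task into taskQueue; returns false if taskQueue ran out of space.
    static boolean fetchDue(PriorityQueue<Timed> scheduled, Queue<Timed> taskQueue, long nowNanos) {
        for (;;) {
            Timed head = scheduled.peek();
            if (head == null || head.deadlineNanos > nowNanos) {
                return true; // nothing left, or the earliest task is not due yet
            }
            scheduled.remove();
            if (!taskQueue.offer(head)) {
                scheduled.add(head); // no room: put it back so it is not lost
                return false;
            }
        }
    }

    // Runs one aggregation pass and returns the names that reached the normal queue, in order.
    static List<String> demo(long nowNanos, int taskQueueCapacity) {
        PriorityQueue<Timed> scheduled = new PriorityQueue<>(ORDER);
        scheduled.add(new Timed(1, 30, "c"));
        scheduled.add(new Timed(2, 10, "a"));
        scheduled.add(new Timed(3, 10, "b"));
        scheduled.add(new Timed(4, 99, "late"));
        Queue<Timed> taskQueue = new ArrayBlockingQueue<>(taskQueueCapacity);
        fetchDue(scheduled, taskQueue, nowNanos);
        List<String> names = new ArrayList<>();
        for (Timed t : taskQueue) {
            names.add(t.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demo(50, 10)); // [a, b, c]
        System.out.println(demo(50, 2));  // [a, b]
        System.out.println(demo(5, 10));  // []
    }
}
```

In the second call the queue has room for two tasks only, so "c" goes back to the scheduled queue and is picked up by a later pass.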
           

Executing tasks

protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();  // aggregate tasks
        Runnable task = pollTask();  // take one task from the normal queue
        if (task == null) {
            afterRunningAllTasks();  // wrap-up hook, called once per run (here there was nothing to run)
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // compute the deadline
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {  // loop over the tasks in the queue
            safeExecute(task);  // run the task, catching anything it throws

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {  // read the clock only once every 64 tasks and stop if the deadline has passed; why 64? because nanoTime() is relatively expensive
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;  // record the time of the last execution
        return true;
    }
           
protected Runnable pollTask() {
        assert inEventLoop();
        return pollTaskFrom(taskQueue);
    }
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task == WAKEUP_TASK) {
                continue;
            }
            return task;
        }
    }
protected static void safeExecute(Runnable task) {  // an exception does not stop later tasks from running; it is only logged
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
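
A consequence of checking the clock only every 64 tasks is that the budget is approximate. The following sketch reproduces the loop shape with an injected clock so the effect is visible; BudgetedRunDemo, its methods and the fake clock (each task advances time by one unit) are assumptions made for the illustration, not Netty code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

final class BudgetedRunDemo {
    // Runs tasks until the queue is empty or the deadline has passed; returns how many ran.
    static int runAllTasks(Queue<Runnable> taskQueue, long timeoutNanos, LongSupplier clock) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = clock.getAsLong() + timeoutNanos;
        int runTasks = 0;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("A task raised an exception: " + t);
            }
            runTasks++;
            // The clock is consulted only when runTasks is a multiple of 64.
            if ((runTasks & 0x3F) == 0 && clock.getAsLong() >= deadline) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }

    // Queues `tasks` tasks that each advance a fake clock by 1, then runs them with the given budget.
    static int demo(int tasks, long timeoutNanos) {
        long[] now = {0};
        Queue<Runnable> queue = new ArrayDeque<>();
        for (int i = 0; i < tasks; i++) {
            queue.add(() -> now[0]++);
        }
        return runAllTasks(queue, timeoutNanos, () -> now[0]);
    }

    public static void main(String[] args) {
        System.out.println(demo(200, 1000)); // 200: budget never exhausted
        System.out.println(demo(200, 100));  // 128: overshoots to the next multiple of 64
        System.out.println(demo(200, 0));    // 64:  even a zero budget runs one batch of 64
        System.out.println(demo(10, 0));     // 10:  queue empties before the first check
    }
}
```

With a budget of 100 time units the loop still runs 128 tasks, because the deadline is first noticed at the second checkpoint. The trade-off is fewer clock reads in exchange for a coarser limit.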
           
