天天看点

Netty原理二:NioEventLoop 如何启动监听事件前言原理解析总结

文章目录

  • 前言
  • 原理解析
  • 总结

前言

在NIO编程中,Server 通常会写一个无限循环的代码,Selector 持续监听新的事件:

//NIO
        System.out.println("等待连接....");
        while (true){
            selector.select();
            //处理 selector.selectedKeys 事件
        }
           

但是Netty程序只需要启动 ServerBootstrap,然后就开启监听事件了,那它是如何处理的?

//Netty
        ChannelFuture future = serverBootstrap.bind(7000).sync();
        System.out.println("服务器已启动,端口:7000");
           

原理解析

上一篇文章我们介绍了 ServerBootstrap启动过程全解析 ,其中讲到说 ServerBootstrap 启动的时候会将 NioServerSocketChannel 注册到 BossGroup 内部的 NioEventLoop,这个步骤是由 NioEventLoop 内部开启的线程异步执行,如下图红框部分所示:

Netty原理二:NioEventLoop 如何启动监听事件前言原理解析总结

答案也就在这里,外部调用 eventLoop.execute(new Runnable(){ … }) 在EventLoop 独立线程执行任务。EventLoop 是一个单线程的线程池,内部维护一个 Thread 对象,优先判断线程是否启动了,如果没有就创建新线程,反之不处理。

//SingleThreadEventExecutor.java
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        //这里判断当前线程是否EventLoop开启的线程
        if (!inEventLoop) {
        	//这句是关键
            startThread();
            
            //以下省略...
        }
    }
           

在 startThread() 内部有一句 SingleThreadEventExecutor.this.run(),该方法就是无限循环监听的实现(这里删掉了部分无关代码):

protected void run() {
        int selectCnt = 0;
        for (;;) {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        nextWakeupNanos.lazySet(AWAKE);
                    }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                if (isShuttingDown()) {
                      closeAll();
                      if (confirmShutdown()) {
                          return;
                      }
                  }
            }
        }
    }
           

上面这段代码比较长,用一个简单的流程图表示就是这样:

Netty原理二:NioEventLoop 如何启动监听事件前言原理解析总结

总结

NioEventLoop 在第一次创建线程的时候,会调用其父类 SingleThreadEventExecutor 的 run 方法,在这个方法内部启动无限循环,监听并处理事件。