天天看點

【Netty4】Netty核心元件之NioEventLoop(一)建立1. 開篇2. 類繼承關系以及重要的成員變量總結

文章目錄

  • 1. 開篇
  • 2. 類繼承關系以及重要的成員變量
    • 2.1 類繼承關系
    • 2.2 重要的成員變量
    • 2.3 構造函數
      • 2.3.1 在構造函數中,會建立任務隊列和tailTask隊列
      • 2.3.2 多路複用器Selector
    • 2.4 NioEventLoop執行流程
      • 2.4.1 添加一個線程
      • 2.4.2 run方法解析
        • 2.4.2.1 calculateStrategy
        • 2.4.2. 阻塞的select
        • 2.4.2.3 processSelectedKeys()
  • 總結

系類文章:

《Netty服務端啟動源碼分析(一)整體流程》

《Netty服務端啟動源碼分析(二)服務端Channel的端口綁定》

《Netty核心元件之NioEventLoop(一)建立》

《Netty核心元件之NioEventLoop(二)處理消息》

1. 開篇

  • NioEventLoop是Netty架構的Reactor線程;
  • NioEventLoop負責處理注冊在其上面的所有Channel的IO事件,通常情況下一個NioEventLoop會下挂多個Channel,但一個Channel隻和唯一的NioEventLoop對應

    例如NioEventLoop A B 2個,A和c1、c2對應,B和c3、c4對應,不會交叉。

    一個NioEventLoop對應一個selector多路複用器,為了負載均衡,新的Channel會平攤到每個NioEventLoop上,一旦确定後,就不會更改。

  • NioEventLoop同時會負責通過execute方法送出的任務,以及通過schedule方法送出的定時任務;

在接下來幾篇文章,我會通過Netty的源碼深入講解NioEventLoop的實作機制。

特别說明:基于4.1.52版本的源碼(很新的版本,和之前4.xx版本還是有些不同的)

2. 類繼承關系以及重要的成員變量

先來看下NioEventLoop的類關系圖和重要的屬性,對其有一個整體的感覺,便于後面詳細分析。

2.1 類繼承關系

【Netty4】Netty核心元件之NioEventLoop(一)建立1. 開篇2. 類繼承關系以及重要的成員變量總結

可以看到

NioEventLoop

的繼承關系非常複雜,最上層是JDK的

Executor

接口,說明它歸根到底是一個執行器,是用來執行任務的。另外,它實作了

EventLoop

接口、

EventExecutorGroup

接口和

ScheduledExecutorService

接口,繼承了

SingleThreadEventExecutor

類,這些接口和類為這個執行器添加了十分繁複的功能特性,要搞清楚NioEventLoop的具體實作機制就要不停的在這些父類和接口中來回跳轉。

  • ScheduledExecutorService

    接口表示是一個定時任務接口,EventLoop可以接受定時任務。
  • EventLoop

    接口: 一旦Channel注冊了,就處理該Channel對應的所有I/O操作。
  • SingleThreadEventExecutor

    表示這是一個單個線程的線程池。
  • EventLoop

    是一個單例的線程池,裡面含有一個死循環的線程不斷地做着三件事情:監聽端口,處理端口事件,處理隊列事件。每個Eventloop都可以綁定多個Channel,而每個Channel始終隻能由一個EventLoop來處理
    也就是說,建立了NioEventLoop後,擁有一個線程池類型的成員變量,緊接着會往該線程池加入一個線程,該線程就負責阻塞監聽IO事件等操作。添加線程由SingleThreadEventExecutor的doStartThread()完成,線程具體的幹的事情由NioEventLoop的run()實作

2.2 重要的成員變量

private Selector selector;
private SelectedSelectionKeySet selectedKeys;
private volatile Thread thread;
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
private final Queue<Runnable> tailTasks;
           
  • selector

    :作為NIO架構的Reactor線程,NioEventLoop需要處理網絡IO事件,是以它需要有一個多路複用器,即Java NIO的Selector對象;
  • selectedKeys

    :每次select操作選出來的有事件就緒的SelectionKey集合,在NioEventLoop的run方法中會處理這些事件;
  • thread

    :即每個NioEventLoop綁定的線程,它們是一對一的關系,一旦綁定,在整個生命周期内都不會改變;
  • parent

    :即目前的NioEventLoop所屬的EventExecutorGroup;
  • taskQueue

    :NioEventLoop中三大隊列之一,用于儲存需要被執行的任務。
  • scheduledTaskQueue

    :NioEventLoop中三大隊列之一,是一個優先級隊列(内部其實是一個按照任務的下次執行時間排序的小頂堆),用于儲存定時任務,當檢測到定時任務需要被執行時,會将任務從scheduledTaskQueue中取出,放入taskQueue;
  • tailTasks

    :NioEventLoop中三大隊列之一,用于存儲目前或下一次事件循環結束後需要執行的任務;

2.3 構造函數

在調用構造函數建立執行個體之前,有必要回顧下NioEventLoop是何時被建立的,是在建立NioEventLoopGroup的時候就已經建立了!詳情參見《Netty服務端啟動源碼分析(一)》 中"1.1.2 構造函數幹了什麼"章節。也就是說NioEventLoop是被預建立的,發生在通道的建立和與通道的綁定之前。

首先來看NioEventLoop的構造函數:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        // 設定parent、executor、addTaskWakesUp(添加任務時是否喚醒select)、建立taskQueue和tailTask隊 
        // 列
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        // selector初始化
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
           

2.3.1 在構造函數中,會建立任務隊列和tailTask隊列

private static Queue<Runnable> newTaskQueue(
            EventLoopTaskQueueFactory queueFactory) {
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }
           

預設情況下,會建立MPSC,即多生産者單消費者的隊列,這裡最終會用到JCTools庫,這裡不過多介紹,感興趣的可以自己去了解。

2.3.2 多路複用器Selector

Netty的實作是基于Java原生的NIO的,其對原生的NIO做了很多優化,避免了某些bug,也提升了很多性能。但是底層對于網絡IO事件的監聽和處理也是離不開多路複用器Selector的。在NioEventLoop的構造方法中進行了Selector的初始化:

final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
           
構造函數中還會初始化selector和根據配置對selectedKeys進行優化
private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // 如果優化選項沒有開啟,則直接傳回
        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }
           
  • 調用openSelector();,初始化一個epoll,并持有這個selector

    等價于 原生nio文法:Selector selector = Selector.open();

    更多的nio知識,可以參見:《bio、nio、aio–Netty筆記》中<3.2 NIO引入多路複用器代碼示例>的架構示意圖部分

如果設定了優化開關(預設優化選項是開啟的),則通過反射的方式從Selector中擷取selectedKeys和publicSelectedKeys,将這兩個成員設定為可寫,通過反射,使用Netty構造的selectedKeySet将原生JDK的selectedKeys替換掉。

我們知道使用Java原生NIO接口時,需要先調Selector的select方法,再調selectedKeys方法才可以獲得有IO事件準備好的SelectionKey集合。這裡優化過後,隻通過一步select調用,就可以從selectedKeySet獲得需要的SelectionKey集合。

另外,原生Java的SelectionKey集合是一個HashSet,這裡優化過後的SelectedSelectionKeySet底層是一個數組,效率更高。

2.4 NioEventLoop執行流程

也就是說,建立了NioEventLoop後,擁有一個線程池類型的成員變量,緊接着會往該線程池加入一個線程,該線程就負責阻塞監聽IO事件等操作。添加線程由SingleThreadEventExecutor的doStartThread()完成,線程具體的幹的事情由NioEventLoop的run()實作。

2.4.1 添加一個線程

我們先來看下添加線程的代碼,定義在父類中的,為了友善,我進行了簡化,僅列出重要代碼:

public abstract class SingleThreadEventExecutor {
    private void doStartThread() {
        
        //添加一個Runnable作為線程任務到executor(線程池)中
        executor.execute(new Runnable() {
            @Override
            public void run() {               
             
                try {
                    //Runnable匿名内部類調用外部類SingleThreadEventExecutor的run(),實際會調用子類NioEventLoop對象執行個體的run()
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                   
                }
            }
        });
    }
           
代碼比較簡單,但是有個不容易了解的地方,即

SingleThreadEventExecutor.this.run();

,這個是匿名表達式引用外部類方法的文法,即Runnable的run()方法想引用外部類的run()方法,詳情參見《Lambda表達式裡的“陷阱“ 匿名表達式 qualified this》

2.4.2 run方法解析

添加線程後,線程池就會調用該線程,那麼線程具體幹了哪些事情呢?

EventLoop的職責可以用下面這張圖形象的表示:

【Netty4】Netty核心元件之NioEventLoop(一)建立1. 開篇2. 類繼承關系以及重要的成員變量總結

EventLoop的run方法在一個for死循環中,周而複始的做着三件事:

1、從已注冊的Channel監聽IO事件;對應select()阻塞監聽,當然為了避免一直阻塞,一般帶逾時時間

2、處理IO事件;

3、從任務隊列取任務執行。

public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        int selectCnt = 0;
        for (;;) {
              int strategy;
              try {
                    // 計算本次循環的執行政策
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                      // 調用Java NIO的多路複用器,檢查注冊在NioEventLoop上的Channel的IO狀态
                      strategy = select(curDeadlineNanos);
                    }
                } catch (IOException e) {
                }
                // 處理IO事件
                processSelectedKeys();
                // 處理任務隊列中的任務
                ranTasks = runAllTasks();
                ...
    }
           

整個run()方法被包裹在一個for循環中,唯一能夠結束循環的條件是狀态state為SHUTDOWN或者TERMINATED,NioEventLoop繼承了SingleThreadEventExecutor,isShuttingDown()和confirmShutdown()都是SingleThreadEventExecutor中的方法。

可以看到,除去異常處理和一些分支流程,整個run()方法不是特别複雜,重點在與select()和selectNow()方法,run()方法流程如下圖所示:

【Netty4】Netty核心元件之NioEventLoop(一)建立1. 開篇2. 類繼承關系以及重要的成員變量總結

下面詳細解析

2.4.2.1 calculateStrategy

calculateStrategy決定了調用selectNow()還是select()。

先來看calculateStrategy:

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

   //成員變量selectNowSupplier 
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };

    protected boolean hasTasks() {
        assert inEventLoop();
        return !taskQueue.isEmpty() || !tailTasks.isEmpty();
    }
           
  • 每次循環,都會檢測任務隊列和IO事件,如果任務隊列中沒有任務,則直接傳回SelectStrategy.SELECT;
    因為任務清單為空,則允許調用select阻塞等待新的消息的到來
  • 如果任務隊列中有任務,則會調用非阻塞的selectNow檢測是否有IO事件準備好的Channel數。
    因為任務清單不為空,要選用非阻塞的方法,這樣原有的任務可以繼續執行

2.4.2. 阻塞的select

當任務隊列中沒有任務時,直接進入select分支

case SelectStrategy.SELECT:
            // 找到下一個将要執行的定時任務的截止時間
            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
            if (curDeadlineNanos == -1L) {
                curDeadlineNanos = NONE; // nothing on the calendar
            }
            nextWakeupNanos.set(curDeadlineNanos);
            try {
                if (!hasTasks()) {
                    // 阻塞調用select
                    strategy = select(curDeadlineNanos);
                }
            } finally {
                nextWakeupNanos.lazySet(AWAKE);
            }
            // fall through
           

阻塞調用select:

private int select(long deadlineNanos) throws IOException {
        // 如果沒有定時任務,直接調Java NIO的select,進入阻塞
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // 如果截止時間小于0.5ms,則timeoutMillis 為0,直接調非阻塞的selectNow()方法
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }
           

在調用select之前,再次調用hasTasks()判斷從上次調用該方法到目前為止是否有任務加入,多做了一層防護,因為調用select時,可能會阻塞,這時,如果任務隊列中有任務就會長時間得不到執行,是以須小心謹慎。

如果任務隊列中還是沒有任務,則會調用select方法。在這個方法中會根據入參deadlineNanos來選擇調用NIO的哪個select方法:

如果deadlineNanos為NONE,即沒有定時任務時,直接調用NIO的無參select方法,進入永久阻塞,除非檢測到Channel的IO事件或者被wakeup;

如果存在定時任務,且定時任務的截止時間小于0.5ms,則timeoutMillis 為0,直接調非阻塞的selectNow方法,也就是說馬上有定時任務需要執行了,不要再進入阻塞了;

其他情況,調用select(timeout),進入有逾時時間的阻塞。

2.4.2.3 processSelectedKeys()

參見 《Netty核心元件之NioEventLoop(二)處理消息》

總結

在本文中,對Netty的NioEventLoop進行了深入的解讀,并且詳細講解了它的三大職責之一:檢測Channel的IO事件的機制。

NioEventLoop是Netty最核心的概念,内部運作機制很複雜,在接下來的兩篇文章中會繼續分析。

參考:

《Netty核心元件之NioEventLoop(一)》

繼續閱讀