天天看點

【超詳細解釋】從Java NIO到Netty的每一步 (九)

這裡寫目錄标題

  • ​​Netty 源碼解讀​​
  • ​​EventLoop源碼​​
  • ​​1.執行個體化NioEventLoopGroup​​
  • ​​2.eventLoop的執行邏輯​​
  • ​​關注我日常更新分享Java程式設計技術!希望大家都能早日走上人生巅峰!​​
【超詳細解釋】從Java NIO到Netty的每一步 (九)
可以添加小助手vx:xiehuangbao1123 領取java全套學習資料

Netty 源碼解讀

EventLoop源碼

1.執行個體化NioEventLoopGroup

我們丢棄服務的代碼直接使用的無參的構造函數進行執行個體化的,那麼我們就從這個構造函數開始看吧,見io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()

public NioEventLoopGroup() { 
        this(0);
    }

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
        //看看這個地方建立了一個SelectorProvider那你是不是想到了Java NIO中的Selector了呢
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }      

往super裡跟,看看還幹了什麼,見:io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object…)

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //看這個地方,如果用的無參構造函數這個地方傳進來的都是0,但最終這個線程數為DEFAULT_EVENT_LOOP_THREADS
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }      

可以看到如果不傳線程數預設的線程數是DEFAULT_EVENT_LOOP_THREADS,那這個又是多少呢?點進去看定義可以看到是計算機核數的2倍:​

​Runtime.getRuntime().availableProcessors() * 2​

​。

繼續往super裡面跟進,見io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…)

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        
        //執行個體化executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        //EventExecutor數組
        children = new EventExecutor[nThreads];
        
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //執行個體化數組中的成員,我們跟進去看看
                children[i] = newChild(executor, args);
                success = true;
            } 
            ...
        }
        //給這個evetLoop建立一個選擇器,選擇器你可以了解為有多個eventLoop時,Channel選擇運作在哪個eventLoop的線程上,感興趣的話後續我們也寫個專題講講。
        chooser = chooserFactory.newChooser(children);
    }      

我們繼續跟進​

​newChild​

​方法,見,io.netty.channel.nio.NioEventLoopGroup#newChild

@Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        //這個地方就執行個體化了一個eventLoop,我們繼續跟進構造函數看看
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }      

跟進構造函數,見,io.netty.channel.nio.NioEventLoop#NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,

                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        //這個super我們就不繼續跟進了,你看傳參executor就可以想到把executor儲存成成員變量為後續給這個eventLoop啟動一個線程做鋪墊了
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        //我們主要看這,可以看到Java NIO的selector在這裡被建立并且成為了eventLoop的成員變量了
        selector = openSelector();
        selectStrategy = strategy;
}      

綜上,我們回顧一下執行個體化NioEventLoopGroup都幹了什麼:建立了一個大小為計算機核數2倍的eventLoop數組并執行個體化了每個eventLoop,最核心的就是建立了eventLoop的成員變量:executor和selector

2.eventLoop的執行邏輯

還記得在上述章節我們講了會在建立完ServerSocketChannel的之後會注冊到其中一個eventLoop上,見io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    //在這個地方eventLoop開始執行,我們跟進去看
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   ...
                }
            }
        }      

我們跟進這個execute方法,見io.netty.util.concurrent.SingleThreadEventExecutor#execute

@Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            //看這個地方,因為目前我們再main線程裡,是以執行這個方法啟用一個新的線程
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
    //上面調的是這個方法
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                //繼續跟進
                doStartThread();
            }
        }
    }
    
    //調的這裡
    private void doStartThread() {
        assert thread == null;
        //最後是通過這個ThreadPerTaskExecutor來啟動的新的線程
        executor.execute(new Runnable() {
           executor.execute(new Runnable() {
            @Override
            public void run() {
                //首先把目前的這個線程儲存在目前這個eventLoop的成員變臉thread中,看這樣就完成了一個eventLoop和一個thread的綁定
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //接下來我們主要看這個地方到底幹了什麼
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    ...
                }
            } 
        });
    }      

我們繼續跟進這個​

​SingleThreadEventExecutor.this.run()​

​;見io.netty.channel.nio.NioEventLoop#run

@Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        //(1)這個select有沒有很眼熟呢?回想一下Java NIO
                        select(wakenUp.getAndSet(false));
                        ...
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        //(2)這裡是不是也眼熟?
                        processSelectedKeys();
                    } finally {
                        //(3)我們也看看這裡幹了什麼
                        runAllTasks();
                    }
                } 
                ...
        }
    }      
  1. select

我們跟進這個方法見io.netty.channel.nio.NioEventLoop#select

private void select(boolean oldWakenUp) throws IOException {
        //拿到Java NIO的selector
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            //這段是不是很熟悉,就是通過for循環不斷輪詢是否有時間發生
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                ...
             }
         
         }
    }      

上邊的代碼我們先隻關心Netty其實就是調用了Java NIO的selector開始輪詢是否有時間發生。之是以它寫的比較複雜是為了避免一個比較有名的一個空輪詢的bug,那在這裡我們先不讨論,以後可以成立獨立的專題來講述一下。

  1. processSelectedKeys()

一旦有事件發生我們就可以通過​

​processSelectedKeys()​

​處理事件了,跟進見:io.netty.channel.nio.NioEventLoop#processSelectedKeys

private void processSelectedKeys() {
        //有些人可能會好奇為啥這個selectedKeys不是在selector的成員變量被Neety單拿出來了,其實這個地方Netty針對這個selector的selectedKeys做了一些優化,我們也不細說,也可以單獨成立個專題說,我們還是先串流程。
        if (selectedKeys != null) {
            //執行這裡
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        //這個地方和Java NIO依次處理發生的事件也同樣一樣
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                //因為我們建立了NioServerSocketChannel是以我們主要看這裡,要處理事件了
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            ...
        }
    }
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //得到目前AbstractNioChannel中的unsafe成員變量
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
        
            //下面的代碼是不是又很熟悉了?就是Java NIO處理各種不同僚件的代碼了
            int readyOps = k.readyOps();
            
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {              
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }

           
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {         
                ch.unsafe().forceFlush();
            }

           
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //這個地方我們還記的在Java NIO中幹了什麼嗎?建立SocketChannel并注冊讀事件對不對?我們會在下一章讨論這裡,即Netty是如何處理新連接配接接入的
                unsafe.read();
                if (!ch.isOpen()) {     
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }      

綜上,eventLoop的執行已經講解完,我們回顧上一章并聯合Java NIO想想都做了什麼?首先建立了NioServerSocketChannel,然後開啟selector輪詢事件,最後在selector上注冊了OP_ACCEPT事件和Java NIO我們寫的流程差不多呢?這樣記是不是好記多了呢?

【超詳細解釋】從Java NIO到Netty的每一步 (九)

關注我日常更新分享Java程式設計技術!希望大家都能早日走上人生巅峰!