天天看點

netty系列之:EventExecutor,EventExecutorGroup和netty中的實作

目錄

  • 簡介
  • EventExecutorGroup
  • EventExecutor
  • EventExecutorGroup在netty中的基本實作
  • EventExecutor在netty中的基本實作
  • 總結

簡介

netty作為一個異步NIO架構,多線程肯定是它的基礎,但是對于netty的實際使用者來說,一般是不需要接觸到多線程的,我們隻需要按照netty架構規定的流程走下去,自定義handler來處理對應的消息即可。

那麼有朋友會問了,作為一個NIO架構,netty的多線程到底展現在什麼地方呢?它的底層原理是什麼呢?

今天帶大家來看看netty中的任務執行器EventExecutor和EventExecutorGroup。

EventExecutorGroup

因為EventExecutor繼承自EventExecutorGroup,這裡我們先來詳細講解一下EventExecutorGroup。

先看下EventExecutorGroup的定義:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
           

EventExecutorGroup繼承自JDK的ScheduledExecutorService,即可以執行定時任務,也可以像普通的任務執行器一樣送出任務去執行。

同時EventExecutorGroup還繼承了Iterable接口,表示EventExecutorGroup是可周遊的,它的周遊對象是EventExecutor。

EventExecutorGroup有兩個和Iterable相關的方法,分别是next和iterator:

EventExecutor next();

    @Override
    Iterator<EventExecutor> iterator();
           

在EventExecutorGroup中調用next方法會傳回一個EventExecutor對象,那麼EventExecutorGroup和EventExecutor是什麼關系呢?

我們再來看一下EventExecutor的定義:

public interface EventExecutor extends EventExecutorGroup
           

可以看到EventExecutor實際上是EventExecutorGroup的子類。但是在父類EventExecutorGroup中居然有對子類EventExecutor的引用。

這種在父類的Group中引用傳回子類的設計模式在netty中非常常見,大家可以自行體會一下這樣的設計到底是好還是壞。

EventExecutorGroup作為一個EventExecutor的Group對象,它是用來管理group中的EventExecutor的。是以在EventExecutorGroup中設計了一些對EventExecutor的統一管理接口。

比如

boolean isShuttingDown()

方法用來判斷這個group中的所有EventExecutor全都在被shutdown或者已經被shutdown。

另外EventExecutorGroupt提供了shutdown group中所有EventExector的方法:

Future<?> shutdownGracefully()

和 terminate方法:

Future<?> terminationFuture()

這兩個方法都傳回了Future,是以我們可以認為這兩個方法是異步方法。

EventExecutorGroup中其他的方法都是一些對JDK中ScheduledExecutorService方法的重寫,比如submit,schedule,scheduleAtFixedRate,scheduleWithFixedDelay等。

EventExecutor

接下來我們再研究一下EventExecutor,在上一節中,我們簡單的提到了EventExecutor繼承自EventExecutorGroup,和EventExecutorGroup相比,EventExecutor有哪些新的方法呢?

我們知道EventExecutorGroup繼承了Iterable,并且定義了一個next方法用來傳回Group中的一個EventExecutor對象。

因為Group中有很多個EventExecutor,至于具體傳回哪一個EventExecutor,還是由具體的實作類來實作的。

在EventExecutor中,它重寫了這個方法:

@Override
EventExecutor next();
           

這裡的next方法,傳回的是EventExecutor本身。

另外,因為EventExecutor是由EventExecutorGroup來管理的,是以EventExecutor中還存在一個parent方法,用來傳回管理EventExecutor的EventExecutorGroup:

EventExecutorGroup parent();
           

EventExecutor中新加了兩個inEventLoop方法,用來判斷給定的線程是否在event loop中執行。

boolean inEventLoop();

    boolean inEventLoop(Thread thread);
           

EventExecutor還提供兩個方法可以傳回Promise和ProgressivePromise.

<V> Promise<V> newPromise();
<V> ProgressivePromise<V> newProgressivePromise();
           

熟悉ECMAScript的朋友可能知道,Promise是ES6引入的一個新的文法功能,用來解決回調地獄的問題。這裡的netty引入的Promise繼承自Future,并且添加了兩個success和failure的狀态。

ProgressivePromise更進一步,在Promise基礎上,提供了一個progress來表示進度。

除此之外,EventExecutor還提供了對Succeeded的結果和Failed異常封裝成為Future的方法。

<V> Future<V> newSucceededFuture(V result);

    <V> Future<V> newFailedFuture(Throwable cause);
           

EventExecutorGroup在netty中的基本實作

EventExecutorGroup和EventExecutor在netty中有很多非常重要的實作,其中最常見的就是EventLoop和EventLoopGroup,鑒于EventLoop和EventLoopGroup的重要性,我們會在後面的章節中重點講解他們。這裡先來看下netty中的其他實作。

netty中EventExecutorGroup的預設實作叫做DefaultEventExecutorGroup,它的繼承關系如下所示:

netty系列之:EventExecutor,EventExecutorGroup和netty中的實作

可以看到DefaultEventExecutorGroup繼承自MultithreadEventExecutorGroup,而MultithreadEventExecutorGroup又繼承自AbstractEventExecutorGroup。

先看下AbstractEventExecutorGroup的邏輯。AbstractEventExecutorGroup基本上是對EventExecutorGroup中接口的一些實作。

我們知道EventExecutorGroup中定義了一個next()方法,可以傳回Group中的一個EventExecutor。

在AbstractEventExecutorGroup中,幾乎所有EventExecutorGroup中的方法實作,都是調用next()方法來完成的,以submit方法為例:

public Future<?> submit(Runnable task) {
        return next().submit(task);
    }
           

可以看到submit方法首先調用next擷取到的EventExecutor,然後再調用EventExecutor中的submit方法。

AbstractEventExecutorGroup中的其他方法都是這樣的實作。但是AbstractEventExecutorGroup中并沒有實作next()方法,具體如何從Group中擷取到EventExecutor,還需要看底層的具體實作。

MultithreadEventExecutorGroup繼承自AbstractEventExecutorGroup,提供了多線程任務的支援。

MultithreadEventExecutorGroup有兩類構造函數,在構造函數中可以指定多線程的個數,還有任務執行器Executor,如果沒有提供Executor的話,可以提供一個ThreadFactory,MultithreadEventExecutorGroup會調用

new ThreadPerTaskExecutor(threadFactory)

來為每一個線程構造一個Executor:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
           

MultithreadEventExecutorGroup對多線程的支援是怎麼實作的呢?

首先MultithreadEventExecutorGroup提供了兩個children,分别是children和readonlyChildren:

private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
           

children和MultithreadEventExecutorGroup中的線程個數是一一對應的,有多少個線程,children就有多大。

children = new EventExecutor[nThreads];
           

然後通過調用newChild方法,将傳入的executor構造成為EventExecutor傳回:

children[i] = newChild(executor, args);
           

看一下newChild方法的定義:

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
           

這個方法在MultithreadEventExecutorGroup中并沒有實作,需要在更具體的類中實作它。

readonlyChildren是child的隻讀版本,用來在周遊方法中傳回:

readonlyChildren = Collections.unmodifiableSet(childrenSet);

public Iterator<EventExecutor> iterator() {
        return readonlyChildren.iterator();
    }
           

我們現在有了Group中的所有EventExecutor,那麼在MultithreadEventExecutorGroup中,next方法是怎麼選擇具體傳回哪一個EventExecutor呢?

先來看一下next方法的定義:

private final EventExecutorChooserFactory.EventExecutorChooser chooser;

chooser = chooserFactory.newChooser(children);

public EventExecutor next() {
        return chooser.next();
    }
           

next方法調用的是chooser的next方法,看一下chooser的next方法具體實作:

public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
           

可以看到,其實是一個很簡單的根據index來擷取對象的操作。

最後看一下DefaultEventExecutorGroup中對newChild方法的實作:

protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
        return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
    }
           

newChild傳回的EventExecutor使用的是DefaultEventExecutor。這個類是EventExecutor在netty中的預設實作,我們在下一小結中詳細進行講解。

EventExecutor在netty中的基本實作

EventExecutor在netty中的預設實作是DefaultEventExecutor,先看下它的繼承結構:

netty系列之:EventExecutor,EventExecutorGroup和netty中的實作

DefaultEventExecutor繼承自SingleThreadEventExecutor,而SingleThreadEventExecutor又繼承自AbstractScheduledEventExecutor,AbstractScheduledEventExecutor繼承自AbstractEventExecutor。

先來看一下AbstractEventExecutor的定義:

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor
           

AbstractEventExecutor繼承了AbstractExecutorService,并且實作了EventExecutor接口。

AbstractExecutorService是JDK中的類,它提供了 ExecutorService 的一些實作,比如submit, invokeAny and invokeAll等方法。

AbstractEventExecutor作為ExecutorGroup的一員,它提供了一個EventExecutorGroup類型的parent屬性:

private final EventExecutorGroup parent;

public EventExecutorGroup parent() {
        return parent;
    }
           

對于next方法來說,AbstractEventExecutor傳回的是它本身:

public EventExecutor next() {
        return this;
    }
           

AbstractScheduledEventExecutor繼承自AbstractEventExecutor,它内部使用了一個PriorityQueue來存儲包含定時任務的ScheduledFutureTask,進而實作定時任務的功能:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
           

接下來是SingleThreadEventExecutor,從名字可以看出,SingleThreadEventExecutor使用的是單線程來執行送出的tasks,SingleThreadEventExecutor提供了一個預設的pending執行task的任務大小:DEFAULT_MAX_PENDING_EXECUTOR_TASKS,還定義了任務執行的幾種狀态:

private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;
           

之前提到了EventExecutor中有一個特有的inEventLoop方法,判斷給定的thread是否在eventLoop中,在SingleThreadEventExecutor中,我們看一下具體的實作:

public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
           

具體而言就是判斷給定的線程和SingleThreadEventExecutor中定義的thread屬性是不是同一個thread,SingleThreadEventExecutor中的thread是這樣定義的:

這個thread是在doStartThread方法中進行初始化的:

executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
           

是以這個thread是任務執行的線程,也就是executor中執行任務用到的線程。

再看一下非常關鍵的execute方法:

private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
           

這個方法首先将task添加到任務隊列中,然後調用startThread開啟線程來執行任務。

最後來看一下DefaultEventExecutor,這個netty中的預設實作:

public final class DefaultEventExecutor extends SingleThreadEventExecutor 
           

DefaultEventExecutor繼承自SingleThreadEventExecutor,這個類中,它定義了run方法如何實作:

protected void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }
    }
           

在SingleThreadEventExecutor中,我們會把任務加入到task queue中,在run方法中,會從task queue中取出對應的task,然後調用task的run方法執行。

總結

DefaultEventExecutorGroup繼承了MultithreadEventExecutorGroup,MultithreadEventExecutorGroup中實際調用的是SingleThreadEventExecutor來執行具體的任務。

本文已收錄于 http://www.flydean.com/05-1-netty-event…entexecutorgroup/

最通俗的解讀,最深刻的幹貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!

歡迎關注我的公衆号:「程式那些事」,懂技術,更懂你!