天天看點

史上最不詳細的Netty源碼分析-Netty啟動流程

首先要說明的是标題。沒錯!當然是為了博眼球,改成史上最詳細的Netty源碼分析是自取其辱。改成史上最不詳細的Netty源碼分析,萬一說的不對也沒人噴。話分兩頭,這個是我分析的初篇,我還是很認真和用心的寫的。班門弄斧,如果有錯誤請指出。

為什麼要寫Netty源碼分析? 用《Netty權威指南》話說

随着大資料,網際網路和雲計算的發展,傳統的垂直架構将逐漸被分布式,彈性伸縮的新架構替代。

系統隻要分布式部署,就存在多個節點之間的通訊問題,強調高可擴充和高性能,是以往往會選擇高性能的通訊分式。使用

Netty+二進制編解碼這些内部私有協定,已經逐漸成為業界主流。

簡單的例子

啟動下面程式後,指令行裡輸入

telnet 127.0.0.1 9988

後,随便敲個字元,就會有

welcome!!!

列印出來,并且在console裡會列印出敲入的字元。如果沒有telnet指令,使用

curl 127.0.0.1:9988

也可以列印出

welcome!!!

代碼如下

public class WelcomeServer {
    private int port;
    public WelcomeServer(int port) {
        this.port = port;
    }
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new WelcomeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 9988;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new WelcomeServer(port).run();
    }
}
           
public class WelcomeServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf bb=(ByteBuf)msg;
        byte[] b= new byte[bb.readableBytes()];
        bb.readBytes(b);
        System.out.println("read message:"+new String(b));
        byte[] welcomebyte="welcome!!!".getBytes();
        bb.writeBytes(welcomebyte);
        ctx.writeAndFlush(bb);
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
        cause.printStackTrace();
        ctx.close();
    }
}
           

開始分析

ChannelFuture f = b.bind(port).sync();

入手,在一系列的調用後會到達AbstractBootstrap#doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
 final ChannelFuture regFuture = initAndRegister();
 ...   
}
           

在上面的類裡最重要的就是initAndRegister(),顧名思義就是初始并注冊。

下面的類去掉了沒相關的代碼,隻保留必要的用于分析的代碼

final ChannelFuture initAndRegister() {
        //初始
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();//(1)
            init(channel);//(2)
        } catch (Throwable t) {
         ...
        }
        //注冊
        ChannelFuture regFuture = config().group().register(channel);//(3)
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

           

從代碼裡就是看出分成3步:

  1. 建構出一個channel (1)
  2. channel 初始化 (2)
  3. 将channel注冊到group裡并傳回一個futture對象 (3)

接下來就分建構,初始化 ,注冊 3部分進行分析

建構

通過

channelFactory.newChannel()

進行的建構出一個chnnel,這裡使用了·

channelFactory

這個工廠對象,

channelFactory

哪裡得出的

在例子WelcomeServer中有下面代碼

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);

           

channel(NioServerSocketChannel.class),它的代碼在下面

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));//(1)
    }
           
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ...
        //初始化出 channelFactory
        this.channelFactory = channelFactory;
        //傳回自己,鍊式調用,builder模式常用
        return (B) this;
    }
           

下面是ReflectiveChannelFactory的newChannel()

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
   
}
           

new ReflectiveChannelFactory<C>(channelClass)

我們傳入的class是

NioServerSocketChannel.class

.

看到ReflectiveChannelFactoryd.newChannel()就是反射建立出一個NioServerSocketChannel對象。

建立完NioServerSocketChanne後,傳入下面的init方法

小結:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
           

這段代碼裡channel(NioServerSocketChannel.class),名義上是指定了channel, 實際上是指定了ReflectiveChannelFactory。當構造channel的時候是通過反射構造出channel.

初始化

上面分析new了一個channel 對象

init(channel)都幹了什麼?

void init(Channel channel) throws Exception {
  ChannelPipeline p = channel.pipeline();
 
  p.addLast(new ChannelInitializer<Channel>() { //(3)
            @Override
            public void initChannel(final Channel ch) throws Exception {
            
              final Map<ChannelOption<?>, Object> options = options0();
              synchronized (options) {    
                 channel.config().setOptions(options);
               }
                // 拿到ChannelPipeline
                final ChannelPipeline pipeline = ch.pipeline();
                // 拿到config裡的handler,這些handler就是WelcomeServer裡構造出ServerBootStrap裡的handler()方法放入的,注意不是childHandler()
                //WelcomeServer裡目前沒有寫handler()
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    
}
           

channel.pipeline()是什麼?這得從

NioServerSocketChannel構造過程入手,我們知道NioServerSocketChannel是

clazz.newInstance()

構造出來的。構造函數如下

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    
    //JDK裡的NIO裡的SelectorProvider                        
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    
     public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}
           

super(null, channel, SelectionKey.OP_ACCEPT);的調用棧如下

class AbstractNioMessageChannel{
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
           super(parent, ch, readInterestOp);
    }
}
           
class AbstractNioChannel{
 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
           ...
        }
    }
}
           
class AbstractChannel{
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
}
           

channel.pipeline()傳回的ChannelPipeline 就是上面的

newChannelPipeline()

得出的.

下面是newChannelPipeline()的代碼

class AbstractChannel{
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
}
           

從上面的代碼裡分析得出pipeline其實是雙向連結清單,如下

HeadContext.next-->TailContext
TailContext.prev-->HeadContext
           

有tail和head的全局引用分别指向TailContext和HeadContext

TailContext和HeadContext的差別

TailContext

HeadContext

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {}
           

可以看出HeadContext比TailContext多了一個ChannelOutboundHandler接口。ChannelOutboundHandler和ChannelInboundHandler的作用後續再介紹

回到代碼,看addLast

ChannelPipeline p = channel.pipeline(); 
  p.addLast(new ChannelInitializer<Channel>() { //(3)
            @Override
            public void initChannel(final Channel ch) throws Exception {
            
              final Map<ChannelOption<?>, Object> options = options0();
              synchronized (options) {     channel.config().setOptions(options);
               }
                // 拿到ChannelPipeline
                final ChannelPipeline pipeline = ch.pipeline();
                // 拿到config裡的handler,這些handler就是構造出ServerBootStrap裡的childHandler()方法放入的
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                      // 這個處理client連接配接請求的關鍵Context的handler,這個在後面會分析,這裡暫時不關心
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
           

addLast(...)

,最後會調用到下面代碼,關注addLast0

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            //将handler分裝成為一個Context對象
            newCtx = newContext(group, filterName(name, handler), handler);
            //将這個Context對象加入到雙向連結清單的tail節點之前
            addLast0(newCtx);          
            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.           
            //registered=false 表示該channel 還沒注冊到selector
            //會最後調用到ChannelHandler.handlerAdded(...)方法裡,這個方法裡,最終會initChannel(ctx)
            if (!registered) {
                newCtx.setAddPending();
                //将這個任務加入到pipeline的pendingHandlerCallbackHead連結清單上
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
           

addLast0,回顧上面的pipleline裡的雙向連結清單,就容易了解到将這個AbstractChannelHandlerContext加入到雙向連結清單的tail的上個節點

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
}
           

上面的流程就是初始的過程,總結下,簡單說就是下面2步

  1. new出一個Channel, 這個channel是

    NioServerSocketChannel

    ,通過反射調用無參數夠造函數構造出來。
  2. 構造過程裡建立了一個pipeline對象,這個pipeline對象是個雙向連結清單的結構
  3. 初始化這個channel,在channel的pipeline裡加入從handler(…)裡拿到的handler封裝成的Context對象

    如下

b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(...)
           

4.最後會往pipeline加入ServerBootstrapAcceptor這個處理client連接配接的handler的context

5.如上分析server端pipeline應該是

史上最不詳細的Netty源碼分析-Netty啟動流程
注冊

上面的2步分别是new 出1個channel,然後對channel做了一些初始化,如channel裡的pipeline這個雙向連結清單的初始化操作。

現在進入到第三步,将channel注冊到group裡,如下

(3)

ChannelFuture regFuture = config().group().register(channel);

那問題是什麼是group?

先看

config()

,調用到最後會發現這是ServerBootstrap的一個全局變量ServerBootstrapConfig,ServerBootstrapConfig的有個傳入參數this指向自己,如下代碼

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}
           

config().group()其實就是調用到

ServerBootstrap.group().

ServerBootstrap.group()是什麼呢?得看我們建立 WelcomeServer的時候,有new了2個EventLoopGroup,一個是bossGroup,另個是workerGroup。代碼如下

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
           

ServerBootstrap中的group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法如下

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>{
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        ...
        this.childGroup = childGroup;
        return this;
    }
}
           

super.group(parentGroup)代碼如下

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
   public B group(EventLoopGroup group) {
        this.group = group;
        return (B) this;
    }
}
           

做的事就是

  • bossGroup付給了ServerBootstrap裡全局變量

    group

  • workerGroup付給了ServerBootstrap裡全局變量

    childGroup

    .

config().group()

指向的就是bossGroup這個

NioEventLoopGroup

是以下面的代碼

就是把

channel

注冊到

EventLoopGroup

裡,确切的說

NIOServerChannel

注冊到bossGroup這個

NioEventLoopGroup

裡.

NioEventLoopGroup

裡的register(channel)方法是在它的父類

MultithreadEventLoopGroup

定義的。

public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup{
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}
           
public class MultithreadEventExecutorGroup{
  public EventExecutor next() {
        return chooser.next();
    }
}
           

這個chooser又是個什麼對象,以及怎麼初始化出來?

這得從下面的代碼入手

NioEventLoopGroup無參構造函數最後調用super(…)

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
   public NioEventLoopGroup() {
        this(0);
    }
    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
}
           
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

  private static final int DEFAULT_EVENT_LOOP_THREADS;

  static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
  }
  //nThreads==0,是以取DEFAULT_EVENT_LOOP_THREADS,取DEFAULT_EVENT_LOOP_THREAD預設就是cpu核心數*2
  protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  }
}
           
public abastract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup{
   
    private final EventExecutor[] children;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
     protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
        ...
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
               //EventExecutor 構造出來的是NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
               ...
            } finally {
               ...
            }
        }

       //!!!!注意這裡-chooser 在此構造出來,它是傳入的chooserFactory,
       //即DefaultEventExecutorChooserFactory.INSTANCE
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);                             
     }
}
           

DefaultEventExecutorChooserFactory 是個單例

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
       //如果傳入的executors是2的n次方
       //在這裡我們傳入的executors的個數是cpu核心數*2,一定是2的n次方,是以傳回的是PowerOfTowEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    }

           

我們要關注的就是PowerOfTowEventExecutorChooser.next(),

executors[idx.getAndIncrement() & executors.length - 1]

說白了就是 EventExecutor[]裡挨個被選中去執行任務。

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
           

回到

public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup{
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    public ChannelFuture register(Channel channel){
    //next()就是獲得executors數組裡的某個executor
    //(io.netty.channel.nio.NioEventLoop)
    //然後執行注冊channel
        return next().register(channel);
    }
}
           

next()得到是的executor 是EventExecutor[] children裡的一個,也就是

io.netty.channel.nio.NioEventLoop

接下來需要明确

NioEventLoop

NioEventLoopGroup

關系

NioEventLoopGroup 類

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
}
           

NioEventLoop類

NioEventLoop

的遠祖就是

EventExecutor

。然後可以回到它兩關系,

NioEventLoopGroup

裡有個EventExecutor[] children的數組,裡面包含的真實對象就是

NioEventLoop

next().register(channel)

就是從EventExecutor[] children的數組裡拿到一個

NioEventLoop

,

NioEventLoop.register(channel)

接下來就看NioEventLoop的register(Channel channel)方法,這個方法其實是它的父類SingleThreadEventLoop裡的方法

public class SingleThreadEventLoop{
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}
           

promise.channel()

我們知道是

NioServerSocketChannel

,它的unsafe()方法傳回了一個

Unsafe

接口對象,

Unsafe

是個什麼鬼?

還記得

NioServerSocketChannel

構造到最後調用如下代碼,其中有

newUnsafe()

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
           

讓我們進入

newUnsafe()

方法,構造出一個Unsafe的接口對象。看下

Unsafe

接口 的說明。正常使用是不會碰到用到這些類

Unsafe operations that should never be called from user-code.

初略的看了下這個接口提供的方法,例如

  • void register(EventLoop eventLoop, ChannelPromise promise);
  • void bind(SocketAddress localAddress, ChannelPromise promise);

    可以斷定這個類是用來做連接配接相關的

debug會得到最後的對象是

NioMessageUnsafe

private final class NioMessageUnsafe extends AbstractNioUnsafe {
}
           

我們關注的是register(EventLoop eventLoop, ChannelPromise promise)的實作,這個實作是在

AbstractNioUnsafe

的父類

AbstractUnsafe

裡實作的

protected abstract class AbstractUnsafe implements Unsafe {
       public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
         //eventLoop 是傳入NioEventLoop AbstractChannel.this.eventLoop = eventLoop;
         //第一次進來eventLoop.inEventLoop()是false
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                   //關注點-eventLoop.execute
                   
                   eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                 ...
                }
            }
        }
  }
           

上面的代碼我們關注

eventLoop.execute(new Runnable() {...});

,這個execute是

NioEventLoop

的父類

SingleThreadEventExecutor

裡實作的,如下代碼

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
 private final Executor executor;
 public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
           // 第一次肯定是到這裡
           //關注點-啟動線程!!!
            startThread();
            //關注點-加入任務!!!
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    
  private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    
    private void doStartThread() {
        assert thread == null;
        //這個executor又是什麼?
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    ...
                } finally {
                    ...
                }
            }
        });
    }
}    
           

上面的代碼裡這個executor是什麼?

我們在create NioEventLoopGroup的時候,會調用到它的父類構造函數中,如下面傳入的executor.但是傳入的是null,為空則構造出

ThreadPerTaskExecutor(newDefaultThreadFactory())

賦給executor。看下面代碼

class MultithreadEventExecutorGroup{
   
     private final EventExecutor[] children;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    
     protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
        ...
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        ...
   }
   
}
           

是以這個executor真實身份是

io.netty.util.concurrent.ThreadPerTaskExecutor

ThreadPerTaskExecutor的類如下

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
           

到此startThread()就很明确了,就是啟動傳入的command線程。最後就是調用

SingleThreadEventExecutor.this.run();

這個run()在

SingleThreadEventExecutor

裡是

protected abstract void run()

,它的實作在

NIOEventLoop

裡,如下,這是個死循環

public class NIOEventLoop{
protected void run() {
        for (;;) {
            try {
              //死循環從一個 private final Queue<Runnable> taskQueue裡去不斷的擷取任務,taskQueue是哪裡建立出來的?找下它在哪裡
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                       
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
           
死循環從一個 private final Queue taskQueue裡去不斷的擷取任務,taskQueue是哪裡建立出來的?找下它在哪裡建立就知道了,它是在下面的方法裡
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        //看這裡構造出taskQueue
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

           

啟動完線程後,就是加入任務了,代碼比較簡單,

往taskQueue裡放任務

protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }
           

到此register的代碼終于可以告一段落,簡單的總結下initandregister的過程

  1. 建立了一個NioServerSocketChannel
  2. 建立了一個NioEventLoopGroup
  3. NioEventLoopGroup裡有個child[]的數組來存放了NioEventLoop
  4. 将NioServerSocketChannel注冊到NioEventLoopGroup
  5. NioServerChannel注冊到NioEventLoopGroup,不是直接注冊,而是從child[]的數組中挑一個NioEventLoop,挑哪個由choser來決定。然後将NioServerChannel注冊到NioEventLoop
  6. 将NioServerChannel注冊到NioEventLoop,也不是直接注冊,而是先啟動一個線程,執行的是NioEventLoop的run()
  7. run()方法是個死循環,會不斷從taskqueue這個隊列裡去擷取到是否有runable的任務
  8. addTask(task) 這個方法才真正的把注冊channel的任務加入到taskqueue裡
  9. run()方法裡檢測到taskqueue隊列裡有任務了就進行相應的處理

run()裡的分析待後續分解

繼續閱讀