天天看點

【超詳細解釋】從Java NIO到Netty的每一步 (八)

這裡寫目錄标題

  • ​​Netty 源碼解讀​​
  • ​​bind啟動源碼​​
  • ​​1.初始化和注冊邏輯​​
  • ​​1.1建立ServerSocketChannel​​
  • ​​1.2注冊selector​​
  • ​​2.端口綁定邏輯​​
  • ​​2.1端口綁定​​
  • ​​2.2在pipeline傳播ChannelActive事件​​
  • ​​關注我日常更新分享Java程式設計技術!希望大家都能早日走上人生巅峰!​​
【超詳細解釋】從Java NIO到Netty的每一步 (八)

Netty 源碼解讀

bind啟動源碼

我們的服務是通過下面代碼進行綁定端口并啟動的,是以我們從bind()方法開始分析

ChannelFuture f = b.bind(port).sync();      

一直跟進到io.netty.bootstrap.AbstractBootstrap#doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {
        //1 初始化和注冊邏輯
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) 
            ChannelPromise promise = channel.newPromise();
            //2 端口綁定邏輯
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                         promise.setFailure(cause);
                    } else {
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }      

1.初始化和注冊邏輯

繼續跟進initAndRegister()方法,根絕下面的注釋,你類比一下Java NIO的代碼,你會不會有莫名的親切感?

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //(1)建立ServerSocketChannel
            channel = channelFactory.newChannel();
            //初始化Channel,不展開說明了
            init(channel);
        } catch (Throwable t) {
            if (channel != null) 
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //(2)注冊selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }      

1.1建立ServerSocketChannel

回看一下抛棄服務的代碼 ​​

.channel(NioServerSocketChannel.class)      

這裡就是把這個NioServerSocketChannel執行個體化,見io.netty.channel.ReflectiveChannelFactory#newChannel

@Override
    public T newChannel() {
        try {
            //這裡這個clazz指的就是NioServerSocketChannel.class,具體怎麼傳進來的,大家可以自己看代碼,這裡不介紹了。
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }      

通過反射進行執行個體化我們就直接看NioServerSocketChannel的構造函數即可看看都幹了什麼,見io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()

/**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        這裡主要幹了兩件事情:
        (1)newSocket(DEFAULT_SELECTOR_PROVIDER),其實這裡就是建立了Java NIO中的ServerSocketChannel
        (2)this方法繼續調用構造函數
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    //(2)處會調用這個構造函數
    public NioServerSocketChannel(ServerSocketChannel channel) {
        //我們主要關心這個調用父類的構造函數幹了什麼,看見SelectionKey.OP_ACCEPT有沒有很熟悉的樣子
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }      

見io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel

/**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        //我們仍然繼續往裡跟進
        super(parent);
        this.ch = ch;
        //這個地方把SelectionKey.OP_ACCEPT事件儲存到成員變量中了哦
        this.readInterestOp = readInterestOp;
        try {
            //設定成非阻塞模式,有沒有感覺又很熟悉了?
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }      

見io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)

/**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        //這裡把Java NIO的ServerSocketChannel儲存進了成員變量
        this.parent = parent;
        //給這個Channel一個id
        id = newId();
        //這個先忽略,以後介紹
        unsafe = newUnsafe();
        //看看這個地方,是不是生成了一個Pipeline和一個Channel關聯了?
        pipeline = newChannelPipeline();
    }      

我們來心中回顧一下建立一個NioServerSocketChannel都幹了哪些事情:

  1. 建立了一個Java NIO底層的SeverSocketChannel,作為自己的成員變量;
  2. 給自己弄了個id;
  3. 建立了一個Pipeline作為自己的成員變量;
  4. 設定成了非阻塞模式。

1.2注冊selector

​config().group()​

​​拿到的就是我們的bossGroup(具體怎麼區分的,可以自己通過丢棄服務代碼b.group(bossGroup, workerGroup)跟進去看一下),那整個​

​config().group().register(channel)​

​;我們就可以了解為在EventLoop上注冊我們的Channel。

接下來我們先忽略Netty如何選擇bossGroup哪個EventLoop直接跳到在EventLoop上注冊的方法,見io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //用NioServerSocketChannel的unsafe注冊,注意這個地方傳參是把目前的EventLoop和NioServerSocketChannel都傳進去了
        promise.channel().unsafe().register(this, promise);
        return promise;
    }      

繼續跟進,見io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            //現在咱們再mian主線程裡,是以會走else
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try { 
                    //這個地方涉及到eventLoop的啟用,我們下一章講
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            //是以會在目前eventLoop進行注冊
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }      

往register0裡繼續跟,見io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //其他不關心繼續往裡跟這個方法
                doRegister();
                ...
        }      

跟進doRegister(),見io.netty.channel.nio.AbstractNioChannel#doRegister

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                //看到這段代碼是不是又有莫名的親切感,讓我們回想一下《從Java NIO到Netty(二)》的内容
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
               ...
        }
    }      

在上述代碼中,javaChannel()拿到的就是Java NIO的ServerSocketChannel,這個register方法也是Java NIO的那個register方法,把這個ServerSocketChannel注冊到目前eventLoop的selector上面去。

那可能有人問了,ServerSocketChannel應該把OP_ACCEPT事件注冊到selector上面去,這裡怎麼展現的呢?别着急,往下看。

2.端口綁定邏輯

當建立完NioServerSocketChannel後,會調用doBind0把建立好的NioServerSocketChannel傳進來,見io.netty.bootstrap.AbstractBootstrap#doBind0 ​​

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    //在NioServerSocketChannel的eventLoop上執行bind
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }      

繼續跟進,見io.netty.channel.AbstractChannel.AbstractUnsafe#bind

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ...
            boolean wasActive = isActive();
            try {
                //(1)端口綁定
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        (2)在pipeline傳播ChannelActive事件:主要關心如何把OP_ACCEPT事件注冊到selector上面的
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }      

2.1端口綁定

繼續往裡跟,見io.netty.channel.socket.nio.NioServerSocketChannel#doBind

@Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        //這個地方是不是又感覺很熟悉?就是調用Java NIO的bind進行端口綁定的,見《從Java NIO到Netty(一)》
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }      

2.2在pipeline傳播ChannelActive事件

往裡跟,見io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive ​

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //這個地方就是把事件繼續向下一個連結清單節點傳播
        ctx.fireChannelActive();
        //主要看這個
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //往這裡面走
                channel.read();
            }
    }      
@Override
    protected void doBeginRead() throws Exception {
        //這個官方注釋也表述很清楚了,這兩個方法最終會調用到這個地方
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            //還記得我們之前建立NioServerSocketChannel時把SelectionKey.OP_ACCEPT事件指派給了這個readInterestOp嗎?看這個地方就把這個事件加到這個selectionKey中了
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }      
//打開ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//監聽端口9999
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
//設定為非阻塞模式
serverSocketChannel.configureBlocking(false);
//建立Selector
Selector selector = Selector.open();
//将選serverSocketChannel注冊到selector,并在注冊過程中指出該serverSocketChannel可以進行Accept操作
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);      

關注我日常更新分享Java程式設計技術!希望大家都能早日走上人生巅峰