天天看點

Netty源碼分析(1)——Bootstrap

開始之前,還是先貼一段netty用戶端的經典使用姿勢,如下:

try {
            // 通過無參構造函數,建立一個Bootstrap執行個體
            Bootstrap b = new Bootstrap();
            // 設定EventLoop線程組
            b.group(new NioEventLoopGroup())
            // 設定具體使用的Channel子類
             .channel(NioSocketChannel.class)
            // 設定tcp的參數
             .option(ChannelOption.TCP_NODELAY, true)
            // 設定資料處理的handler
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                    p.addLast(new EchoClientHandler());
                 }
             });
            // 建立與服務的的連接配接
            ChannelFuture f = b.connect(HOST, PORT).sync();
            // 等待channel關閉
            f.channel().closeFuture().sync();
        } finally {
            // 關閉EventLoop線程組
            group.shutdownGracefully();
        }
           

從上述代碼可以看出,netty用戶端的使用很友善,由于用戶端需要配置的參數較多,是以Bootstrap提供了一個無參構造函數,而具體的參數配置,則通過build模式進行各自獨立配置。

group配置的就是用來執行I/O操作的線程池,每個線程以EventLoop的形式存在,配置代碼如下:

public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return (B) this;
}
           

做了一些非null的校驗,如果傳入的group為null,以及如果已經被設定過,則都會抛出異常,校驗通過之後,則進行屬性指派。

對于channel的配置,會根據傳入的class,建構一個工廠類,然後指派給channelFactory這個屬性,指派過程與group指派過程類似。

而工廠類,則會根據需要,通過反射的機制,得到一個channel具體子類的執行個體,用于建立連接配接。

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return (B) this;
}
           

option就是配置tcp相關的參數,就是一系列的key-value對。

handler配置的就是用戶端想要對I/O過程中的資料的處理邏輯,就是一系列的ChannelHandler構成的清單。

Bootstrap在經過上述一系列配置後,各項準備工作已經就緒,接下來重點分析其connect過程。

public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    validate();
    return doConnect(remoteAddress, localAddress());
}
           

connect的調用鍊如上,最終會通過調用doConnect方法進行connect操作,在doConnect之前,會調用validate方法進行校驗操作,主要是對group、channelFactory、handler這三個必須配置進行校驗,看是否做了配置,代碼如下:

public Bootstrap validate() {
    super.validate();
    if (handler() == null) {
        throw new IllegalStateException("handler not set");
    }
    return this;
}

public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return (B) this;
}
           

在上述校驗通過後,繼續看connect真正發生的地方,也就是doConnect方法裡做了什麼。

private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    /**
     * 建立 NioSocketChannel , 并将該channel 注冊到一個 eventLoop 上去
     * 傳回的 ChannelFuture 子類為 DefaultChannelPromise
     * 這步的操作都是本機操作,還不涉及到網絡操作
     */
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise = channel.newPromise();

    /**
     * 根據注冊時,傳回的channelFuture的狀态來決定,進行connect操作
     * 這裡都是完全異步的,如果注冊還沒有完成,則會監聽注冊的狀态,在注冊完成時,才會進行connect操作
     */
    if (regFuture.isDone()) {
        doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
    } else {
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            }
        });
    }
    return promise;
}
           

這段代碼很短,是進行connect操作的入口,主要分為兩大步驟,register和connect操作。

1、register就是将建立的NioSocketChannel注冊到NioEventLoop上,最終就是将socket注冊到一個selector上,
2、connect則是建立與遠端伺服器的網絡連接配接,我的了解,就是與遠端主機完成了3次握手的操作。

從上述這段代碼可以看出,register和connect操作都是異步操作,如調用initAndRegister方法後,傳回的是一個regFutrure,這是一個異步調用的傳回結果,接着對其做了判斷,如果其中的異常不為null,說明register過程出錯,則直接就講regFutrure作為整個doConnect方法的傳回值,也就是結束了connect操作。判斷如下:

if (regFuture.cause() != null) {
        return regFuture;
    }
           

如果regFutrure沒有異常,則會再判斷是否已經done,如果done,則直接調用doConnect0方法進行connect操作,如果沒有完成,則在regFutrure上添加了一個ChannelFutureListener,主要邏輯就是在regFuture完成時,調用doConnect0方法,這裡需要注意下,在這之前,通過如下代碼:

建立了一個ChannelPromise的執行個體promise,并作為doConnect0的第五個入參,然後doConnect方法的傳回值也是這個promise。

從這個過程可以看出,register和connect過程都是異步操作,後面具體分析裡面的過程時,會更加明确這一點。

這裡簡單說明下ChannelFuture和ChannelPromise,這兩個都是異步執行的方法結果,其中ChannelPromise是ChannelFuture的子類,ChannelFuture隻能擷取結果,不能對異步傳回結果進行操作,而ChannelPromise對其進行了擴充,可以對異步傳回結果進行設定等操作。

接下裡,先來看register的過程,主要邏輯在initAndRegister方法中,代碼如下:

final ChannelFuture initAndRegister() {
    // 1、通過channelFactory建立一個NioSocketChannel的執行個體
    final Channel channel = channelFactory().newChannel();
    try {
        // 2、初始化channel
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }
    // 3、對channel進行注冊
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
           
1、根據Bootstrap的配置,用channelFactory,利用反射建立一個NioSocketChannel的執行個體;
2、對建立好的NioSocketChannel的執行個體進行初始化操作,代碼如下:
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());

    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            try {
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}
           

初始化的邏輯很明确,

a、把配置的handler添加到channel的pipeline中,用于後續資料處理; b、把配置的tcp的參數添加到channel的配置中; c、把配置的屬性添加到channel的屬性中。

3、上述初始化完成之後,則進行channel的注冊過程,代碼如下:
ChannelFuture regFuture = group().register(channel);
           

其中group方法傳回的就是Bootstrap在初始時傳入的NioEventLoopGroup對象,其調用register方法,最終是調用了SingleThreadEventLoop的register方法,如下:

public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}
           

在register的入參中建立了一個DefaultChannelPromise執行個體,該執行個體也就是該register方法後面會傳回的ChannelFuture。

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}
           

在調用了unsafte的register方法後,就會把這個promise傳回。接下來繼續看register裡做了什麼。

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 1
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // 2
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // 3
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    // 4
    AbstractChannel.this.eventLoop = eventLoop;
    // 5
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // 6
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
           
1、判斷eventloop是否為null,如果為null,則抛出異常;
2、判斷該channel是否已經被注冊過,一個channel隻能被注冊一次,不能注冊到多個eventloop上,是以如果注冊,則抛出異常;
3、相容性判斷,看eventloop是否是一個NioEventLoop的執行個體;

protected boolean isCompatible(EventLoop loop) {

return loop instanceof NioEventLoop;

}

4、将eventloop指派給channel的eventloop屬性;
5、這裡就是register實作異步的地方,判斷目前執行現場是否就是eventloop的線程,如果是,則進行調用register0方法,如果不是,則将register0方法封裝成一個task,送出給eventloop來執行,注意register0的入參就是前面傳入的promise,是以可以根據這個promise來判斷register是否完成,以及是否成功等;
6、如果注冊出現異常,則會做一些收尾工作,如關閉channel,promise中設定fail标志等。

是以到這裡,使用者執行的register的動作已經完成了,但真正的register操作還沒有發生,被送出給eventloop去異步執行了,但是會傳回一個ChannelFuture的子類執行個體promise,可以用來檢測注冊的完成情況。

再繼續跟進到register0裡,看看做了什麼操作。

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // 1
        doRegister();
        // 2
        registered = true;
        // 3
        safeSetSuccess(promise);
        // 4
        pipeline.fireChannelRegistered();
        if (isActive()) {
            // 5
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
           
1、doRegister方法,進行真正的注冊,實質就是調用了java nio中的channel的register方法,将其注冊到selector上去,這裡需要注意的是,注冊時,register的第二個參數是設定感興趣的操作,這裡設定的是0,說明沒有設定任何感興趣的操作,這裡隻是簡單的完成了注冊的動作,對于感興趣的操作的設定是在fireChannelActive中設定的,後續會分析到。
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, , this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
           
2、注冊成功後,會将channel的registered标志設定為true;
3、在promise中設定注冊成功的辨別,這樣在前面doConnect方法中,監聽注冊情況的那個listener就可以開始進行doConnect0方法的執行了;
4、上述動作完成之後,就會調用pipeline的fireChannelRegistered方法,在pipeline的ChannelHandler鍊中,依次處理channel注冊完成的操作;
5、接着判斷該channel是否處于活躍狀态,也就是該channel中包含的ch是否處于open和connect的狀态,一般情況下,第3步結束後才觸發了doConnect0操作,是以一般這裡的判斷都是false,也就是不會觸發fireChannelActive操作,但也不是絕對。
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}
           

上述就是注冊的過程,在注冊完成之後,就會觸發connect操作,來繼續看下doConnect0方法中做了什麼。

private static void doConnect0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
           

這裡做的更直白,直接封裝了一個task,然後送出給eventloop去異步執行,然後就傳回了。

前面我們分析過,在register完成之後,才會觸發doConnect0操作,當這裡封裝的任務開始執行時,首先就是對regFuture的狀态進行判斷,看是成功還是失敗,成功了,才會繼續執行connect操作,調用channel的connect操作,就是調用的該channel的pipeline的connect操作,對于connect是一個使用者發起的動作,是以是一個outbound的操作,outbound操作都是從tail開始,傳遞到head,是以真正的操作是在head的connect方法中。

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}
           

而head的connect方法中,則是直接調用了unsafe的connect方法,繼續向下看。

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}
           

unsafe的connect的方法如下:

public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // 1、設定channel不能取消标志,并判斷channel是否處于open狀态
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    try {
        // 2、如果connectPromise不為null,則說明已經做過連接配接,則這裡就要抛出異常了
        if (connectPromise != null) {
            throw new IllegalStateException("connection attempt already made");
        }
        // 3、判斷目前channel是否活躍,也就是要open和connect都完成,才是true,才開始連接配接,是以wasActive應該是false
        boolean wasActive = isActive();
        // 4、進行連接配接操作,下面會詳細分析裡面内容
        if (doConnect(remoteAddress, localAddress)) {
            // 5、連接配接成功,則會觸發channelActive事件,以及将selectionKey的監聽事件中加入read事件
            fulfillConnectPromise(promise, wasActive);
        } else {
            // 6、如果目前沒有連接配接成功
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > ) {
                /**
                 * 如果逾時時間大于0, 則設定一個定時任務, 在逾時時間時, 檢查連接配接是否成功,
                 * 如果還沒有連接配接上, 則會抛出連接配接逾時異常, 這是netty自己做的一個逾時檢查任務
                 */
                connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        if (t instanceof ConnectException) {
            Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
            newT.setStackTrace(t.getStackTrace());
            t = newT;
        }
        promise.tryFailure(t);
        closeIfClosed();
    }
}
           
4、進行連接配接操作的具體邏輯如下,如果連接配接成功了,則傳回true,由于channel是非阻塞的,是以暫時沒有連接配接成功的,會注冊OP_CONNECT事件,等待其連接配接成功。
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        javaChannel().socket().bind(localAddress);
    }
    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}
           
5、如果連接配接當場就成功了,則會fullFillConnectPromise操作,如下:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        return;
    }
    // 1、嘗試設定連接配接成功的辨別,讓使用者調用的地方,知道連接配接成功了
    boolean promiseSet = promise.trySuccess();
    // 2、如果在連接配接之前不處于活躍狀态,而現在處于活躍狀态了,則觸發fireChannelActive操作
    if (!wasActive && isActive()) {
        pipeline().fireChannelActive();
    }
    // 3、如果第2步設定失敗,則會關閉channel
    if (!promiseSet) {
        close(voidPromise());
    }
}
           
6、在目前沒有連接配接成功的情況下,netty對這種異步連接配接的操作,做了一個連接配接逾時的檢測,就是設定了一個定時任務,用來判斷連接配接是否成功。
if (connectTimeoutMillis > ) {
    /**
     * 如果逾時時間大于0, 則設定一個定時任務, 在逾時時間時, 檢查連接配接是否成功,
     * 如果還沒有連接配接上, 則會抛出連接配接逾時異常, 這是netty自己做的一個逾時檢查任務
     */
    connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
        @Override
        public void run() {
            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
            ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                close(voidPromise());
            }
        }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
           

在設定netty的用戶端時,有一個參數 CONNECT_TIMEOUT_MILLIS 就是用來設定這裡的逾時時間的,如果設定過小,而網絡較差時,則有可能會出現 ConnectTimeoutException 的異常,可以根據網絡環境,适當設定該參數的大小。

在channel被注冊的selector上去時,并沒有設定OP_READ事件,那在哪裡設定的哪?

在第5步中,講到了fireChannelActive這個操作,來繼續看看裡面做了什麼。

public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}
           

可以看到,先會調用pipeline上的所有ChannelHandler的channelActive方法。

然後會判斷是否自動讀,該配置預設是true,是以會進入channel的read方法,繼續看下去。

public Channel read() {
    pipeline.read();
    return this;
}
           

直接調用了pipeline的read方法,由于是一個outbound事件,是以最終是調用head的read方法,進入看看。

public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}


public void beginRead() {
    // Channel.read() or ChannelHandlerContext.read() was called
    readPending = true;
    super.beginRead();
}
public void beginRead() {
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}
           

經過一層層的調用,最終是調用了doBeginRead方法,實作如下:

protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }

    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == ) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
           

這裡找到了設定感興趣事件的地方。

selectionKey.interestOps(interestOps | readInterestOp);
           

至此,netty用戶端建立與服務端的連接配接操作就結束了。

總結如下:

1、netty中,處處都是異步操作,這樣就能了解為什麼netty中到處都ChannelFuture了;
2、連接配接過程中,主要分為兩大步驟,先register,且register成功,再connect;
3、真正的register和connect操作,都是封裝成一個task,送出給eventloop去執行的;
4、connect timeout的檢測是netty自己實作的一套機制,通過一個定時任務來檢測實作的;
5、對于OP_READ事件,在channel已經建立連接配接好,觸發channelActive操作時,才會添加到事件檢測中的。

繼續閱讀