天天看點

netty4(二)ServerBootstrap的啟動一

EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
//負責資料讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
//服務類
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    //reactor模型接收和處理
    bootstrap.group(bossGroup, workerGroup)
            //channelFactory屬性為ReflectiveChannelFactory通過反射建立NioServerSocketChannel對象
            .channel(NioServerSocketChannel.class) // (3)
            .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new DiscardServerHandler());
                }
            })
            //完成三次握手後等待隊列的大小,超過了則拒絕,預設50
            .option(ChannelOption.SO_BACKLOG, 128)          // (5)
            .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

    // Bind and start to accept incoming connections.
    ChannelFuture f = bootstrap.bind(5555).sync();       

就從啟動bind()方法開始進入到

AbstractBootstrap      
netty4(二)ServerBootstrap的啟動一

然後進入dobind方法

private ChannelFuture doBind(final SocketAddress localAddress) {
    //傳回DefaultChannelPromise,注冊線程池到channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        //在這一點上,我們知道注冊是完整和成功的
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        省略。。。。。
    }
}      

然後我們先執行,在這邊建立channel,然後先init初始化,傳回到

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //建立之前設定的NioServerSocketChannel對象
        channel = channelFactory.newChannel();
        //服務端,初始化channel屬性,并且設定pipeline,建立ServerBootstrapAcceptor
        //用戶端ChannelInitializer初始化
        init(channel);
    } catch (Throwable t) {
        大緻就是注冊失敗,我們注冊到全局線程去執行
    }
    //ServerBootstrapConfig裡面放了ServerBootstrap,獲得線程池最後到SingleThreadEventLoop裡面把線程池注冊給channel
    //我們建立的channel的pipeline也在這裡加入
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

   
    return regFuture;
}      

調用這裡面的init()方法,主要就是設定channel屬性和初始化一個ChannelInitializer

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        //之前填的屬性射到channel裡
        setChannelOptions(channel, options, logger);
    }

   省略。。。。

    ChannelPipeline p = channel.pipeline();

    省略。。。。

    //查到head與tail之間,後面會初始化這個ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}      

上面執行完了傳回到AbstractBootstrap的initAndRegister方法,接着執行這個

//ServerBootstrapConfig裡面放了ServerBootstrap,獲得線程池最後到SingleThreadEventLoop裡面把線程池注冊給channel
//我們建立的channel的pipeline也在這裡加入
ChannelFuture regFuture = config().group().register(channel);      

先去MultithreadEventLoopGroup的方法裡面選出一個線程池

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}      

最後到AbstractChannel的方法裡面去注冊,這個promise就是傳回用來做sync()的東西,這裡一個線程池就一個線程

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    。。。。。

    AbstractChannel.this.eventLoop = eventLoop;

    //是否啟動這個線程池
    if (eventLoop.inEventLoop()) {
        //注冊channel到select,同時設定promise完成
        register0(promise);
    } else {
        try {
            //送出并且啟動線程池
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           。。。。。。
        }
    }
}

      

然後調用register0方法

private void register0(ChannelPromise promise) {
    try {
        
        //AbstractNioChannel的方法,注冊到select
        doRegister();
        
        //調用回調函數
        pipeline.invokeHandlerAddedIfNeeded();

        //設定promise完成,避免阻塞
        safeSetSuccess(promise);
        //把ChannelInitializer的東西添加到pipeline
        pipeline.fireChannelRegistered();
       
        //如果通道從未注冊過,則隻啟用通道激活。如果通道被撤銷注冊并重新注冊,這将防止觸發多個通道活動。
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                。。。。。
                //設定讀,還有感興趣的東西accept之類的
                beginRead();
            }
        }
    } catch (Throwable t) {
      。。。。。
    }
}

      

進去doRegister看看

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //把channel注冊到selector裡面,但為什麼是個0呢傳回之後增加興趣事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}      

傳回上一個register0方法這邊初始化剛才添加的管道調用initlize方法,網管道裡面加上一個ServerBootstrapAcceptor

因為管道是分為inbond和outbond的讀取和輸出的時候要搞清楚

//把ChannelInitializer的東西添加到pipeline
pipeline.fireChannelRegistered();      

傳回上一個register0方法,在這邊注冊讀感興趣

//設定讀,還有感興趣的東西accept之類的
beginRead();      

然後這邊運作完了就傳回到doBind方法

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    //這個方法在channelregister()被觸發之前被調用。給使用者處理程式一個機會,在它的通道注冊()實作中設定管道
    //獲得NioEventLoop裡面的線程池
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //到AbstractChannel裡面,經過pipeline最後應該是給實際的channel注冊位址吧
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}      

然後弄完了再傳回就完成了服務端啟動了