天天看點

Netty 基礎-元件之EventLoop3. 元件

3. 元件

3.1 EventLoop

事件循環對象

EventLoop 本質是一個單線程執行器(同時維護了一個 Selector),裡面有 run 方法處理 Channel 上源源不斷的 io 事件。

它的繼承關系比較複雜

  • 一條線是繼承自 j.u.c.ScheduledExecutorService 是以包含了線程池中所有的方法
  • 另一條線是繼承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判斷一個線程是否屬于此 EventLoop
    • 提供了 parent 方法來看看自己屬于哪個 EventLoopGroup

事件循環組

EventLoopGroup 是一組 EventLoop,Channel 一般會調用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop,後續這個 Channel 上的 io 事件都由此 EventLoop 來處理(保證了 io 事件處理時的線程安全)

  • 繼承自 netty 自己的 EventExecutorGroup
    • 實作了 Iterable 接口提供周遊 EventLoop 的能力
    • 另有 next 方法擷取集合中下一個 EventLoop

3.1.1 擷取 NioEventLoop

// 内部建立了兩個 EventLoop, 每個 EventLoop 維護一個線程
DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
           

輸出

[email protected]
io.netty.channel.Defau[email protected]
[email protected]
           

也可以使用 for 循環

DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
for (EventExecutor eventLoop : group) {
    System.out.println(eventLoop);
}
           

輸出

io.netty.channel.Defau[email protected]
[email protected]
           

3.1.2 NioEventLoop 處理 io 事件

伺服器端兩個 nio worker 勞工

package com.itcxc.netty.c3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

/**
 * @author chenxc
 * @date 2021/8/17 22:54
 */
@Slf4j
public class EventLoopServer {

    public static void main(String[] args) {
        new ServerBootstrap()
                // 細分1:boos 隻負責ServerSocketChannel上的accept事件  worker隻負責SocketChannel上的讀寫
                .group(new NioEventLoopGroup(1),new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast( "handler1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                final ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                                super.channelRead(ctx, msg);  //讓消息傳遞給下一個handler
                            }
                        });
                    }
                }).bind(8080);
    }

}

           

用戶端,啟動三次,分别修改發送字元串為 zhangsan(第一次),lisi(第二次),wangwu(第三次)

package com.itcxc.netty.c3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * @author chenxc
 * @date 2021/8/17 23:01
 */
@Slf4j
public class EventLoopClient {

    public static void main(String[] args) throws InterruptedException {
        //2.帶有Future,Promise的類型都是和異步方法配套使用
        final ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                //1.連接配接到伺服器
                //異步非阻塞的, main線程發起了調用,由nio線程執行連接配接操作
                .connect(new InetSocketAddress("localhost", 8080));
        //2.1 方法一 使用sync方法同步處理結果
        /*channelFuture.sync();  //阻塞目前線程,直到nio線程建立連接配接完畢
        //将channelFuture.sync();注釋後  将無阻塞的向下執行,直接擷取到還沒建立連接配接的channel,導緻資料不知道發送到那裡去
        final Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        log.debug("{}",channel);*/

        //2.2 方法二 addListener(回調對象)  方法異步處理結果
        //在nio線程建立好連接配接之後,會調用operationComplete方法
        channelFuture.addListener((ChannelFutureListener) future -> {
            final Channel channel = future.channel();
            channel.writeAndFlush("hello world");
            log.debug("{}",channel);
        });

    }

}

           

最後輸出

19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer -  zhangsan         
19:49:25 [DEBUG] [nioEventLoopGroup-4-2] c.i.n.c.EventLoopServer -  lisi               
19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer -  wangwu        
 
           

可以看到兩個勞工輪流處理 channel,但勞工與 channel 之間進行了綁定

Netty 基礎-元件之EventLoop3. 元件

再增加兩個非 nio 勞工

//建立一個獨立的EventLoopGroup  不處理io事件  處理複雜的業務邏輯,防止因為處理太長,導緻處理io事件太慢
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
        // 細分1:boos 隻負責ServerSocketChannel上的accept事件  worker隻負責SocketChannel上的讀寫
        .group(new NioEventLoopGroup(1),new NioEventLoopGroup(2))
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast( "handler1", new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        final ByteBuf buf = (ByteBuf) msg;
                        log.debug(buf.toString(Charset.defaultCharset()));
                        super.channelRead(ctx, msg);  //讓消息傳遞給下一個handler
                    }
                });
                ch.pipeline().addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        final ByteBuf buf = (ByteBuf) msg;
                        log.debug(buf.toString(Charset.defaultCharset()));
                        super.channelRead(ctx, msg);
                    }
                });
            }
        }).bind(8080);
           

用戶端代碼不變,啟動三次,分别修改發送字元串為 zhangsan(第一次),lisi(第二次),wangwu(第三次)

輸出

19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer - zhangsan
19:49:25 [DEBUG] [defaultEventLoopGroup-2-1] c.i.n.c.EventLoopServer - zhangsan
19:49:25 [DEBUG] [nioEventLoopGroup-4-2] c.i.n.c.EventLoopServer - lisi
19:49:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.n.c.EventLoopServer - lisi
19:49:25 [DEBUG] [nioEventLoopGroup-4-1] c.i.n.c.EventLoopServer - wangwu
19:49:25 [DEBUG] [defaultEventLoopGroup-2-1] c.i.n.c.EventLoopServer - wangwu
           

可以看到,nio 勞工和 非 nio 勞工也分别綁定了 channel

Netty 基礎-元件之EventLoop3. 元件

3.1.3 NioEventLoop 處理普通任務

NioEventLoop 除了可以處理 io 事件,同樣可以向它送出普通任務

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.execute(()->{
    log.debug("normal task...");
});
           

輸出

22:30:36 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:30:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - normal task...
           
可以用來執行耗時較長的任務

3.1.4 NioEventLoop 處理定時任務

NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

log.debug("server start...");
Thread.sleep(2000);
nioWorkers.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);
           

輸出

22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
...
           
可以用來執行定時任務

3.1.5 💡 優雅關閉

優雅關閉

shutdownGracefully

方法。該方法會首先切換

EventLoopGroup

到關閉狀态進而拒絕新的任務的加入,然後在任務隊列的任務都處理完成後,停止線程的運作。進而確定整體應用是在正常有序的狀态下退出的

用戶端代碼:

package com.itcxc.netty.c3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;

/**
 * @author chenxc
 * @date 2021/8/20 22:51
 */
@Slf4j
public class CloseFutureClient {

    public static void main(String[] args) throws InterruptedException {
        final NioEventLoopGroup group = new NioEventLoopGroup();
        final ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));

        final Channel channel = channelFuture.sync().channel();
        log.debug("{}",channel);
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true){
                final String line = scanner.nextLine();
                if ("q".equals(line)){
                    channel.close();  //close是異步的
                    //log.debug("處理關閉之後的操作");  //不能在這裡善後
                    break;
                }else {
                    channel.writeAndFlush(line);
                }
            }
        },"input").start();

        //擷取cloneFuture對象,
        final ChannelFuture closeFuture = channel.closeFuture();
        log.debug("waiting clone");
        //1)同步關閉處理
        /*closeFuture.sync();
        log.debug("處理關閉之後的操作");*/

        // 2)異步關閉處理
        closeFuture.addListener((ChannelFutureListener) future -> {
            log.debug("處理關閉之後的操作");
            //優雅的關閉group裡面的線程,不再接受新的任務,然後再将該處理的處理完
            group.shutdownGracefully();
        });
    }
}

           

3.1.6 💡 netty中的handler 執行中如何換人?

關鍵代碼

io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一個 handler 的事件循環是否與目前的事件循環是同一個線程
    EventExecutor executor = next.executor();
    
    // 是,直接調用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要執行的代碼作為任務送出給下一個事件循環處理(換人)
    else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
           
  • 如果兩個 handler 綁定的是同一個線程,那麼就直接調用
  • 否則,把要調用的代碼封裝為一個任務對象,由下一個 handler 的線程來調用