天天看点

Netty原理简析Netty做了什么?源码分析

文章目录

  • Netty做了什么?
  • 源码分析
    • Server端启动过程
    • IO写流程
    • IO读流程
    • 总结

Netty做了什么?

Netty是以NIO为基础的网络通信框架,对jdk提供的的NIO的API做了很多完善和抽象,如果没有Netty我们需要搞定下面这些事:

  • 如何设计这个框架,以保证可维护性和可扩展性
  • 如何定义线程模型:哪些线程处理accept事件、哪些线程读写io、哪些线程处理业务逻辑
  • 解决jdk自带的NIO的一些bug,比如空轮询bug
  • 怎么做超时检测、异常重连等

而Netty将client端与server端的整个通信流程进行了抽象,设计了一套reactor线程模型来接收channel的accept、读、写等IO事件,抽象了一个由ChannelHandler组成的处理链pipeline来处理IO事件,下面是Netty Server端的处理流程图图(Client端没有Boss NioEventLoopGroup):

Netty原理简析Netty做了什么?源码分析

结合上图,Server端处理IO的核心流程可以简单总结如下:

  • Boss NioEventLoopGroup

    负责接受channel的Accept事件,将

    accepted

    的channel交给

    Worker NioEventLoopGroup

    中的一个

    NioEventLoop

    ,作为Worker的

    NioEventLoop

    都有自己的

    Selector

    channel

    会在这个

    Selector

    上注册读写事件
  • 作为Worker的

    NioEventLoop

    会不停的轮询

    Selector

    上的事件,调用

    processSelectedKeys

    方法来处理IO事件,根据入站及出站场景,最终交给

    pipeline

    来调用

    ChannelHandler

    链来处理

在这种设计下,开发人员的实际工作量就是根据业务特点来扩展自己的

ChannelHandler

,实现对应的事件处理方法,并按照顺序注册到

pipeline

里(比如入站时,必须是先解码再交给业务handler,这个顺序不能反过来),比如dubbo扩展的自己的序列化和反序列化的handler以及包含业务线程池逻辑的

AllChannelHandler

先要介绍下Netty的核心组件

ChannelHandler

接口,该接口定义了IO的基本操作,又分为两个子接口,分别代表入站(读数据流程)和出站(写数据流程)时的操作,类图如下:

Netty原理简析Netty做了什么?源码分析

ChannelInboundHandler

接口里定义了入站流程时的各种事件:

Netty原理简析Netty做了什么?源码分析

ChannelOutboundHandler

接口里定义了入站流程时的各种事件:

Netty原理简析Netty做了什么?源码分析

ChannelHandler

在加入

pipeline

的链时会包装成

AbstractChannelHandlerContext

类,

AbstractChannelHandlerContext

类是一个双向链表,

pipeline

里维护了一个head和tail节点,以此来找到处理链的入口,最终pipeline的内部结构图如下:

Netty原理简析Netty做了什么?源码分析

对整个Netty处理流程有个大概认识后,我们分析下源码

源码分析

结合官方文档里的例子来debug下原理,我debug 的Netty版本是4.1.25.Final,官方文档里一步步教我们怎么从一个最简单的只收不发的demo到最后涉及到序列化、反序列化、拆包逻辑的接收和发送的demo,我这里将官网文档上的demo整理到了github上,有兴趣的可以下载下来自己debug,先说下demo里的几个类的用途,然后分析源码:

Netty原理简析Netty做了什么?源码分析
  • TimerServer:负责启动一个Netty Server端
  • TimeServerHandler:自定义的Server端的处理器,负责在有client接入时,写入一个UnixTime对象
  • TimeEncoder:自定义的编码handler,Server端通过这个类将UnixTime对象编码成byte
  • TimeClient:负责启动一个Netty Client端
  • TimeClientHandler:自定义的client端的一个处理器,负责读取io数据并打印出来
  • TimeDecoder:自定义解码类,client端通过这个类来将IO中读取的byte并反序列化成UnixTime对象
  • UnixTime:时间对象

这个项目比较简单,就是Sever端在接收到Client端的连接后,会通过

TimeServerHandler

向通道中写入

UnixTime

对象,再通过

TimeEncoder

UnixTime

对象编码后通过通道发送出去,Client端从通道中读取数据并解码成UnixTime对象然后打印出来

这里以Server端的启动、Server端的写流程、Client端的读流程三步来分析源码。

Server端启动过程

TimerServer

配置Netty Server端的核心组件,然后启动Server端,代码如下:

// boss
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        // worker
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Netty Server端
            ServerBootstrap b = new ServerBootstrap(); // (2)
            // 设置group
            b.group(bossGroup, workerGroup)
                    // 设置当前为Server端的Channel类型是NioServerSocketChannel
                    .channel(NioServerSocketChannel.class) // (3)
                    // 设置所有accepted的Channel的pipeline处理链(需要经过那些ChannelHandle处理)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
                        }
                    });
            // 绑定端口,并初始化NioServerSocketChannel的pipeline,注册事件监听
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            // 关闭
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
           

可以看出在Server端启动过程中主要做了下面几件事:

  • 配置Boss和Worker的

    NioEventLoopGroup

  • 为accepted的channel的pipeline设置

    ChannelHandler

  • 通过

    bind(port)

    方法来完成绑定逻辑,这个方法比较核心,主要逻辑如下:
    • 实例化

      NioServerSocketChannel

    • 通过init方法,为

      NioServerSocketChannel

      初始化pipeline配置,包括用来初始化

      ChannelHandler

      链的

      ChannelInitializer

      类(这个类会添加重要的

      ServerBootstrapAcceptor

      ,主要负责将accepted的channel注册到worker里的某个NioEventLoop的Selector上)
    • 从Boss中选一个

      NioEventLoop

      ,并将

      NioServerSocketChannel

      注册到Selector上
    • 注册到Selector后,激活init里的

      ChannelInitializer

      配置,完善

      ChannelHandler

    • NioServerSocketChannel

      绑定端口

这里将最重的绑定端口的逻辑放在了最后,只有当初始化配置、注册Selector成功后才执行端口绑定逻辑,这个过程中大量使用异步操作(用NioEventLoop的异步执行)和事件通知的方式,下面看下源码。

b.bind(port)

源码,最终调用到

doBind

方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 核心逻辑,初始化NioServerSocketChannel,并注册到一个Selector上
    final ChannelFuture regFuture = initAndRegister();
    ...
    // 异步等待结果,并添加结果Listener,根据最终的成功或失败做对应的逻辑
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    // 为regFuture添加operationComplete事件的监听器,只有注册Selector和初始化完成后才根据结果执行绑定端口的逻辑
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // 设置失败状态
                promise.setFailure(cause);
            } else {
                //
                promise.registered();
                // 绑定端口
                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
  
}
           

initAndRegister()

代码如下:

这个方法分为init 和 register两步:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    ...
    // 反射构造器,实例化NioServerSocketChannel
    channel = channelFactory.newChannel();
    // 为NioServerSocketChannel初始化pipeline的handler链
    init(channel);
    ...
    ChannelFuture regFuture = config().group().register(channel);
    ...
    return regFuture;
}

           

init(channel)

代码如下:

这个方法最关键的逻辑是为

NioserverSocketChannel

pipeline

中添加

ServerBootstrapAcceptor

,这个类主要负责将accepted的channel注册到worker里的某个

NioEventLoop

Selector

上,给woker分配任务

@Override
    void init(Channel channel) throws Exception {
        ...

        ChannelPipeline p = channel.pipeline();

        ...
        // 往NioserverSocketChannel的pipeline中添加ServerBootstrapAcceptor
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // 异步添加ServerBootstrapAcceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
           

register(channel)代码如下:

@Override
  public ChannelFuture register(Channel channel) {
      return next().register(channel);
  }
  
 
private void register0(ChannelPromise promise) {
    ...
    // 调用java的channel的api,注册选中的NioEventLoop的Selector上
    doRegister();
    
    ...
    // 完善channel的pipeline里的channelHandler链,调用上面添加的ChannelInitializer的initChannel方法,添加ServerBootstrapAcceptor
    pipeline.invokeHandlerAddedIfNeeded();

    ...
    // 注册成功,启动监听结果的Listener,包括上面b.bind(port)代码里的ChannelFutureListener
    safeSetSuccess(promise);
    // 调用Registered事件处理链
    pipeline.fireChannelRegistered();
    ...
}
  
           

绑定端口:

异步执行端口绑定的逻辑

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
           

到这里Server端启动的过程就完了,整个流程很简单,只需要自己少量的配置和一些自定义的ChannelHandler。总结下核心逻辑:

  • 实例化

    NioServerSocketChannel

  • NioServerSocketChannel

    注册到Boss里的一个NioEventLoop的Selector上
  • 完善pipeline的

    ChannelHandler

    链,包括最重要的

    ServerBootstrapAcceptor

  • NioServerSocketChannel

    绑定端口

源码中用了大量的异步、事件驱动的实现方式将这几个核心过程串起来。

IO写流程

读写流程最终都是通过pipeline维护的

ChannelHandler

链来处理,这个项目里Server端在接收到Client端连接后会向通道写入一个

UnixTime

对象,代码在自定义的

TimeServerHandler

里,

channelActive

方法会在与Client端连接成功时触发,我们通过这个简单逻辑来看下写流程原理,代码如下:

/**
 * @author HJ
 * @date 2021-06-05
 **/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
    // 在client端连接成功后,往通道中写入当前时间
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }
}

           

从代码中可以看出写入的是

UnixTime

对象,肯定还要经过序列化成byte通过socket发送,所以在Server启动的代码里会加入负责序列化和编码的

TimeEncoder

类。

.childHandler(new ChannelInitializer<SocketChannel>() {
  @Override
  public void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
  }
});
           

UnixTime

里就是一个时间戳,代码如下:

public class UnixTime {
    private final long value;
...
}

           

所以accepted的channel的

pipeline

结构如下:

Netty原理简析Netty做了什么?源码分析

我们看下

ctx.writeAndFlush(new UnixTime())

的过程,

ctx

pipeline

ChannelHandler

链中的一个链表节点(见上面插图),

invokeWriteAndFlush

方法最终会被分成

write

flush

两部分:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}
           

invokeWrite0(msg, promise);

负责将数据写入待flush区域,等待最终的flush操作才会真正通过socket发送,最终会调到下面的方法:

private void write(Object msg, boolean flush, ChannelPromise promise) {
  // 获取下一个出站的ChannelHandler,源码在下面
  AbstractChannelHandlerContext next = findContextOutbound();
  final Object m = pipeline.touch(msg, next);
  // executor就是绑定的NioEventLoop
  EventExecutor executor = next.executor();
  // 判断是否是NioEventLoop自己执行的,如果是的话就直接执行,不是的话就封装成一个task交由NioEventLoop异步执行
  if (executor.inEventLoop()) {
      if (flush) {
          next.invokeWriteAndFlush(m, promise);
      } else {
          next.invokeWrite(m, promise);
      }
  } else {
  // 封装成一个task交由NioEventLoop异步执行
      AbstractWriteTask task;
      if (flush) {
          task = WriteAndFlushTask.newInstance(next, m, promise);
      }  else {
          task = WriteTask.newInstance(next, m, promise);
      }
      safeExecute(executor, task, promise, m);
  }
}

// 由于是出站操作,是从tail往head方向遍历链表,找到出站的ChannelHandler
private AbstractChannelHandlerContext findContextOutbound() {
  AbstractChannelHandlerContext ctx = this;
  do {
      ctx = ctx.prev;
  } while (!ctx.outbound);
  return ctx;
}
           

方法的主要逻辑是在

ChannelHandler

链中找到下一个出站的

ChannelHandler

并交由其处理,根据上面的

pipeline

图可以看出下一个出站操作就是

TimeEncoder

(负责将pojo对象转成byte),debug截图如下:

Netty原理简析Netty做了什么?源码分析

TimeEncoder

代码如下:

就是将

UnixTime

这个pojo对象转成byte写入到buffer里

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {

    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int)msg.value());
    }
}

           

最终调用到head节点的write方法,debug图如下:

Netty原理简析Netty做了什么?源码分析

这里的并没有将消息真正通过socket发出去,而是将消息封装成一个

Entry

,放到一个待flushed的链表里,debug如下:

Netty原理简析Netty做了什么?源码分析

下面接着看

invokeFlush0();

方法:

最终会调用到head节点的flush方法:

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
           

Netty的

Unsafe

类是直接与

SocketChannel

交互的类,最终调用的代码如下:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
  SocketChannel ch = javaChannel();
  int writeSpinCount = config().getWriteSpinCount();
  do {
     ...
     // 将所有待flush的数据放到这个数组中
      ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
      int nioBufferCnt = in.nioBufferCount();
      ...
      // 通过SocketChannel将数据发送出去
      switch (nioBufferCnt) {
          case 0:
              // We have something else beside ByteBuffers to write so fallback to normal writes.
              writeSpinCount -= doWrite0(in);
              break;
          case 1: {
              ...
              final int localWrittenBytes = ch.write(buffer);
              ...
              break;
          }
          default: {
              ...
              final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
              ...
              break;
          }
      }
  } while (writeSpinCount > 0);

  incompleteWrite(writeSpinCount < 0);
}

           

总结

可以看出写流程主要是通过pipeline维护的出站

ChannelHandler

链处理数据(往head方向遍历出站ChannelHandler),分成了

write

flush

两步,先是将数据写入到待flush的链表,然后再调用flush将所有数据通过SockeChannel发送出去,最终是调用

Unsafe

类的

write

flush

方法,整个过程使用了大量的异步的事件驱动处理方式。

IO读流程

读流程和写流程是一样都是通过pipeline的ChannelHandler链来处理的,区别是读流程是入站操作,是从head节点往tail方向,这个项目里Client端会读取数据,完整流程图如下:

Netty原理简析Netty做了什么?源码分析

总结

通过上面的分析,可以看出Netty的异步事件驱动的原理,主要核心组件包括Boss线程池、Worker线程池、NioEventLoop、pipeline、ChannelHandler,其中

NioEventLoop

是事件驱动的核心,是整个框架的发动机,有两个核心方法:

  • processSelectedKeys

    方法处理accept、读、写这些IO事件
  • runAllTask

    来处理其他的异步任务,比如注册、激活、连接、绑定、写数据等事件任务,也可以加入自定义任务

整个架构很清晰,并且各个组件职责分明。