文章目录
- Netty做了什么?
- 源码分析
-
- Server端启动过程
- IO写流程
- IO读流程
- 总结
Netty做了什么?
Netty是以NIO为基础的网络通信框架,对jdk提供的的NIO的API做了很多完善和抽象,如果没有Netty我们需要搞定下面这些事:
- 如何设计这个框架,以保证可维护性和可扩展性
- 如何定义线程模型:哪些线程处理accept事件、哪些线程读写io、哪些线程处理业务逻辑
- 解决jdk自带的NIO的一些bug,比如空轮询bug
- 怎么做超时检测、异常重连等
而Netty将client端与server端的整个通信流程进行了抽象,设计了一套reactor线程模型来接收channel的accept、读、写等IO事件,抽象了一个由ChannelHandler组成的处理链pipeline来处理IO事件,下面是Netty Server端的处理流程图图(Client端没有Boss NioEventLoopGroup):
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0zYtJGa1clW1UzVZFDbzIGaOJDTwYVbiVHNHpleO1GTulzRilWO5xkNNh0YwIFSh9Fd4VGdsATMfd3bkFGazxyaHRGcWdUYuVzVa9GczoVdG1mWfVGc5RHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cWZwpmL5QzN4ATOxITM3AjNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpeg)
结合上图,Server端处理IO的核心流程可以简单总结如下:
-
负责接受channel的Accept事件,将Boss NioEventLoopGroup
的channel交给accepted
中的一个Worker NioEventLoopGroup
,作为Worker的NioEventLoop
都有自己的NioEventLoop
,Selector
会在这个channel
上注册读写事件Selector
- 作为Worker的
会不停的轮询NioEventLoop
上的事件,调用Selector
方法来处理IO事件,根据入站及出站场景,最终交给processSelectedKeys
来调用pipeline
链来处理ChannelHandler
在这种设计下,开发人员的实际工作量就是根据业务特点来扩展自己的
ChannelHandler
,实现对应的事件处理方法,并按照顺序注册到
pipeline
里(比如入站时,必须是先解码再交给业务handler,这个顺序不能反过来),比如dubbo扩展的自己的序列化和反序列化的handler以及包含业务线程池逻辑的
AllChannelHandler
。
先要介绍下Netty的核心组件
ChannelHandler
接口,该接口定义了IO的基本操作,又分为两个子接口,分别代表入站(读数据流程)和出站(写数据流程)时的操作,类图如下:
ChannelInboundHandler
接口里定义了入站流程时的各种事件:
ChannelOutboundHandler
接口里定义了入站流程时的各种事件:
ChannelHandler
在加入
pipeline
的链时会包装成
AbstractChannelHandlerContext
类,
AbstractChannelHandlerContext
类是一个双向链表,
pipeline
里维护了一个head和tail节点,以此来找到处理链的入口,最终pipeline的内部结构图如下:
对整个Netty处理流程有个大概认识后,我们分析下源码
源码分析
结合官方文档里的例子来debug下原理,我debug 的Netty版本是4.1.25.Final,官方文档里一步步教我们怎么从一个最简单的只收不发的demo到最后涉及到序列化、反序列化、拆包逻辑的接收和发送的demo,我这里将官网文档上的demo整理到了github上,有兴趣的可以下载下来自己debug,先说下demo里的几个类的用途,然后分析源码:
- TimerServer:负责启动一个Netty Server端
- TimeServerHandler:自定义的Server端的处理器,负责在有client接入时,写入一个UnixTime对象
- TimeEncoder:自定义的编码handler,Server端通过这个类将UnixTime对象编码成byte
- TimeClient:负责启动一个Netty Client端
- TimeClientHandler:自定义的client端的一个处理器,负责读取io数据并打印出来
- TimeDecoder:自定义解码类,client端通过这个类来将IO中读取的byte并反序列化成UnixTime对象
- UnixTime:时间对象
这个项目比较简单,就是Sever端在接收到Client端的连接后,会通过
TimeServerHandler
向通道中写入
UnixTime
对象,再通过
TimeEncoder
将
UnixTime
对象编码后通过通道发送出去,Client端从通道中读取数据并解码成UnixTime对象然后打印出来
这里以Server端的启动、Server端的写流程、Client端的读流程三步来分析源码。
Server端启动过程
TimerServer
配置Netty Server端的核心组件,然后启动Server端,代码如下:
// boss
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
// worker
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// Netty Server端
ServerBootstrap b = new ServerBootstrap(); // (2)
// 设置group
b.group(bossGroup, workerGroup)
// 设置当前为Server端的Channel类型是NioServerSocketChannel
.channel(NioServerSocketChannel.class) // (3)
// 设置所有accepted的Channel的pipeline处理链(需要经过那些ChannelHandle处理)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
}
});
// 绑定端口,并初始化NioServerSocketChannel的pipeline,注册事件监听
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
// 关闭
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
可以看出在Server端启动过程中主要做了下面几件事:
- 配置Boss和Worker的
NioEventLoopGroup
- 为accepted的channel的pipeline设置
链ChannelHandler
- 通过
方法来完成绑定逻辑,这个方法比较核心,主要逻辑如下:bind(port)
- 实例化
NioServerSocketChannel
- 通过init方法,为
初始化pipeline配置,包括用来初始化NioServerSocketChannel
链的ChannelHandler
类(这个类会添加重要的ChannelInitializer
,主要负责将accepted的channel注册到worker里的某个NioEventLoop的Selector上)ServerBootstrapAcceptor
- 从Boss中选一个
,并将NioEventLoop
注册到Selector上NioServerSocketChannel
- 注册到Selector后,激活init里的
配置,完善ChannelInitializer
链ChannelHandler
- 为
绑定端口NioServerSocketChannel
- 实例化
这里将最重的绑定端口的逻辑放在了最后,只有当初始化配置、注册Selector成功后才执行端口绑定逻辑,这个过程中大量使用异步操作(用NioEventLoop的异步执行)和事件通知的方式,下面看下源码。
b.bind(port)
源码,最终调用到
doBind
方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 核心逻辑,初始化NioServerSocketChannel,并注册到一个Selector上
final ChannelFuture regFuture = initAndRegister();
...
// 异步等待结果,并添加结果Listener,根据最终的成功或失败做对应的逻辑
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 为regFuture添加operationComplete事件的监听器,只有注册Selector和初始化完成后才根据结果执行绑定端口的逻辑
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 设置失败状态
promise.setFailure(cause);
} else {
//
promise.registered();
// 绑定端口
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
initAndRegister()
代码如下:
这个方法分为init 和 register两步:
final ChannelFuture initAndRegister() {
Channel channel = null;
...
// 反射构造器,实例化NioServerSocketChannel
channel = channelFactory.newChannel();
// 为NioServerSocketChannel初始化pipeline的handler链
init(channel);
...
ChannelFuture regFuture = config().group().register(channel);
...
return regFuture;
}
init(channel)
代码如下:
这个方法最关键的逻辑是为
NioserverSocketChannel
的
pipeline
中添加
ServerBootstrapAcceptor
,这个类主要负责将accepted的channel注册到worker里的某个
NioEventLoop
的
Selector
上,给woker分配任务
@Override
void init(Channel channel) throws Exception {
...
ChannelPipeline p = channel.pipeline();
...
// 往NioserverSocketChannel的pipeline中添加ServerBootstrapAcceptor
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 异步添加ServerBootstrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
register(channel)代码如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
private void register0(ChannelPromise promise) {
...
// 调用java的channel的api,注册选中的NioEventLoop的Selector上
doRegister();
...
// 完善channel的pipeline里的channelHandler链,调用上面添加的ChannelInitializer的initChannel方法,添加ServerBootstrapAcceptor
pipeline.invokeHandlerAddedIfNeeded();
...
// 注册成功,启动监听结果的Listener,包括上面b.bind(port)代码里的ChannelFutureListener
safeSetSuccess(promise);
// 调用Registered事件处理链
pipeline.fireChannelRegistered();
...
}
绑定端口:
异步执行端口绑定的逻辑
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
到这里Server端启动的过程就完了,整个流程很简单,只需要自己少量的配置和一些自定义的ChannelHandler。总结下核心逻辑:
- 实例化
NioServerSocketChannel
- 将
注册到Boss里的一个NioEventLoop的Selector上NioServerSocketChannel
- 完善pipeline的
链,包括最重要的ChannelHandler
ServerBootstrapAcceptor
- 为
绑定端口NioServerSocketChannel
源码中用了大量的异步、事件驱动的实现方式将这几个核心过程串起来。
IO写流程
读写流程最终都是通过pipeline维护的
ChannelHandler
链来处理,这个项目里Server端在接收到Client端连接后会向通道写入一个
UnixTime
对象,代码在自定义的
TimeServerHandler
里,
channelActive
方法会在与Client端连接成功时触发,我们通过这个简单逻辑来看下写流程原理,代码如下:
/**
* @author HJ
* @date 2021-06-05
**/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
// 在client端连接成功后,往通道中写入当前时间
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
}
从代码中可以看出写入的是
UnixTime
对象,肯定还要经过序列化成byte通过socket发送,所以在Server启动的代码里会加入负责序列化和编码的
TimeEncoder
类。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
}
});
UnixTime
里就是一个时间戳,代码如下:
public class UnixTime {
private final long value;
...
}
所以accepted的channel的
pipeline
结构如下:
我们看下
ctx.writeAndFlush(new UnixTime())
的过程,
ctx
是
pipeline
的
ChannelHandler
链中的一个链表节点(见上面插图),
invokeWriteAndFlush
方法最终会被分成
write
和
flush
两部分:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
invokeWrite0(msg, promise);
负责将数据写入待flush区域,等待最终的flush操作才会真正通过socket发送,最终会调到下面的方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 获取下一个出站的ChannelHandler,源码在下面
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
// executor就是绑定的NioEventLoop
EventExecutor executor = next.executor();
// 判断是否是NioEventLoop自己执行的,如果是的话就直接执行,不是的话就封装成一个task交由NioEventLoop异步执行
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 封装成一个task交由NioEventLoop异步执行
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
// 由于是出站操作,是从tail往head方向遍历链表,找到出站的ChannelHandler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
方法的主要逻辑是在
ChannelHandler
链中找到下一个出站的
ChannelHandler
并交由其处理,根据上面的
pipeline
图可以看出下一个出站操作就是
TimeEncoder
(负责将pojo对象转成byte),debug截图如下:
TimeEncoder
代码如下:
就是将
UnixTime
这个pojo对象转成byte写入到buffer里
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
out.writeInt((int)msg.value());
}
}
最终调用到head节点的write方法,debug图如下:
这里的并没有将消息真正通过socket发出去,而是将消息封装成一个
Entry
,放到一个待flushed的链表里,debug如下:
下面接着看
invokeFlush0();
方法:
最终会调用到head节点的flush方法:
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
Netty的
Unsafe
类是直接与
SocketChannel
交互的类,最终调用的代码如下:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
...
// 将所有待flush的数据放到这个数组中
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
...
// 通过SocketChannel将数据发送出去
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
...
final int localWrittenBytes = ch.write(buffer);
...
break;
}
default: {
...
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
...
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
总结
可以看出写流程主要是通过pipeline维护的出站
ChannelHandler
链处理数据(往head方向遍历出站ChannelHandler),分成了
write
和
flush
两步,先是将数据写入到待flush的链表,然后再调用flush将所有数据通过SockeChannel发送出去,最终是调用
Unsafe
类的
write
和
flush
方法,整个过程使用了大量的异步的事件驱动处理方式。
IO读流程
读流程和写流程是一样都是通过pipeline的ChannelHandler链来处理的,区别是读流程是入站操作,是从head节点往tail方向,这个项目里Client端会读取数据,完整流程图如下:
总结
通过上面的分析,可以看出Netty的异步事件驱动的原理,主要核心组件包括Boss线程池、Worker线程池、NioEventLoop、pipeline、ChannelHandler,其中
NioEventLoop
是事件驱动的核心,是整个框架的发动机,有两个核心方法:
-
方法处理accept、读、写这些IO事件processSelectedKeys
-
来处理其他的异步任务,比如注册、激活、连接、绑定、写数据等事件任务,也可以加入自定义任务runAllTask
整个架构很清晰,并且各个组件职责分明。