首先要说明的是标题。没错!当然是为了博眼球,改成史上最详细的Netty源码分析是自取其辱。改成史上最不详细的Netty源码分析,万一说的不对也没人喷。话分两头,这个是我分析的初篇,我还是很认真和用心的写的。班门弄斧,如果有错误请指出。
为什么要写Netty源码分析? 用《Netty权威指南》话说
随着大数据,互联网和云计算的发展,传统的垂直架构将逐渐被分布式,弹性伸缩的新架构替代。
系统只要分布式部署,就存在多个节点之间的通讯问题,强调高可扩展和高性能,因此往往会选择高性能的通讯分式。使用
Netty+二进制编解码这些内部私有协议,已经逐渐成为业界主流。
简单的例子
启动下面程序后,命令行里输入后,随便敲个字符,就会有
telnet 127.0.0.1 9988
打印出来,并且在console里会打印出敲入的字符。如果没有telnet命令,使用
welcome!!!
也可以打印出
curl 127.0.0.1:9988
welcome!!!
代码如下
public class WelcomeServer {
private int port;
public WelcomeServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new WelcomeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 9988;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new WelcomeServer(port).run();
}
}
public class WelcomeServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf bb=(ByteBuf)msg;
byte[] b= new byte[bb.readableBytes()];
bb.readBytes(b);
System.out.println("read message:"+new String(b));
byte[] welcomebyte="welcome!!!".getBytes();
bb.writeBytes(welcomebyte);
ctx.writeAndFlush(bb);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
开始分析
从
ChannelFuture f = b.bind(port).sync();
入手,在一系列的调用后会到达AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
}
在上面的类里最重要的就是initAndRegister(),顾名思义就是初始并注册。
下面的类去掉了没相关的代码,只保留必要的用于分析的代码
final ChannelFuture initAndRegister() {
//初始
Channel channel = null;
try {
channel = channelFactory.newChannel();//(1)
init(channel);//(2)
} catch (Throwable t) {
...
}
//注册
ChannelFuture regFuture = config().group().register(channel);//(3)
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
从代码里就是看出分成3步:
- 构建出一个channel (1)
- channel 初始化 (2)
- 将channel注册到group里并返回一个futture对象 (3)
接下来就分构建,初始化 ,注册 3部分进行分析
构建
通过
channelFactory.newChannel()
进行的构建出一个chnnel,这里使用了·
channelFactory
这个工厂对象,
channelFactory
哪里得出的
在例子WelcomeServer中有下面代码
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
channel(NioServerSocketChannel.class),它的代码在下面
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));//(1)
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
...
//初始化出 channelFactory
this.channelFactory = channelFactory;
//返回自己,链式调用,builder模式常用
return (B) this;
}
下面是ReflectiveChannelFactory的newChannel()
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
new ReflectiveChannelFactory<C>(channelClass)
,
我们传入的class是
NioServerSocketChannel.class
.
看到ReflectiveChannelFactoryd.newChannel()就是反射创建出一个NioServerSocketChannel对象。
创建完NioServerSocketChanne后,传入下面的init方法
小结:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
这段代码里channel(NioServerSocketChannel.class),名义上是指定了channel, 实际上是指定了ReflectiveChannelFactory。当构造channel的时候是通过反射构造出channel.
初始化
上面分析new了一个channel 对象
init(channel)都干了什么?
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() { //(3)
@Override
public void initChannel(final Channel ch) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
// 拿到ChannelPipeline
final ChannelPipeline pipeline = ch.pipeline();
// 拿到config里的handler,这些handler就是WelcomeServer里构造出ServerBootStrap里的handler()方法放入的,注意不是childHandler()
//WelcomeServer里目前没有写handler()
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
channel.pipeline()是什么?这得从
NioServerSocketChannel构造过程入手,我们知道NioServerSocketChannel是
clazz.newInstance()
构造出来的。构造函数如下
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//JDK里的NIO里的SelectorProvider
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
super(null, channel, SelectionKey.OP_ACCEPT);的调用栈如下
class AbstractNioMessageChannel{
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
}
class AbstractNioChannel{
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
}
class AbstractChannel{
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
channel.pipeline()返回的ChannelPipeline 就是上面的
newChannelPipeline()
得出的.
下面是newChannelPipeline()的代码
class AbstractChannel{
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
从上面的代码里分析得出pipeline其实是双向链表,如下
HeadContext.next-->TailContext
TailContext.prev-->HeadContext
有tail和head的全局引用分别指向TailContext和HeadContext
TailContext和HeadContext的区别
TailContext
HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {}
可以看出HeadContext比TailContext多了一个ChannelOutboundHandler接口。ChannelOutboundHandler和ChannelInboundHandler的作用后续再介绍
回到代码,看addLast
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() { //(3)
@Override
public void initChannel(final Channel ch) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) { channel.config().setOptions(options);
}
// 拿到ChannelPipeline
final ChannelPipeline pipeline = ch.pipeline();
// 拿到config里的handler,这些handler就是构造出ServerBootStrap里的childHandler()方法放入的
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 这个处理client连接请求的关键Context的handler,这个在后面会分析,这里暂时不关心
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
addLast(...)
,最后会调用到下面代码,关注addLast0
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//将handler分装成为一个Context对象
newCtx = newContext(group, filterName(name, handler), handler);
//将这个Context对象加入到双向链表的tail节点之前
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
//registered=false 表示该channel 还没注册到selector
//会最后调用到ChannelHandler.handlerAdded(...)方法里,这个方法里,最终会initChannel(ctx)
if (!registered) {
newCtx.setAddPending();
//将这个任务加入到pipeline的pendingHandlerCallbackHead链表上
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
addLast0,回顾上面的pipleline里的双向链表,就容易了解到将这个AbstractChannelHandlerContext加入到双向链表的tail的上个节点
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
上面的流程就是初始的过程,总结下,简单说就是下面2步
- new出一个Channel, 这个channel是
,通过反射调用无参数够造函数构造出来。NioServerSocketChannel
- 构造过程里创建了一个pipeline对象,这个pipeline对象是个双向链表的结构
-
初始化这个channel,在channel的pipeline里加入从handler(…)里拿到的handler封装成的Context对象
如下
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(...)
4.最后会往pipeline加入ServerBootstrapAcceptor这个处理client连接的handler的context
5.如上分析server端pipeline应该是

注册
上面的2步分别是new 出1个channel,然后对channel做了一些初始化,如channel里的pipeline这个双向链表的初始化操作。
现在进入到第三步,将channel注册到group里,如下
(3)
ChannelFuture regFuture = config().group().register(channel);
那问题是什么是group?
先看
config()
,调用到最后会发现这是ServerBootstrap的一个全局变量ServerBootstrapConfig,ServerBootstrapConfig的有个传入参数this指向自己,如下代码
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}
config().group()其实就是调用到
ServerBootstrap.group().
ServerBootstrap.group()是什么呢?得看我们创建 WelcomeServer的时候,有new了2个EventLoopGroup,一个是bossGroup,另个是workerGroup。代码如下
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
ServerBootstrap中的group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法如下
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>{
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
...
this.childGroup = childGroup;
return this;
}
}
super.group(parentGroup)代码如下
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public B group(EventLoopGroup group) {
this.group = group;
return (B) this;
}
}
做的事就是
- bossGroup付给了ServerBootstrap里全局变量
group
- workerGroup付给了ServerBootstrap里全局变量
.childGroup
config().group()
指向的就是bossGroup这个
NioEventLoopGroup
因此下面的代码
就是把
channel
注册到
EventLoopGroup
里,确切的说
把
NIOServerChannel
注册到bossGroup这个
NioEventLoopGroup
里.
NioEventLoopGroup
里的register(channel)方法是在它的父类
MultithreadEventLoopGroup
定义的。
public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup{
public EventLoop next() {
return (EventLoop) super.next();
}
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
public class MultithreadEventExecutorGroup{
public EventExecutor next() {
return chooser.next();
}
}
这个chooser又是个什么对象,以及怎么初始化出来?
这得从下面的代码入手
NioEventLoopGroup无参构造函数最后调用super(…)
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}
//nThreads==0,所以取DEFAULT_EVENT_LOOP_THREADS,取DEFAULT_EVENT_LOOP_THREAD默认就是cpu核心数*2
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
public abastract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup{
private final EventExecutor[] children;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//EventExecutor 构造出来的是NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
...
} finally {
...
}
}
//!!!!注意这里-chooser 在此构造出来,它是传入的chooserFactory,
//即DefaultEventExecutorChooserFactory.INSTANCE
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
DefaultEventExecutorChooserFactory 是个单例
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//如果传入的executors是2的n次方
//在这里我们传入的executors的个数是cpu核心数*2,一定是2的n次方,所以返回的是PowerOfTowEventExecutorChooser
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
}
我们要关注的就是PowerOfTowEventExecutorChooser.next(),
executors[idx.getAndIncrement() & executors.length - 1]
说白了就是 EventExecutor[]里挨个被选中去执行任务。
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
回到
public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup{
public EventLoop next() {
return (EventLoop) super.next();
}
public ChannelFuture register(Channel channel){
//next()就是获得executors数组里的某个executor
//(io.netty.channel.nio.NioEventLoop)
//然后执行注册channel
return next().register(channel);
}
}
next()得到是的executor 是EventExecutor[] children里的一个,也就是
io.netty.channel.nio.NioEventLoop
接下来需要明确
NioEventLoop
和
NioEventLoopGroup
关系
NioEventLoopGroup 类
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
}
NioEventLoop类
的远祖就是
NioEventLoop
。然后可以回到它两关系,
EventExecutor
里有个EventExecutor[] children的数组,里面包含的真实对象就是
NioEventLoopGroup
NioEventLoop
就是从EventExecutor[] children的数组里拿到一个
next().register(channel)
,
NioEventLoop
NioEventLoop.register(channel)
接下来就看NioEventLoop的register(Channel channel)方法,这个方法其实是它的父类SingleThreadEventLoop里的方法
public class SingleThreadEventLoop{
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
promise.channel()
我们知道是
NioServerSocketChannel
,它的unsafe()方法返回了一个
Unsafe
接口对象,
Unsafe
是个什么鬼?
还记得
NioServerSocketChannel
构造到最后调用如下代码,其中有
newUnsafe()
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
让我们进入
newUnsafe()
方法,构造出一个Unsafe的接口对象。看下
Unsafe
接口 的说明。正常使用是不会碰到用到这些类
Unsafe operations that should never be called from user-code.
初略的看了下这个接口提供的方法,例如
- void register(EventLoop eventLoop, ChannelPromise promise);
-
void bind(SocketAddress localAddress, ChannelPromise promise);
可以断定这个类是用来做连接相关的
debug会得到最后的对象是
NioMessageUnsafe
private final class NioMessageUnsafe extends AbstractNioUnsafe {
}
我们关注的是register(EventLoop eventLoop, ChannelPromise promise)的实现,这个实现是在
AbstractNioUnsafe
的父类
AbstractUnsafe
里实现的
protected abstract class AbstractUnsafe implements Unsafe {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//eventLoop 是传入NioEventLoop AbstractChannel.this.eventLoop = eventLoop;
//第一次进来eventLoop.inEventLoop()是false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//关注点-eventLoop.execute
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
}
上面的代码我们关注
eventLoop.execute(new Runnable() {...});
,这个execute是
NioEventLoop
的父类
SingleThreadEventExecutor
里实现的,如下代码
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final Executor executor;
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
// 第一次肯定是到这里
//关注点-启动线程!!!
startThread();
//关注点-加入任务!!!
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
//这个executor又是什么?
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
...
} finally {
...
}
}
});
}
}
上面的代码里这个executor是什么?
我们在create NioEventLoopGroup的时候,会调用到它的父类构造函数中,如下面传入的executor.但是传入的是null,为空则构造出
ThreadPerTaskExecutor(newDefaultThreadFactory())
赋给executor。看下面代码
class MultithreadEventExecutorGroup{
private final EventExecutor[] children;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
...
}
}
因此这个executor真实身份是
io.netty.util.concurrent.ThreadPerTaskExecutor
ThreadPerTaskExecutor的类如下
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
到此startThread()就很明确了,就是启动传入的command线程。最后就是调用
SingleThreadEventExecutor.this.run();
这个run()在
SingleThreadEventExecutor
里是
protected abstract void run()
,它的实现在
NIOEventLoop
里,如下,这是个死循环
public class NIOEventLoop{
protected void run() {
for (;;) {
try {
//死循环从一个 private final Queue<Runnable> taskQueue里去不断的获取任务,taskQueue是哪里建立出来的?找下它在哪里
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
死循环从一个 private final Queue taskQueue里去不断的获取任务,taskQueue是哪里建立出来的?找下它在哪里建立就知道了,它是在下面的方法里
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//看这里构造出taskQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
启动完线程后,就是加入任务了,代码比较简单,
往taskQueue里放任务
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
到此register的代码终于可以告一段落,简单的总结下initandregister的过程
- 创建了一个NioServerSocketChannel
- 创建了一个NioEventLoopGroup
- NioEventLoopGroup里有个child[]的数组来存放了NioEventLoop
- 将NioServerSocketChannel注册到NioEventLoopGroup
- NioServerChannel注册到NioEventLoopGroup,不是直接注册,而是从child[]的数组中挑一个NioEventLoop,挑哪个由choser来决定。然后将NioServerChannel注册到NioEventLoop
- 将NioServerChannel注册到NioEventLoop,也不是直接注册,而是先启动一个线程,执行的是NioEventLoop的run()
- run()方法是个死循环,会不断从taskqueue这个队列里去获取到是否有runable的任务
- addTask(task) 这个方法才真正的把注册channel的任务加入到taskqueue里
- run()方法里检测到taskqueue队列里有任务了就进行相应的处理
run()里的分析待后续分解