首先要說明的是标題。沒錯!當然是為了博眼球,改成史上最詳細的Netty源碼分析是自取其辱。改成史上最不詳細的Netty源碼分析,萬一說的不對也沒人噴。話分兩頭,這個是我分析的初篇,我還是很認真和用心的寫的。班門弄斧,如果有錯誤請指出。
為什麼要寫Netty源碼分析? 用《Netty權威指南》話說
随着大資料,網際網路和雲計算的發展,傳統的垂直架構将逐漸被分布式,彈性伸縮的新架構替代。
系統隻要分布式部署,就存在多個節點之間的通訊問題,強調高可擴充和高性能,是以往往會選擇高性能的通訊分式。使用
Netty+二進制編解碼這些内部私有協定,已經逐漸成為業界主流。
簡單的例子
啟動下面程式後,指令行裡輸入後,随便敲個字元,就會有
telnet 127.0.0.1 9988
列印出來,并且在console裡會列印出敲入的字元。如果沒有telnet指令,使用
welcome!!!
也可以列印出
curl 127.0.0.1:9988
welcome!!!
代碼如下
public class WelcomeServer {
private int port;
public WelcomeServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new WelcomeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 9988;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new WelcomeServer(port).run();
}
}
public class WelcomeServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf bb=(ByteBuf)msg;
byte[] b= new byte[bb.readableBytes()];
bb.readBytes(b);
System.out.println("read message:"+new String(b));
byte[] welcomebyte="welcome!!!".getBytes();
bb.writeBytes(welcomebyte);
ctx.writeAndFlush(bb);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
開始分析
從
ChannelFuture f = b.bind(port).sync();
入手,在一系列的調用後會到達AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
}
在上面的類裡最重要的就是initAndRegister(),顧名思義就是初始并注冊。
下面的類去掉了沒相關的代碼,隻保留必要的用于分析的代碼
final ChannelFuture initAndRegister() {
//初始
Channel channel = null;
try {
channel = channelFactory.newChannel();//(1)
init(channel);//(2)
} catch (Throwable t) {
...
}
//注冊
ChannelFuture regFuture = config().group().register(channel);//(3)
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
從代碼裡就是看出分成3步:
- 建構出一個channel (1)
- channel 初始化 (2)
- 将channel注冊到group裡并傳回一個futture對象 (3)
接下來就分建構,初始化 ,注冊 3部分進行分析
建構
通過
channelFactory.newChannel()
進行的建構出一個chnnel,這裡使用了·
channelFactory
這個工廠對象,
channelFactory
哪裡得出的
在例子WelcomeServer中有下面代碼
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
channel(NioServerSocketChannel.class),它的代碼在下面
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));//(1)
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
...
//初始化出 channelFactory
this.channelFactory = channelFactory;
//傳回自己,鍊式調用,builder模式常用
return (B) this;
}
下面是ReflectiveChannelFactory的newChannel()
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
new ReflectiveChannelFactory<C>(channelClass)
,
我們傳入的class是
NioServerSocketChannel.class
.
看到ReflectiveChannelFactoryd.newChannel()就是反射建立出一個NioServerSocketChannel對象。
建立完NioServerSocketChanne後,傳入下面的init方法
小結:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
這段代碼裡channel(NioServerSocketChannel.class),名義上是指定了channel, 實際上是指定了ReflectiveChannelFactory。當構造channel的時候是通過反射構造出channel.
初始化
上面分析new了一個channel 對象
init(channel)都幹了什麼?
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() { //(3)
@Override
public void initChannel(final Channel ch) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
// 拿到ChannelPipeline
final ChannelPipeline pipeline = ch.pipeline();
// 拿到config裡的handler,這些handler就是WelcomeServer裡構造出ServerBootStrap裡的handler()方法放入的,注意不是childHandler()
//WelcomeServer裡目前沒有寫handler()
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
channel.pipeline()是什麼?這得從
NioServerSocketChannel構造過程入手,我們知道NioServerSocketChannel是
clazz.newInstance()
構造出來的。構造函數如下
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//JDK裡的NIO裡的SelectorProvider
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
super(null, channel, SelectionKey.OP_ACCEPT);的調用棧如下
class AbstractNioMessageChannel{
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
}
class AbstractNioChannel{
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
}
class AbstractChannel{
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
channel.pipeline()傳回的ChannelPipeline 就是上面的
newChannelPipeline()
得出的.
下面是newChannelPipeline()的代碼
class AbstractChannel{
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
}
從上面的代碼裡分析得出pipeline其實是雙向連結清單,如下
HeadContext.next-->TailContext
TailContext.prev-->HeadContext
有tail和head的全局引用分别指向TailContext和HeadContext
TailContext和HeadContext的差別
TailContext
HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {}
可以看出HeadContext比TailContext多了一個ChannelOutboundHandler接口。ChannelOutboundHandler和ChannelInboundHandler的作用後續再介紹
回到代碼,看addLast
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() { //(3)
@Override
public void initChannel(final Channel ch) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) { channel.config().setOptions(options);
}
// 拿到ChannelPipeline
final ChannelPipeline pipeline = ch.pipeline();
// 拿到config裡的handler,這些handler就是構造出ServerBootStrap裡的childHandler()方法放入的
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 這個處理client連接配接請求的關鍵Context的handler,這個在後面會分析,這裡暫時不關心
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
addLast(...)
,最後會調用到下面代碼,關注addLast0
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//将handler分裝成為一個Context對象
newCtx = newContext(group, filterName(name, handler), handler);
//将這個Context對象加入到雙向連結清單的tail節點之前
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
//registered=false 表示該channel 還沒注冊到selector
//會最後調用到ChannelHandler.handlerAdded(...)方法裡,這個方法裡,最終會initChannel(ctx)
if (!registered) {
newCtx.setAddPending();
//将這個任務加入到pipeline的pendingHandlerCallbackHead連結清單上
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
addLast0,回顧上面的pipleline裡的雙向連結清單,就容易了解到将這個AbstractChannelHandlerContext加入到雙向連結清單的tail的上個節點
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
上面的流程就是初始的過程,總結下,簡單說就是下面2步
- new出一個Channel, 這個channel是
,通過反射調用無參數夠造函數構造出來。NioServerSocketChannel
- 構造過程裡建立了一個pipeline對象,這個pipeline對象是個雙向連結清單的結構
-
初始化這個channel,在channel的pipeline裡加入從handler(…)裡拿到的handler封裝成的Context對象
如下
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(...)
4.最後會往pipeline加入ServerBootstrapAcceptor這個處理client連接配接的handler的context
5.如上分析server端pipeline應該是

注冊
上面的2步分别是new 出1個channel,然後對channel做了一些初始化,如channel裡的pipeline這個雙向連結清單的初始化操作。
現在進入到第三步,将channel注冊到group裡,如下
(3)
ChannelFuture regFuture = config().group().register(channel);
那問題是什麼是group?
先看
config()
,調用到最後會發現這是ServerBootstrap的一個全局變量ServerBootstrapConfig,ServerBootstrapConfig的有個傳入參數this指向自己,如下代碼
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
}
config().group()其實就是調用到
ServerBootstrap.group().
ServerBootstrap.group()是什麼呢?得看我們建立 WelcomeServer的時候,有new了2個EventLoopGroup,一個是bossGroup,另個是workerGroup。代碼如下
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
ServerBootstrap中的group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法如下
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>{
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
...
this.childGroup = childGroup;
return this;
}
}
super.group(parentGroup)代碼如下
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public B group(EventLoopGroup group) {
this.group = group;
return (B) this;
}
}
做的事就是
- bossGroup付給了ServerBootstrap裡全局變量
group
- workerGroup付給了ServerBootstrap裡全局變量
.childGroup
config().group()
指向的就是bossGroup這個
NioEventLoopGroup
是以下面的代碼
就是把
channel
注冊到
EventLoopGroup
裡,确切的說
把
NIOServerChannel
注冊到bossGroup這個
NioEventLoopGroup
裡.
NioEventLoopGroup
裡的register(channel)方法是在它的父類
MultithreadEventLoopGroup
定義的。
public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup{
public EventLoop next() {
return (EventLoop) super.next();
}
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
public class MultithreadEventExecutorGroup{
public EventExecutor next() {
return chooser.next();
}
}
這個chooser又是個什麼對象,以及怎麼初始化出來?
這得從下面的代碼入手
NioEventLoopGroup無參構造函數最後調用super(…)
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}
//nThreads==0,是以取DEFAULT_EVENT_LOOP_THREADS,取DEFAULT_EVENT_LOOP_THREAD預設就是cpu核心數*2
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
public abastract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup{
private final EventExecutor[] children;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//EventExecutor 構造出來的是NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
...
} finally {
...
}
}
//!!!!注意這裡-chooser 在此構造出來,它是傳入的chooserFactory,
//即DefaultEventExecutorChooserFactory.INSTANCE
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
DefaultEventExecutorChooserFactory 是個單例
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//如果傳入的executors是2的n次方
//在這裡我們傳入的executors的個數是cpu核心數*2,一定是2的n次方,是以傳回的是PowerOfTowEventExecutorChooser
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
}
我們要關注的就是PowerOfTowEventExecutorChooser.next(),
executors[idx.getAndIncrement() & executors.length - 1]
說白了就是 EventExecutor[]裡挨個被選中去執行任務。
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
回到
public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup{
public EventLoop next() {
return (EventLoop) super.next();
}
public ChannelFuture register(Channel channel){
//next()就是獲得executors數組裡的某個executor
//(io.netty.channel.nio.NioEventLoop)
//然後執行注冊channel
return next().register(channel);
}
}
next()得到是的executor 是EventExecutor[] children裡的一個,也就是
io.netty.channel.nio.NioEventLoop
接下來需要明确
NioEventLoop
和
NioEventLoopGroup
關系
NioEventLoopGroup 類
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
}
NioEventLoop類
的遠祖就是
NioEventLoop
。然後可以回到它兩關系,
EventExecutor
裡有個EventExecutor[] children的數組,裡面包含的真實對象就是
NioEventLoopGroup
NioEventLoop
就是從EventExecutor[] children的數組裡拿到一個
next().register(channel)
,
NioEventLoop
NioEventLoop.register(channel)
接下來就看NioEventLoop的register(Channel channel)方法,這個方法其實是它的父類SingleThreadEventLoop裡的方法
public class SingleThreadEventLoop{
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
}
promise.channel()
我們知道是
NioServerSocketChannel
,它的unsafe()方法傳回了一個
Unsafe
接口對象,
Unsafe
是個什麼鬼?
還記得
NioServerSocketChannel
構造到最後調用如下代碼,其中有
newUnsafe()
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
讓我們進入
newUnsafe()
方法,構造出一個Unsafe的接口對象。看下
Unsafe
接口 的說明。正常使用是不會碰到用到這些類
Unsafe operations that should never be called from user-code.
初略的看了下這個接口提供的方法,例如
- void register(EventLoop eventLoop, ChannelPromise promise);
-
void bind(SocketAddress localAddress, ChannelPromise promise);
可以斷定這個類是用來做連接配接相關的
debug會得到最後的對象是
NioMessageUnsafe
private final class NioMessageUnsafe extends AbstractNioUnsafe {
}
我們關注的是register(EventLoop eventLoop, ChannelPromise promise)的實作,這個實作是在
AbstractNioUnsafe
的父類
AbstractUnsafe
裡實作的
protected abstract class AbstractUnsafe implements Unsafe {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//eventLoop 是傳入NioEventLoop AbstractChannel.this.eventLoop = eventLoop;
//第一次進來eventLoop.inEventLoop()是false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//關注點-eventLoop.execute
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
}
上面的代碼我們關注
eventLoop.execute(new Runnable() {...});
,這個execute是
NioEventLoop
的父類
SingleThreadEventExecutor
裡實作的,如下代碼
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final Executor executor;
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
// 第一次肯定是到這裡
//關注點-啟動線程!!!
startThread();
//關注點-加入任務!!!
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
//這個executor又是什麼?
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
...
} finally {
...
}
}
});
}
}
上面的代碼裡這個executor是什麼?
我們在create NioEventLoopGroup的時候,會調用到它的父類構造函數中,如下面傳入的executor.但是傳入的是null,為空則構造出
ThreadPerTaskExecutor(newDefaultThreadFactory())
賦給executor。看下面代碼
class MultithreadEventExecutorGroup{
private final EventExecutor[] children;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
...
}
}
是以這個executor真實身份是
io.netty.util.concurrent.ThreadPerTaskExecutor
ThreadPerTaskExecutor的類如下
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
到此startThread()就很明确了,就是啟動傳入的command線程。最後就是調用
SingleThreadEventExecutor.this.run();
這個run()在
SingleThreadEventExecutor
裡是
protected abstract void run()
,它的實作在
NIOEventLoop
裡,如下,這是個死循環
public class NIOEventLoop{
protected void run() {
for (;;) {
try {
//死循環從一個 private final Queue<Runnable> taskQueue裡去不斷的擷取任務,taskQueue是哪裡建立出來的?找下它在哪裡
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
死循環從一個 private final Queue taskQueue裡去不斷的擷取任務,taskQueue是哪裡建立出來的?找下它在哪裡建立就知道了,它是在下面的方法裡
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//看這裡構造出taskQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
啟動完線程後,就是加入任務了,代碼比較簡單,
往taskQueue裡放任務
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
到此register的代碼終于可以告一段落,簡單的總結下initandregister的過程
- 建立了一個NioServerSocketChannel
- 建立了一個NioEventLoopGroup
- NioEventLoopGroup裡有個child[]的數組來存放了NioEventLoop
- 将NioServerSocketChannel注冊到NioEventLoopGroup
- NioServerChannel注冊到NioEventLoopGroup,不是直接注冊,而是從child[]的數組中挑一個NioEventLoop,挑哪個由choser來決定。然後将NioServerChannel注冊到NioEventLoop
- 将NioServerChannel注冊到NioEventLoop,也不是直接注冊,而是先啟動一個線程,執行的是NioEventLoop的run()
- run()方法是個死循環,會不斷從taskqueue這個隊列裡去擷取到是否有runable的任務
- addTask(task) 這個方法才真正的把注冊channel的任務加入到taskqueue裡
- run()方法裡檢測到taskqueue隊列裡有任務了就進行相應的處理
run()裡的分析待後續分解