目录
绪论
步骤一:NioEventLoop创建
new ThreadPerTaskExecutor()[线程创建器]
每次执行任务时都会创建一个线程实体(FastThreadLocalThread--封装的一个线程)
识别nioEventLoop-1-xx
for(){newChild()} [构造NioEventLoop]
创建一个selector
保存线程执行器ThreadPerTaskExecutor
创建一个MpscQueue
chooserFactory.newChooser() [线程选择器]
步骤二:NioEventLoop启动
服务端启动绑定端口
新连接介入通过chooser绑定一个NioEventLoop
步骤三:NioEventLoop执行逻辑
SingleThreadEventExecutor.this.run()
select() [检查是否有IO事件]
processSelectedKeys() [处理IO事件]
runAllTasks() [处理异步任务队列--外部线程放进来的任务]
绪论
三个问题
- 默认情况下,netty服务端起多少线程?何时启动?
- Netty如何解觉jdk空轮训bug的?
- Netty如何保证串行无锁化?
内容
- NioEventLoop创建
- NioEventLoop启动
- NioEventLoop执行逻辑
步骤一:NioEventLoop创建
new NioEventLoopGroup()
- new ThreadPerTaskExecutor()[线程创建器]
- for(){newChild()} [构造NioEventLoop]
- chooserFactory.newChooser() [线程选择器]
代码入口
EventLoopGroup group = new NioEventLoopGroup();
一直深入代码
public NioEventLoopGroup() {
this(0); //线程数默认为0
}
进入this(0):
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
进入this(...)方法:
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
进入this(...)方法:
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
进入this(...)方法:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider
selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
进入super(...)方法:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
进入super(...)方法:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory,
Object... args) {
this(nThreads, threadFactory == null ? null :
new ThreadPerTaskExecutor(threadFactory), args);
}
进入this(...)方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
继续进入this(...)方法看到最终方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//代码校验
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//创建步骤1--线程创建器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//创建步骤2--构造NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//创建步骤3--线程选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
new ThreadPerTaskExecutor()[线程创建器]
- 每次执行任务时都会创建一个线程实体
- NioEventLoop线程命名规则nioEventLoop-1-xx
每次执行任务时都会创建一个线程实体(FastThreadLocalThread--封装的一个线程)
new ThreadPerTaskExecutor(),执行execute()方法,利用threadFactory产生一个新线程。传入的参数是一个newDefaultThreadFactory()产生的Factory实例
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
newDefaultThreadFactory()产生的Factory实例
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
进入:
public DefaultThreadFactory(Class<?> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
进入this()方法:
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
进入this()方法:
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup()
:System.getSecurityManager().getThreadGroup());
}
进入this()方法:
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup
threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
识别nioEventLoop-1-xx
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
产生的Factory实例的newThread()方法,产生FastThreadLocalThread实体
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
for(){newChild()} [构造NioEventLoop]
- 保存线程执行器ThreadPerTaskExecutor
- 创建一个MpscQueue
- 创建一个selector
入口 children[i] = newChild(executor, args); 找到NioEventLoopGroup类实现的方法
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可以看到NioEventLoop的构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector; //创建一个selector
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
创建一个selector
SelectorTuple 是一个内部包装类,包含了final Selector unwrappedSelector和final Selector selector;,在openSelector()方法中创建了selector。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
......
}
继续进入NioEventLoop的构造函数中的super()方法
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
进入super()方法:
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
可以看到在this()方法中传进来了new ThreadPerTaskExecutor(threadFactory)执行器。继续进入this()方法
保存线程执行器ThreadPerTaskExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//将传进来的执行器保存起来
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
进入newTaskQueue()方法,找到NioEventLoop中的实现,可以看到创建了newMpscQueue()队列。
创建一个MpscQueue
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
chooserFactory.newChooser() [线程选择器]
给新连接绑定对应的NioEventLoop
- isPowerOfTwo() -- 判断是不是2的次幂
- 是--执行PowerOfTwoEventExecutorChooser(优化);index++ & (length-1)
- 否--执行GenericEventExecutorChooser(普通); abs(index++ % (length)
新连接绑定NioEventLoop的过程:每来一个新连接,NioEventLoop[]数组会往后移动一位,移到队尾后就会从第一个继续绑定NioEventLoop。
入口:chooser = chooserFactory.newChooser(children);children为前面创建的NioEventLoop数组
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
idx.getAndIncrement() & executors.length - 1 解释:
假如idx是111010,NioEventLoop数组大小为16,减1之后为1111,两者相与后结果为1010.可以快速定位到数组内的位置(因为是2的次幂)。而且当idx不断增大时,高位就不会有作用,而只会在后四位进行循环。
而取模%操作依赖于底层操作才能得到,速度会比&慢。
步骤二:NioEventLoop启动
- 服务端启动绑定端口
- 新连接介入通过chooser绑定一个NioEventLoop
服务端启动绑定端口
bind() --> execute(task) [入口]
startThread() --> dostartThread() [创建线程]
ThreadPerTaskExecutor.execute()
thread=Thread.currentThread()
NioEventLoop.run()
服务端启动过程中,主线程会调用bind()方法,该方法最终会将实际绑定的流程封装成一个task,然后调用服务端Channel的一个execute()方法去具体的执行。然后netty会判断调用execute()的线程不是NioEventLoop线程,于是调用startThread()中的dostartThread()开始创建线程(使用ThreadPerTaskExecutor,ThreadPerTaskExecutor.execute()会产生一个线程--具体过程是NioEventLoop将当前创建的线程进行保存thread=Thread.currentThread(),然后调用NioEventLoop.run()启动)。
从bind()方法中doBind()中的doBind0(regFuture, channel, localAddress, promise);进入
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
接着调用execute()方法进入类SingleThreadEventExecutor中的实现。其中,inEventLoop()方法判断当前执行的线程是否是NioEventLoop的线程。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop(); //该方法判断当前执行的线程是否是NioEventLoop的线程
addTask(task);
if (!inEventLoop) { //不是则创建新的线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
进入inEventLoop()方法:
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
进入inEventLoop()方法:
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
不是NioEventLoop线程的话就调用startThread()中的dostartThread()创建线程。
首先判断当前线程状态是否是启动的,未启动则使用CAS的方法进行实际线程启动。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
其中的executor是之前newChild()中第一个参数传入到NioEventLoop里面
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
新连接介入通过chooser绑定一个NioEventLoop
待续
步骤三:NioEventLoop执行逻辑
SingleThreadEventExecutor.this.run()
进入NioEventLoop.run() --> for(;;)
- select() [检查是否有IO事件]
- processSelectedKeys() [处理IO事件]
- runAllTasks() [处理异步任务队列--外部线程放进来的任务]
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); //进行select操作处于未唤醒状态
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//ioRatio默认为50,也就是处理IO事件和处理外部事件时间一样
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
select() [检查是否有IO事件]
- deadline以及任务穿插逻辑处理
- 阻塞式select
- 避免jdk空轮询的bug
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime(); //当前时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
//delayNanos(currentTimeNanos)--计算定时任务队列第一个任务的截止时间
//selectDeadLineNanos -- 当前select操作不能超过这个时间
for (;;) {
//首先计算是否超时
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) { //如果超时一次也没有select,则进行一个非阻塞的select方法
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
//未超时则查看队列里是否有任务,有的话并进行唤醒操作,执行非阻塞select
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
//截止时间没到,且当前任务队列为空的话则进行阻塞式select操作
int selectedKeys = selector.select(timeoutMillis); //timeoutMillis--本次超时的最大时间
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
//避免jdk空轮询bug
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { //已经执行过一次阻塞式select操作
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
//当前时间减去开始时间,小于timeout时间,表示并没有阻塞直接返回了,导致空轮询触发。接下来的逻辑是,判断空轮询的次数,如果大于512,调用selectRebuildSelector(selectCnt)--
//把老的selectKey注册到新的select上,这样新的select的阻塞式操作就可能不发生空轮询
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
hasTasks()
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
tailTasks = newTaskQueue(maxPendingTasks);
selectRebuildSelector(selectCnt),其中attachment()为经过netty包装的channel
private Selector selectRebuildSelector(int selectCnt) throws IOException {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
Selector selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
return selector;
}
进入rebuildSelector():
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
进入rebuildSelector0():
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment(); //经过netty包装的channel
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
processSelectedKeys() [处理IO事件]
- selected keySet优化(默认进行优化):select操作每次都会把已经就绪的IO事件添加到底层的hashSet的数据结构里,netty会通过反射的方式将hashSet替换成数组的实现(netty不需要set的remove,contains操作)。这样在任何情况下,操作时间复杂度为O(1),优于hashSet.
- processSelectedKeysOptimized() :真正处理IO事件
keySet优化,在创建NioEventLoop时,会调用一个openSelector,优化操作其实是在他里面完成的。
在其中建立了一个数组,关键是add()方法。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public int size() {
return size;
}
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
openSelector()中的优化操作
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
//默认进行优化
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//反射
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
//将原生selector替换成优化后的selector
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
processSelectedKeysOptimized()
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
进入 processSelectedKey()方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { //key不合法
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps(); //读取IO事件OP_CONNECT、OP_WRITE、OP_READ OP_ACCEPT
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
runAllTasks() [处理异步任务队列--外部线程放进来的任务]
- task的分类和添加
- 任务的聚合
- 任务的执行
task的分类和添加
两个task任务队列,一个是普通任务队列,一个是定时任务队列,在NioEventLoop创建时就建立了。
普通任务队列
入口:NioEventLoop构造函数 --> 调用super()方法,进入SingleThreadEventLoop的构造方法 -->调用super()方法,进入SingleThreadEventExecutor类的构造方法,可以看到taskQueue的创建。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
protected Queue<Runnable> newTaskQueue() {
return newTaskQueue(maxPendingTasks);
}
当外部线程调用NioEventLoop的一个execute()方法时,会判断调用execute()方法是在NioEventLoop线程里还是外部线程。若是在外部线程里面,则启动startThread()。其中有一个addTask()方法。里面有一个offerTask()方法,向队列里添加一个task。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
定时队列
将callable包装成一个定时任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
validateScheduled0(delay, unit);
return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
首先判断是在NioEventLoop内发布的schedule还是在外部线程发布的schedule。如果是外部线程,则把添加操作变成线程安全的操作;内部线程则直接添加。这么做的原因是 scheduledTaskQueue()不是线程安全的,只是一个普通的优先队列。它是通过添加定时任务这个操作当成一个普通任务来保证所有对于scheduledTaskQueue的操作都是在NioEventLoop里实现的,由此保证线程安全。
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}
任务的聚合
入口:NioEventLoop.run()方法 --> runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //执行任务时的时间不能超过传进来的参数时间。
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); //聚合任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
......
}
fetchFromScheduledTaskQueue():首先从定时任务队列中获取第一个任务。这个定时任务队列是一个优先队列,它的排序方式是按照截止时间来排序,如果截止时间相同则按照ID排序。其次,使用pollScheduledTask()取出任务后,将任务添加到普通任务队列里,如果添加失败,将该任务重新加入定时任务队列,否则的话,该任务会直接丢掉。添加后则继续在循环中获取任务,执行完之后,所有的定时任务都会被拉取到普通任务队列中。
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
进入ScheduledFutureTask类:
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
pollScheduledTask():首先拿到定时任务队列,获取第一个任务,若任务为空,则直接返回;如果定时任务的截止时间小于传进来的时间,就把定时任务从定时任务队列中移除,然后返回。否则返回null,说明当前定时任务队列里所有任务的截止时间还有到,没必要拿出来执行。
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
任务的执行
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); //聚合任务
Runnable task = pollTask(); //从普通队列里取一个任务
if (task == null) {
afterRunningAllTasks(); //每个任务执行完之后都会进行一个收尾操作
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; //计算截止时间
long runTasks = 0;
long lastExecutionTime;
for (;;) { //循环执行队列里的所有任务
safeExecute(task); //安全执行任务
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) { //每执行64个任务会计算当前时间,若超过截止时间,就不执行任务。为什么是64?因为nanoTime()相当耗时
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime; //记录上一个执行时间
return true;
}
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
protected static void safeExecute(Runnable task) { //出现异常之后并不阻止任务执行,知识打印异常日志
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}