NioEventLoop 任務的執行
今天跟核心方法,關于任務的處理
下面是EventLoop第一次執行execute方法的時候,會觸發的邏輯,會執行一個核心Runnable任務,該任務會進行selector的選擇,然後處理三類任務,以及I/O就緒事件:
- 注冊、綁定端口、為NioServerChannel的pipline添加連接配接處理器等任務都會放到任務隊列taskQueue,這是一類
- 還有一類是定時任務,但是定時任務的處理隻要定時任務到了執行時間,也會放入taskQueue
- 第三類是收尾任務,會放到tailTasks
除了處理任務,還會處理Selector監聽到的這種就緒事件。
SingleThreadEventExecutor.this.run()
//io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
assert thread == null;
// 調用目前EventLoop所包含的executor(子executor)
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 進行selector的選擇,然後執行三類任務
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
我們跟SingleThreadEventExecutor.this.run()具體實作方法io.netty.channel.nio.NioEventLoop#run:
//io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循環
for (;;) {
try {
try {
// ------------------------- 1 selector選擇 -------------------
// 計算出選擇selector政策
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // NioEventLoop不支援
continue;
case SelectStrategy.BUSY_WAIT: // Nio不支援
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // NioEventLoop支援的唯一政策
// 若執行這裡,說明目前任務隊列中沒有任務
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 若目前線程剛被喚醒,selector立即将其選擇的結果傳回給我們
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO處理與任務隊列中任務的處理所占時間比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// ------------------------- 2 處理就緒的IO -------------------
// IO操作的開始時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 執行任務隊列中的任務 -------------------
// Ensure we always run tasks.
// IO操作總用時
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
以上代碼我們分三大流程:
- 1.selector選擇
- 2.處理就緒的IO
- 3.執行任務隊列中的任務
1.selector選擇
//io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循環
for (;;) {
try {
try {
// ------------------------- 1 selector選擇 -------------------
// 計算出選擇selector政策
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // NioEventLoop不支援
continue;
case SelectStrategy.BUSY_WAIT: // Nio不支援
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // NioEventLoop支援的唯一政策
// 若執行這裡,說明目前任務隊列中沒有任務
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 若目前線程剛被喚醒,selector立即将其選擇的結果傳回給我們
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
...
}
}
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()):
計算出選擇政策,有三種政策:
- SelectStrategy.SELECT值為-1 (隻有這個政策會用到,其他都沒用)
- SelectStrategy.CONTINUE值為-2
- SelectStrategy.BUSY_WAIT值為-3
先看下hasTasks():
//io.netty.channel.SingleThreadEventLoop#hasTasks
protected boolean hasTasks() {
// 判斷 taskQueue 或 tailTasks 任務隊列是否為空
return super.hasTasks() || !tailTasks.isEmpty();
}
//io.netty.util.concurrent.SingleThreadEventExecutor#hasTasks
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
即判斷目前EventLoop中的普通任務隊列和尾部任務隊列中是否有任務。
再看selectStrategy.calculateStrategy:
//io.netty.channel.DefaultSelectStrategy
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 若任務隊列有任務,則馬上進行非阻塞選擇
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
有任務走selectSupplier.get(),selectSupplier是啥,是NioEventLoop 中的一個成員變量:
public final class NioEventLoop extends SingleThreadEventLoop {
...
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
// 非阻塞選擇
return selectNow();
}
};
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
* 布爾值,用于控制選擇器是否被阻塞。select應該跳出它的選擇過程。
* 在我們的例子中,我們為選擇方法使用了一個逾時阻塞方式,而選擇方法将在那個時間内阻塞,除非被喚醒。
* 解釋:簡單來說就是在NioEventLoop的線程在執行SelectStrategy.SELECT政策對應
* 的select方法的時候,裡面會用selector.select(long)指定逾時時間阻塞的方式
* 進行選擇,如果我們希望跳出這個政策對應的整個select方法,可以将wakenUp變量
* 置為true,代表喚醒,内部會自己調用selector.wakeup(),喚醒線程并跳出
* 整個select選擇這一步,而去執行後面處理就緒IO事件和任務隊列中的任務邏輯。
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
int selectNow() throws IOException {
try {
// NIO底層的非阻塞選擇
return selector.selectNow();
} finally {
// restore wakeup state if needed
// wakenUp為true,表示目前eventLoop所綁定的線程剛剛被喚醒
// wakenUp為false,表示目前eventLoop所綁定的線程即将被阻塞
if (wakenUp.get()) {
// 立即将選擇的結果寫入到目前eventLoop的集合
// 喚醒selector.select方法
selector.wakeup();
}
}
}
...
}
selector.selectNow()的傳回值是已經就緒的key的數量,沒有就緒的就是0,即selectSupplier.get()的傳回值是大于等于0的,是以隻要任務隊列有任務肯定是不會走到後面case SelectStrategy.SELECT的邏輯,不會進行選擇。
selector.wakeup():這個是nio的api,之前應該介紹過,使尚未傳回的第一個選擇操作立即傳回,即如果selector.select()方法在執行中并且被阻塞,調用這個方法可以讓selector.select()立即傳回
/**
* //java.nio.channels.Selector#wakeup
*
* Causes the first selection operation that has not yet returned to return
* immediately.
* 使尚未傳回的第一個選擇操作立即傳回。
*
* <p> If another thread is currently blocked in an invocation of the
* {@link #select()} or {@link #select(long)} methods then that invocation
* will return immediately. If no selection operation is currently in
* progress then the next invocation of one of these methods will return
* immediately unless the {@link #selectNow()} method is invoked in the
* meantime. In any case the value returned by that invocation may be
* non-zero. Subsequent invocations of the {@link #select()} or {@link
* #select(long)} methods will block as usual unless this method is invoked
* again in the meantime.
*
* <p> Invoking this method more than once between two successive selection
* operations has the same effect as invoking it just once. </p>
*
* @return This selector
*/
public abstract Selector wakeup();
回到selectStrategy.calculateStrategy,再看這裡的意思就很清楚了:
若任務隊列有任務,則馬上進行非阻塞選擇
,此時傳回值肯定是大于等于0,直接走出swith case了,不會被任何一個case選中
//io.netty.channel.DefaultSelectStrategy
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 若任務隊列有任務,則馬上進行非阻塞選擇
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
否則走case SelectStrategy.SELECT,準備執行SELECT選擇政策對應的select方法,隻要走這段代碼,即代表
目前任務隊列肯定是沒有任何任務的
:
case SelectStrategy.SELECT: // NioEventLoop支援的唯一政策
// 若執行這裡,說明目前任務隊列中沒有任務
// 把wakenUp設定為false,意思是目前線程馬上即将要阻塞,阻塞去處理Selector.select進行選擇
select(wakenUp.getAndSet(false));
// 若目前線程剛被喚醒,selector立即将其選擇的結果傳回給我們
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
select(wakenUp.getAndSet(false))的意思是:
把wakenUp設定為false,設定為阻塞狀态,代表目前線程正在處理“選擇操作”,可以認為是正在執行SelectStrategy.SELECT對應的select方法,但不能簡單認為是selector.select()方法導緻的線程阻塞
,select()這個方法比較複雜,分四大塊:
- 1 處理定時任務
- 2 在選擇期間,判斷任務隊列中有新任務加入
- 3 阻塞式選擇
- 4 處理導緻選擇結束阻塞的各種情況(5種)
private void select(boolean oldWakenUp) throws IOException {
//每個NioEventLoop都有一個Selector,在構造裡面初始化的
Selector selector = this.selector;
try {
// 計數器,記錄目前選擇執行的輪數
int selectCnt = 0;
// 擷取目前select()開始的時間點
long currentTimeNanos = System.nanoTime();
// delayNanos():從定時任務隊列中取出一個定時任務,計算其還有多久就要執行了
// selectDeadLineNanos : 表示這個定時任務要開始執行的時間點
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// --------------------- 1 處理定時任務 ------------------
// 對于馬上就要到執行時間的定時任務,立即進行選擇
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
// 非阻塞選擇
selector.selectNow();
selectCnt = 1;
}
break;
}
// --------------------- 2 在選擇期間,任務隊列中有新任務加入 ------------------
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
// 非阻塞選擇
selector.selectNow();
selectCnt = 1;
break;
}
// --------------------- 3 阻塞式選擇 ------------------
// select()方法結束的條件:
// 1)有channel被選擇
// 2)seleter.wakeup()被調用
// 3)目前線程被打斷
// 4)阻塞時間逾時
// 5)其實這裡還有一個結束的條件:
// 當長時間沒有就緒的channel時,輪詢會出現長時間空轉,進而會導緻CPU占用率飙升,
// 此時會使select()結束
// 注意,timeoutMillis 在這裡是作為select()的阻塞時長的
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something, 有channel被選擇
// - waken up by user, or 或 seleter.wakeup()被調用
// - the task queue has a pending task. // 任務隊列中有挂起的任務
// - a scheduled task is ready for processing // 有定時任務
break;
}
// --------------------- 4 處理一些特殊情況 ------------------
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 代碼走到這裡,說明select()結束的條件是4)或5)
// 記錄目前時間
long time = System.nanoTime();
// 下面的式子等價于:
// time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
// 目前for循環已經執行的時長 >= 阻塞時長
// 若if的這個條件成立,說明前面的select()方法是通過條件4)結束的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 執行else說明 目前for循環已經執行的時長 < 阻塞時長 ,說明前面的select()是通過
// 條件5)結束的。若空轉次數大于等于指定的門檻值512,則重新建構selector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
1.1 處理定時任務
看下delayNanos:從定時任務隊列中取出一個定時任務,計算其還有多久就要執行了
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
//io.netty.util.concurrent.SingleThreadEventExecutor#delayNanos
// currentTimeNanos是目前select()開始的時間點
protected long delayNanos(long currentTimeNanos) {
// 從定時任務隊列中擷取一個任務
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
// 若沒有定時任務,則傳回1秒
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
// 傳回任務還有多久就要開始執行
return scheduledTask.delayNanos(currentTimeNanos);
}
//擷取任務該方法在父類實作中
//io.netty.util.concurrent.AbstractScheduledEventExecutor#peekScheduledTask
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
// 從定時任務隊列中取出一個任務
// 定時任務隊列中的任務是按照時間升序排列的
return scheduledTaskQueue.peek();
}
...
}
看下scheduledTask.delayNanos
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
...
//傳回任務還有多久就要開始執行
//currentTimeNanos是目前select()開始的時間點
public long delayNanos(long currentTimeNanos) {
// currentTimeNanos - START_TIME : 目前定時任務系統已經啟動了多久
// deadlineNanos() - (currentTimeNanos - START_TIME) :目前任務還需要等待多久就可以執行了
// 判斷取最大值,如果存活時間超過了定時任務指定的時間,會是負數,則傳回0,需要馬上執行
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
// 記錄目前類SingleThreadEventExecutor加載的時間點
// 可以了解為定時任務系統啟動的時間(類變量)
private static final long START_TIME = System.nanoTime();
public long deadlineNanos() {
// 目前定時任務的執行期限
// 這個期限一定是相對于START_TIME的
return deadlineNanos;
}
...
}
是以long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos),selectDeadLineNanos 代表的就是定時任務隊列中最早執行的定時任務要開始執行的時間點,注意如果沒有定時任務delayNanos傳回的是1秒
看下for循環裡面第一個處理定時任務的邏輯:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 計數器,記錄目前選擇執行的輪數
int selectCnt = 0;
// 擷取目前select()開始的時間點
long currentTimeNanos = System.nanoTime();
// delayNanos():從定時任務隊列中取出一個定時任務,計算其還有多久就要執行了
// selectDeadLineNanos : 表示這個定時任務要開始執行的時間點
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// --------------------- 1 處理定時任務 ------------------
// 對于馬上就要到執行時間的定時任務,立即進行選擇
// 時間機關都是納秒,500000L是0.5毫秒,加0.5豪秒再除以1秒,取餘的意思是"有快到了的任務,
// 就退出目前select操作,立即選擇傳回結果,然後結束循環,出去去處理定時任務"
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
// 非阻塞選擇
selector.selectNow();
selectCnt = 1;
}
break;
}
...
}
...
如果有馬上要執行的定時任務,則selector立即選擇,然後break退出循環後,退出select方法,出去以後,走到第三步“處理任務隊列中的任務”邏輯,會處理定時任務的邏輯,代碼就在下面的在第三步,先看下位置在哪後面在講定時任務的處理:
//io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循環
for (;;) {
try {
try {
// ------------------------- 1 selector選擇 -------------------
// 計算出選擇selector政策
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
...
case SelectStrategy.SELECT: // NioEventLoop支援的唯一政策
// 若執行這裡,說明目前任務隊列中沒有任務
select(wakenUp.getAndSet(false));
// 若目前線程剛被喚醒,selector立即将其選擇的結果傳回給我們
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
...
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO處理與任務隊列中任務的處理所占時間比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
...
} else {
// ------------------------- 2 處理就緒的IO -------------------
// IO操作的開始時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 執行任務隊列中的任務 -------------------
// Ensure we always run tasks.
// IO操作總用時
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
}
select()中如果暫時沒有快要執行的定時任務,則不會退出循環,走後面的邏輯:
1.2 在選擇期間,判斷任務隊列中是否有新任務加入
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
//如果暫時沒有快要執行的定時任務,則不會退出循環,走後面的邏輯:
// --------------------- 2 在選擇期間,任務隊列中有新任務加入 ------------------
// 在進select方法之前,任務隊列中肯定是沒有任務的,在select過程中,任務隊列是有可能有新任務加入的
// 是以這個if代表的情況是 沒有即将執行的定時任務,任務隊列有任務 的情況
// 這個時候就需要結束select去執行任務隊列裡面的任務,和上面定時邏輯退出循環一樣
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
//任務隊列有任務,并且目前wakenUp是false狀态,則置為true喚醒狀态
// 喚醒狀态不會進行select
// 非阻塞選擇
selector.selectNow();
selectCnt = 1;
break;
}
//如果定時任務沒有即将執行的,任務隊列也沒有任務要執行,則繼續走下面邏輯:阻塞等待方式select
...
}
...
} catch (CancelledKeyException e) {
...
}
}
如果定時任務沒有即将執行的,任務隊列也沒有任務要執行,則繼續走下面邏輯
1.3 阻塞式選擇
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
//如果定時任務沒有即将執行的,任務隊列也沒有任務要執行,則繼續走下面邏輯:阻塞等待方式select
// --------------------- 3 阻塞式選擇 ------------------
// selector.select()方法結束的條件(看源碼注釋):
// 1)至少有一個channel被選擇
// 2)seleter.wakeup()被調用
// 3)目前線程被打斷
// 4)阻塞時間逾時
// 5)其實這裡還有一個結束的條件:
// 當長時間沒有就緒的channel時,輪詢會出現長時間空轉,進而會導緻CPU占用率飙升,
// 此時會使select()結束,這是一個BUG,後面會處理這個BUG
// 也算是一個優化吧,避免CPU占用率飙升
// 簡單解釋:比如selector.select,我們阻塞500個時間機關,當走了10個時間機關後
// 一直沒有就緒的channel,select一直在空轉,此時CPU占用率飙升,這裡就會結束阻塞直接傳回
// 等再次輪詢到這裡繼續select的時候,還是空轉CPU占用率又飙升,還會立即結束阻塞
// 是以這種情況我們設定的變量selectCnt會一直遞增
//
// 注意,timeoutMillis 在這裡是作為select()的阻塞時長的
// 如果之前有定時任務,則timeoutMillis 是定時任務執行的時間點
// 如果沒有delayNanos預設傳回是1秒
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
...
}
...
} catch (CancelledKeyException e) {
...
}
}
1.4 處理導緻選擇結束阻塞的各種情況(5種)
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
//下面幾種情況會退出循環:
// - Selected something, 有channel被選擇
// - waken up by user, or
// oldWakenUp為true,或者wakenUp.get()為ture
// 代表使用者主動将wakenUp置為true,同時會調用
// seleter.wakeup()方法來喚醒線程,是以這裡代表的
// 是seleter.wakeup()被調用的情況
// 前兩種情況考慮的是select()方法結束的1,2兩種情況
//
// - the task queue has a pending task. // 任務隊列中有挂起的任務
// - a scheduled task is ready for processing // 有定時任務要執行了
// 即最後兩種情況是因為select操作的時候又阻塞了一段時間,
// 是以再重新校驗一下任務隊列是否有任務要處理或者定時任務要執行了
break;
}
// 代碼走到這裡,說明select()結束不是1),2)兩種情況
// 這裡考慮的是select()方法結束原因是目前線程被打斷的情況
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 代碼走到這裡,說明select()結束的條件是4)或5)
// 記錄目前時間
long time = System.nanoTime();
// 下面的式子等價于:
// time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
// 目前for循環已經執行的時長 >= 阻塞時長
// 若if的這個條件成立,說明前面的select()方法是通過條件4)結束的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 執行else說明 目前for循環已經執行的時長 < 阻塞時長 ,說明前面的select()
// 是通過條件5)結束的。若空轉次數大于等于指定的門檻值512,則重新建構selector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
//重新建構的邏輯就不關注了
selector = selectRebuildSelector(selectCnt);
//建構完成以後又重新置為1
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
注意
:
SELECTOR_AUTO_REBUILD_THRESHOLD
總結:
- 如果任務隊列中
,則會讓有任務
,直接selector進行非阻塞選擇
處理就緒IO、任務隊列任務的邏輯進入後面
- 如果任務隊列中
,會執行SelectStrategy.SELECT對應的選擇政策,執行select方法,進行阻塞式選擇,當然不是簡單的直接就執行selector.select(long)阻塞式選擇,整個select方法有如下四個步驟:沒有任務
- 1.處理定時任務,計算阻塞選擇需要阻塞的時間
- 擷取最快要執行的定時任務,如果有定時任務,計算該定時任務執行的時間點
- 如果該時間點馬上就要到了,直接讓selector進行非阻塞選擇,然後會立即結束select政策,
處理就緒IO/任務隊列任務的邏輯進入後面
- 如果時間沒有馬上到,則後面第三步selector阻塞選擇的時候,阻塞的時間就是該時間
- 如果該時間點馬上就要到了,直接讓selector進行非阻塞選擇,然後會立即結束select政策,
- 如果沒有定時任務,則預設1秒
- 擷取最快要執行的定時任務,如果有定時任務,計算該定時任務執行的時間點
- 2.如果沒有快要執行的定時任務需要處理,則判斷任務隊列中有沒有任務(在選擇政策期間,任務隊列中可能會有新任務加入)
- 如果選擇政策期間有新任務加入,則直接讓selector進行非阻塞選擇,然後立即結束選擇政策,進入後面邏輯
- 3.如果任務隊列沒有任務,進行第三步selector.select阻塞時選擇,阻塞的時間由上面處理定時任務的時候計算好了
- 有5中情況會立即結束selector.select方法:
- 1)至少有一個channel被選擇
- 2)seleter.wakeup()被調用
- 3)目前線程被打斷
- 4)阻塞時間逾時
- 5)當長時間沒有就緒的channel時,輪詢會出現長時間空轉,進而會導緻CPU占用率飙升,為了避免這種情況會使select()結束
- 如果出現·1、2兩種結束情況·,或者
,或者select過程中,任務隊列中加入新任務
則會立即結束選擇政策,進入後面處理任務邏輯定時任務執行時間到了
- 否則進入第四步,處理剩下的情況
- 有5中情況會立即結束selector.select方法:
- 4.處理導緻選擇結束阻塞的各種情況(5種)
- 線程被打斷的情況
- select()方法是通過條件4結束的情況
- 通過條件5結束的情況
- 該情況是selector輪詢空轉導緻CPU占用率飙升進而結束select方法的情況
- 如果空轉次數大于等于指定的門檻值512,則重新建構selector
- 1.處理定時任務,計算阻塞選擇需要阻塞的時間
2.處理就緒的IO
不管selector選擇這一步多麼複雜,結束這一步後說明有任務需要處理了,包括就緒IO任務和任務隊列的任務(定時任務等),先看處理就緒IO
protected void run() {
// 永久循環
for (;;) {
try {
// ------------------------- 1 selector選擇 -------------------
// ...
...
// selector選擇結束後:
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO處理與任務隊列中任務的處理所占時間比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
//100的話等于不用控制,全部都處理完
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// ------------------------- 2 處理就緒的IO -------------------
// IO操作的開始時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 執行任務隊列中的任務 -------------------
// Ensure we always run tasks.
// IO操作總用時
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
處理就緒IO核心方法processSelectedKeys:
public final class NioEventLoop extends SingleThreadEventLoop {
...
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
// 若selectedKeys是優化過的
if (selectedKeys != null) {
// 優化的
processSelectedKeysOptimized();
} else {
// 一般的
processSelectedKeysPlain(selector.selectedKeys());
}
}
...
}
注意
:
Netty對Nio對SelectionKeySet做過優化,看SelectedSelectionKeySet這個類:
Nio的Selector裡面有三個集合,key set/Selected set/cancel set,都是Set
Netty優化就把這些Set都變成數組了,效率更高了
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
//改成數組了
SelectionKey[] keys;
//數組的length
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
...
}
2.1 如果是優化過的Set就執行processSelectedKeysOptimized:
//io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// 設定為null以後channel關閉的時候可以馬上就行gc
// 其就相當于對set集合處理時,要将處理過的key從set集合中删除是一樣的,
// 為了避免對key的重複處理
selectedKeys.keys[i] = null;
// 對于NioEventLoop,key中的附件attachement中存放的是目前key所關聯的NioChannel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 處理目前周遊的key
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask是在測試的時候放的,我們不用管
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
//io.netty.channel.nio.NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 處理key失效的情況
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 擷取到目前key所有就緒的操作
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 若就緒操作中包含連接配接操作,處理連接配接就緒
// 這個一般在用戶端連接配接才會有這種情況
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
// 處理一次就會取消監聽連接配接事件
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 進行連接配接
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 處理寫就緒
// 當将資料寫入到buffer,那麼目前channel就處于寫就緒
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 還要檢查readOps是否為0,以解決可能出現的JDK bug,否則可能導緻自旋循環
// 如果沒有關注的事件,也unsafe.read,隻要read就會打斷高速循環
// 處理讀就緒 或 接收連接配接就緒
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 連接配接就緒:對于用戶端連接配接才有這種情況
關于用戶端連接配接就緒操作:
用戶端發起連接配接請求,第一次直接連上了就不說了
如果沒連上,這時候會把連接配接就緒事件注冊到Selector,讓Selector去監聽連接配接,隻要連接配接了一次沒連接配接上,這個連接配接就處于連接配接就緒狀态
是以用戶端連接配接,先連一次,成功就成功了,否則就是處于就緒狀态
//io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. // 注意,隻有在連接配接嘗試既沒有取消也沒有逾時的情況下,事件循環才會調用此方法。 assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } } //說了隻有用戶端才有這種事件,是以doFinishConnect的實作我們看NioSocketChannel //io.netty.channel.socket.nio.NioSocketChannel#doFinishConnect protected void doFinishConnect() throws Exception { //看到調用的是nio的finishConnect方法 //之前講nio的時候說過 if (!javaChannel().finishConnect()) { throw new Error(); } }
- 寫就緒:平常我們調用api向通道寫資料,其實都是寫入到Buffer,當寫入到buffer,那麼目前channel就處于寫就緒狀态,此時就會将給定緩沖區的内容重新整理到遠端對等點。
- 讀就緒和接受連接配接就緒,都是通過read得到的
unsafe.read()方法有兩個實作,message消息的是用來處理接受連接配接就緒的,byte的是用來處理讀就緒的
看下NioMessageUnsafe:
再看下NioByteUnsafe:private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { ... try { try { do { //讀消息 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //将消息傳給pipline上的處理器,并觸發其channelRead方法 //pipline下一章會介紹 pipeline.fireChannelRead(readBuf.get(i)); } ... } finally { ... } } } //看下讀消息的實作 //io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //這就是為什麼接受連接配接觸發channelRead方法時 //接受的參數msg就是Channel的原因 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
//io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read public final void read() { ... ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); //讀消息資料 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //将消息傳到pipline上的處理器,觸發其channelRead方法 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); ... } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { ... } } //讀資料一般都是用戶端對應的通道發來的資料 //io.netty.channel.socket.nio.NioSocketChannel#doReadBytes protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
2.2 如果selectedKeys沒有優化過,是一般的就走processSelectedKeysPlain方法
在回到處理就緒IO,如果selectedKeys沒有優化過,是一般的就走processSelectedKeysPlain方法,和優化差別就是一個操作Set一個操作數組:
//io.netty.channel.nio.NioEventLoop#processSelectedKeysPlain
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
3.執行任務隊列中的任務
io.netty.channel.nio.NioEventLoop#run中的第三步 執行任務隊列中的任務
io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循環
for (;;) {
try {
// ------------------------- 1 selector選擇 -------------------
...
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO處理與任務隊列中任務的處理所占時間比例
// ioRatio初始值50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
//如果是100,不用控制,全部處理完
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// ------------------------- 2 處理就緒的IO -------------------
// IO操作的開始時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 執行任務隊列中的任務 -------------------
// Ensure we always run tasks.
// IO操作總用時
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
// 如果ioRatio是20,則(100-20)/20 = 4 ,即ioTime * 4
// 如果ioRatio是50,則(100-50)50 = 1 ,即ioTime * 1
// ioRatio用于控制IO處理與任務隊列中任務的處理所占時間比例
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
接下來看任務處理方法:
//io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
protected boolean runAllTasks(long timeoutNanos) {
// 将定時任務隊列中所有可以執行的任務(即已經超過定時時間的任務還沒處理的任務)添加到taskQueue
fetchFromScheduledTaskQueue();
// 從taskQueue中擷取一個任務
Runnable task = pollTask();
// 若該任務為null,說明目前任務隊列中沒有任務了,
// 此時執行tailTasks中的收尾任務
if (task == null) {
afterRunningAllTasks();
return false;
}
// ScheduledFutureTask.nanoTime()傳回的是ScheduledFutureTask類的存活時間
// 可以了解為異步任務系統啟動到現在的時間
// 計算runAllTasks方法最多執行到哪個時間點必須結束,控制處理任務的時間
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
// 周遊執行taskQueue中的所有任務
for (;;) {
// 執行目前周遊的任務
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 每64次任務檢查一次逾時
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
} // end-for
// 執行tailTasks中的收尾任務
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
該方法三個核心流程:
- 将定時任務隊列中所有可以執行的任務(即已經超過定時時間還沒處理的任務)添加到taskQueue
- 執行目前周遊的任務
- 執行tailTasks中的收尾任務
3.1 将定時任務隊列中所有可以執行的任務(即已經超過定時還沒處理的任務)添加到taskQueue
//io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue
// 将定時任務隊列中所有可以執行的任務添加到taskQueue
private boolean fetchFromScheduledTaskQueue() {
// 擷取目前時間相對于定時任務系統開始的時間的時長(即定時任務系統已經運作的時間)
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// 從定時任務隊列中取出一個最緊急的需要執行的定時任務
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
// 在定時任務不空的前提下,将任務添加到taskQueue
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
// 若沒有添加成功,則重新放回到定時任務隊列
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
// 從定時任務隊列中再取出一個最緊急的需要執行的定時任務
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
從定時任務隊列取一個最緊急的已經要執行的任務
//io.netty.util.concurrent.AbstractScheduledEventExecutor#pollScheduledTask(long)
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
// 從定時任務隊列中取一個任務
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ?
null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
// 任務的期限時間都是相對于定時任務系統開始的那個時間的
// 若配置的期限時間比定時任務系統已經運作的時間小,說明這個任務早就應該執行了
if (scheduledTask.deadlineNanos() <= nanoTime) {
// 從定時任務隊列中删除該任務
scheduledTaskQueue.remove();
// 傳回該任務,以将其添加到taskQUeue去執行
return scheduledTask;
}
return null;
}
由此可以看出來,fetchFromScheduledTaskQueue方法就是将此時可以執行的所有異步任務添加到普通任務隊列taskQueue
pollTask取任務:
//io.netty.util.concurrent.SingleThreadEventExecutor#pollTask
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
//io.netty.util.concurrent.SingleThreadEventExecutor#pollTaskFrom
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
// 從任務隊列中取出一個任務,隻要其不是一個喚醒任務,則直接傳回
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
3.2 執行目前周遊的任務
safeExecute
//io.netty.util.concurrent.AbstractEventExecutor#safeExecute
protected static void safeExecute(Runnable task) {
try {
// 任務的run()最終在這裡執行了
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
3.2 執行tailTasks中的收尾任務
afterRunningAllTasks
//io.netty.channel.SingleThreadEventLoop#afterRunningAllTasks
protected void afterRunningAllTasks() {
//tailTasks就是那個尾部任務隊列
runAllTasksFrom(tailTasks);
}
//io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasksFrom
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 從任務隊列中擷取一個任務
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// task為null,說明taskQueue中的任務全部執行完畢
if (task == null) {
return true;
}
}
}
//io.netty.util.concurrent.SingleThreadEventExecutor#pollTaskFrom
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
// 從任務隊列中取出一個任務,隻要其不是一個喚醒任務,則直接傳回
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}