接上篇Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(一)
Netty 的 IO 處理循環
在 Netty 中, 一個 EventLoop 需要負責兩個工作, 第一個是作為 IO 線程, 負責相應的 IO 操作; 第二個是作為任務線程, 執行 taskQueue 中的任務.
接下來我們先從 IO 操縱方面入手, 看一下 TCP 資料是如何從 Java NIO Socket 傳遞到我們的 handler 中的.
Netty 是 Reactor 模型的一個實作, 并且是基于 Java NIO 的, 那麼從 Java NIO 的前生今世 之四 NIO Selector 詳解 中我們知道, Netty 中必然有一個 Selector 線程, 用于不斷調用 Java NIO 的 Selector.select 方法, 查詢目前是否有就緒的 IO 事件. 回顧一下在 Java NIO 中所講述的 Selector 的使用流程:
- 通過 Selector.open() 打開一個 Selector.
- 将 Channel 注冊到 Selector 中, 并設定需要監聽的事件(interest set)
- 不斷重複:
- 調用 select() 方法
- 調用 selector.selectedKeys() 擷取 selected keys
- 疊代每個 selected key:
- 1) 從 selected key 中擷取 對應的 Channel 和附加資訊(如果有的話)
- 2) 判斷是哪些 IO 事件已經就緒了, 然後處理它們. 如果是 OP_ACCEPT 事件, 則調用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 擷取 SocketChannel, 并将它設定為 非阻塞的, 然後将這個 Channel 注冊到 Selector 中.
- 3) 根據需要更改 selected key 的監聽事件.
- 4) 将已經處理過的 key 從 selected keys 集合中删除.
上面的使用流程用代碼來展現就是:
/**
* @author xiongyongshun
* @Email [email protected]
* @version 1.0
* @created 16/8/1 13:13
*/
public class NioEchoServer {
private static final int BUF_SIZE = 256;
private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception {
// 打開服務端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 打開 Selector
Selector selector = Selector.open();
// 服務端 Socket 監聽8080端口, 并配置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 将 channel 注冊到 selector 中.
// 通常我們都是先注冊一個 OP_ACCEPT 事件, 然後在 OP_ACCEPT 到來時, 再将這個 Channel 的 OP_READ
// 注冊到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通過調用 select 方法, 阻塞地等待 channel I/O 可操作
if (selector.select(TIMEOUT) == 0) {
System.out.print(".");
continue;
}
// 擷取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒.
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 當擷取一個 SelectionKey 後, 就要将它删除, 表示我們已經對這個 IO 事件進行了處理.
keyIterator.remove();
if (key.isAcceptable()) {
// 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中擷取一個 SocketChannel,
// 代表用戶端的連接配接
// 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 傳回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 傳回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//在 OP_ACCEPT 到來時, 再将這個 Channel 的 OP_READ 注冊到 Selector 中.
// 注意, 這裡我們如果沒有設定 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接傳回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clientChannel.read(buf);
if (bytesRead == -1) {
clientChannel.close();
} else if (bytesRead > 0) {
key.interestOps(OP_READ | SelectionKey.OP_WRITE);
System.out.println("Get data length: " + bytesRead);
}
}
if (key.isValid() && key.isWritable()) {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
key.interestOps(OP_READ);
}
buf.compact();
}
}
}
}
}
還記得不, 上面操作的第一步 通過 Selector.open() 打開一個 Selector 我們已經在第一章的 Channel 執行個體化 這一小節中已經提到了, Netty 中是通過調用 SelectorProvider.openSocketChannel() 來打開一個新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) {
...
return provider.openSocketChannel();
}
第二步 将 Channel 注冊到 Selector 中, 并設定需要監聽的事件(interest set) 的操作我們在第一章 channel 的注冊過程 中也分析過了, 我們在來回顧一下, 在用戶端的 Channel 注冊過程中, 會有如下調用鍊:
Bootstrap.initAndRegister ->
AbstractBootstrap.initAndRegister ->
MultithreadEventLoopGroup.register ->
SingleThreadEventLoop.register ->
AbstractUnsafe.register ->
AbstractUnsafe.register0 ->
AbstractNioChannel.doRegister
在 AbstractUnsafe.register 方法中調用了 register0 方法:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略條件判斷和錯誤處理
AbstractChannel.this.eventLoop = eventLoop;
register0(promise);
}
register0 方法代碼如下:
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
}
register0 又調用了 AbstractNioChannel.doRegister:
@Override
protected void doRegister() throws Exception {
// 省略錯誤處理
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
在這裡 javaChannel() 傳回的是一個 Java NIO SocketChannel 對象, 我們将此 SocketChannel 注冊到前面第一步擷取的 Selector 中.
那麼接下來的第三步的循環是在哪裡實作的呢? 第三步的操作就是我們今天分析的關鍵, 下面我會一步一步向讀者展示出來.
thread 的 run 循環
在 EventLoop 的啟動 一小節中, 我們已經了解到了, 當 EventLoop.execute 第一次被調用時, 就會觸發 startThread() 的調用, 進而導緻了 EventLoop 所對應的 Java 線程的啟動. 接着我們來更深入一些, 來看一下此線程啟動後都會做什麼東東吧.
下面是此線程的 run() 方法, 我已經把一些異常處理和收尾工作的代碼都去掉了. 這個 run 方法可以說是十分簡單, 主要就是調用了 SingleThreadEventExecutor.this.run() 方法. 而 SingleThreadEventExecutor.run() 是一個抽象方法, 它的實作在 NioEventLoop 中.
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
繼續跟蹤到 NioEventLoop.run() 方法, 其源碼如下:
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
...
}
}
}
啊哈, 看到了上面代碼的 for(;;) 所構成的死循環了沒? 原來 NioEventLoop 事件循環的核心就是這裡!
現在我們把上面所提到的 Selector 使用步驟的第三步的部分也找到了.
這個 run 方法可以說是 Netty NIO 的核心, 屬于重中之重, 把它分析明白了, 那麼對 Netty 的事件循環機制也就了解了大部分了. 讓我們一鼓作氣, 繼續分析下去吧!
IO 事件的輪詢
首先, 在 run 方法中, 第一步是調用 hasTasks() 方法來判斷目前任務隊列中是否有任務:
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
這個方法很簡單, 僅僅是檢查了一下 taskQueue 是否為空. 至于 taskQueue 是什麼呢, 其實它就是存放一系列的需要由此 EventLoop 所執行的任務清單. 關于 taskQueue, 我們這裡暫時不表, 等到後面再來詳細分析它.
當 taskQueue 不為空時, 就執行到了 if 分支中的 selectNow() 方法. 然而當 taskQueue 為空時, 執行的是 select(oldWakenUp) 方法. 那麼 selectNow() 和 select(oldWakenUp) 之間有什麼差別呢? 來看一下, selectNow() 的源碼如下:
void selectNow() throws IOException {
try {
selector.selectNow();
} finally {
// restore wakup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
首先調用了 selector.selectNow() 方法, 這裡 selector 是什麼大家還有印象不? 我們在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (用戶端) 時對它有過介紹, 這個 selector 字段正是 Java NIO 中的多路複用器 Selector. 那麼這裡 selector.selectNow() 就很好了解了, selectNow() 方法會檢查目前是否有就緒的 IO 事件, 如果有, 則傳回就緒 IO 事件的個數; 如果沒有, 則傳回0.
注意, selectNow() 是立即傳回的, 不會阻塞目前線程.
當 selectNow() 調用後, finally 語句塊中會檢查 wakenUp 變量是否為 true, 當為 true 時, 調用 selector.wakeup() 喚醒 select() 的阻塞調用.
看了 if 分支的 selectNow 方法後, 我們再來看一下 else 分支的 select(oldWakenUp) 方法.
其實 else 分支的 select(oldWakenUp) 方法的處理邏輯比較複雜, 而我們這裡的目的暫時不是分析這個方法調用的具體工作, 是以我這裡長話短說, 隻列出我們我們關注的内如:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
int selectedKeys = selector.select(timeoutMillis);
...
} catch (CancelledKeyException e) {
...
}
}
在這個 select 方法中, 調用了 selector.select(timeoutMillis), 而這個調用是會阻塞住目前線程的, timeoutMillis 是阻塞的逾時時間.
到來這裡, 我們可以看到, 當 hasTasks() 為真時, 調用的的 selectNow() 方法是不會阻塞目前線程的, 而當 hasTasks() 為假時, 調用的 select(oldWakenUp) 是會阻塞目前線程的.
這其實也很好了解: 當 taskQueue 中沒有任務時, 那麼 Netty 可以阻塞地等待 IO 就緒事件; 而當 taskQueue 中有任務時, 我們自然地希望所送出的任務可以盡快地執行, 是以 Netty 會調用非阻塞的 selectNow() 方法, 以保證 taskQueue 中的任務盡快可以執行.
IO 事件的處理
在 NioEventLoop.run() 方法中, 第一步是通過 select/selectNow 調用查詢目前是否有就緒的 IO 事件. 那麼當有 IO 事件就緒時, 第二步自然就是處理這些 IO 事件啦.
首先讓我們來看一下 NioEventLoop.run 中循環的剩餘部分:
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
上面列出的代碼中, 有兩個關鍵的調用, 第一個是 processSelectedKeys() 調用, 根據字面意思, 我們可以猜出這個方法肯定是查詢就緒的 IO 事件, 然後處理它; 第二個調用是 runAllTasks(), 這個方法我們也可以一眼就看出來它的功能就是運作 taskQueue 中的任務.
這裡的代碼還有一個十分有意思的地方, 即 ioRatio. 那什麼是 ioRatio呢? 它表示的是此線程配置設定給 IO 操作所占的時間比(即運作 processSelectedKeys 耗時在整個循環中所占用的時間). 例如 ioRatio 預設是 50, 則表示 IO 操作和執行 task 的所占用的線程執行時間比是 1 : 1. 當知道了 IO 操作耗時和它所占用的時間比, 那麼執行 task 的時間就可以很友善的計算出來了:
設 IO 操作耗時為 ioTime, ioTime 占的時間比例為 ioRatio, 則:
ioTime / ioRatio = taskTime / taskRatio
taskRatio = 100 - ioRatio
=> taskTime = ioTime * (100 - ioRatio) / ioRatio
根據上面的公式, 當我們設定 ioRate = 70 時, 則表示 IO 運作耗時占比為70%, 即假設某次循環一共耗時為 100ms, 那麼根據公式, 我們知道 processSelectedKeys() 方法調用所耗時大概為70ms(即 IO 耗時), 而 runAllTasks() 耗時大概為 30ms(即執行 task 耗時).
當 ioRatio 為 100 時, Netty 就不考慮 IO 耗時的占比, 而是分别調用 processSelectedKeys()、runAllTasks(); 而當 ioRatio 不為 100時, 則執行到 else 分支, 在這個分支中, 首先記錄下 processSelectedKeys() 所執行的時間(即 IO 操作的耗時), 然後根據公式, 計算出執行 task 所占用的時間, 然後以此為參數, 調用 runAllTasks().
我們這裡先分析一下 processSelectedKeys() 方法調用, runAllTasks() 我們留到下一節再分析.
processSelectedKeys() 方法的源碼如下:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
這個方法中, 會根據 selectedKeys 字段是否為空, 而分别調用 processSelectedKeysOptimized 或 processSelectedKeysPlain. selectedKeys 字段是在調用 openSelector() 方法時, 根據 JVM 平台的不同, 而有設定不同的值, 在我所調試這個值是不為 null 的. 其實 processSelectedKeysOptimized 方法 processSelectedKeysPlain 沒有太大的差別, 為了簡單起見, 我們以 processSelectedKeysOptimized 為例分析一下源碼的工作流程吧.
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
...
}
}
其實你别看它代碼挺多的, 但是關鍵的點就兩個: 疊代 selectedKeys 擷取就緒的 IO 事件, 然後為每個事件都調用 processSelectedKey 來處理它.
這裡正好完美對應上了我們提到的 Selector 的使用流程中的第三步裡操作.
還有一點需要注意的是, 我們可以調用 selectionKey.attach(object) 給一個 selectionKey 設定一個附加的字段, 然後可以通過 Object attachedObj = selectionKey.attachment() 擷取它. 上面代代碼正是通過了 k.attachment() 來擷取一個附加在 selectionKey 中的對象, 那麼這個對象是什麼呢? 它又是在哪裡設定的呢? 我們再來回憶一下 SocketChannel 是如何注冊到 Selector 中的:
在用戶端的 Channel 注冊過程中, 會有如下調用鍊:
Bootstrap.initAndRegister ->
AbstractBootstrap.initAndRegister ->
MultithreadEventLoopGroup.register ->
SingleThreadEventLoop.register ->
AbstractUnsafe.register ->
AbstractUnsafe.register0 ->
AbstractNioChannel.doRegister
最後的 AbstractNioChannel.doRegister 方法會調用 SocketChannel.register 方法注冊一個 SocketChannel 到指定的 Selector:
@Override
protected void doRegister() throws Exception {
// 省略錯誤處理
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
特别注意一下 register 的第三個參數, 這個參數是設定 selectionKey 的附加對象的, 和調用 selectionKey.attach(object) 的效果一樣. 而調用 register 所傳遞的第三個參數是 this, 它其實就是一個
NioSocketChannel
的執行個體. 那麼這裡就很清楚了, 我們在将 SocketChannel 注冊到 Selector 中時, 将 SocketChannel 所對應的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 當我們擷取到附加的對象後, 我們就調用 processSelectedKey 來處理這個 IO 事件:
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
processSelectedKey 方法源碼如下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
// 可讀事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
// 可寫事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// 連接配接建立事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
這個代碼是不是很熟悉啊? 完全是 Java NIO 的 Selector 的那一套處理流程嘛!
processSelectedKey 中處理了三個事件, 分别是:
- OP_READ, 可讀事件, 即 Channel 中收到了新資料可供上層讀取.
- OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入資料.
- OP_CONNECT, 連接配接建立事件, 即 TCP 連接配接已經建立, Channel 處于 active 狀态.
下面我們分别根據這三個事件來看一下 Netty 是怎麼處理的吧.
OP_READ 處理
當就緒的 IO 事件是 OP_READ, 代碼會調用 unsafe.read() 方法, 即:
// 可讀事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
unsafe 這個字段, 我們已經和它打了太多的交道了, 在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (用戶端) 中我們已經對它進行過濃墨重彩地分析了, 最後我們确定了它是一個 NioSocketChannelUnsafe 執行個體, 負責的是 Channel 的底層 IO 操作.
我們可以利用 Intellij IDEA 提供的 Go To Implementations 功能, 尋找到這個方法的實作. 最後我們發現這個方法沒有在 NioSocketChannelUnsafe 中實作, 而是在它的父類 AbstractNioByteChannel 實作的, 它的實作源碼如下:
@Override
public final void read() {
...
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
// 檢查讀取結果.
...
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
...
totalReadAmount += localReadAmount;
// 檢查是否是配置了自動讀取, 如果不是, 則立即退出循環.
...
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
}
}
read() 源碼比較長, 我為了篇幅起見, 删除了部分代碼, 隻留下了主幹. 不過我建議讀者朋友們自己一定要看一下 read() 源碼, 這對了解 Netty 的 EventLoop 十分有幫助.
上面 read 方法其實歸納起來, 可以認為做了如下工作:
- 配置設定 ByteBuf
- 從 SocketChannel 中讀取資料
- 調用 pipeline.fireChannelRead 發送一個 inbound 事件.
前面兩點沒什麼好說的, 第三點 pipeline.fireChannelRead 讀者朋友們看到了有沒有會心一笑地感覺呢? 反正我看到這裡時是有的. pipeline.fireChannelRead 正好就是我們在第二章 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二) 中分析的 inbound 事件起點. 當調用了 pipeline.fireIN_EVT() 後, 那麼就産生了一個 inbound 事件, 此事件會以 head -> customContext -> tail 的方向依次流經 ChannelPipeline 中的各個 handler.
調用了 pipeline.fireChannelRead 後, 就是 ChannelPipeline 中所需要做的工作了, 這些我們已經在第二章中有過詳細讨論, 這裡就展開了.
OP_WRITE 處理
OP_WRITE 可寫事件代碼如下. 這裡代碼比較簡單, 沒有詳細分析的必要了.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
OP_CONNECT 處理
最後一個事件是 OP_CONNECT, 即 TCP 連接配接已建立事件.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
OP_CONNECT 事件的進行中, 隻做了兩件事情:
- 正如代碼中的注釋所言, 我們需要将 OP_CONNECT 從就緒事件集中清除, 不然會一直有 OP_CONNECT 事件.
- 調用 unsafe.finishConnect() 通知上層連接配接已建立
unsafe.finishConnect() 調用最後會調用到 pipeline().fireChannelActive(), 産生一個 inbound 事件, 通知 pipeline 中的各個 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法會被調用)
到了這裡, 我們整個 NioEventLoop 的 IO 操作部分已經了解完了, 接下來的一節我們要重點分析一下 Netty 的任務隊列機制.
Netty 的任務隊列機制
我們已經提到過, 在Netty 中, 一個 NioEventLoop 通常需要肩負起兩種任務, 第一個是作為 IO 線程, 處理 IO 操作; 第二個就是作為任務線程, 處理 taskQueue 中的任務. 這一節的重點就是分析一下 NioEventLoop 的任務隊列機制的.
Task 的添加
普通 Runnable 任務
NioEventLoop 繼承于 SingleThreadEventExecutor, 而
SingleThreadEventExecutor
中有一個 Queue<Runnable> taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每個 Task 都使用一個實作了 Runnable 接口的執行個體來表示.
例如當我們需要将一個 Runnable 添加到 taskQueue 中時, 我們可以進行如下操作:
EventLoop eventLoop = channel.eventLoop();
eventLoop.execute(new Runnable() {
@Override
public void run() {
System.out.println("Hello, Netty!");
}
});
當調用 execute 後, 實際上是調用到了 SingleThreadEventExecutor.execute() 方法, 它的實作如下:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
而添加任務的 addTask 方法的源碼如下:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
reject();
}
taskQueue.add(task);
}
是以實際上,
taskQueue
是存放着待執行的任務的隊列.
schedule 任務
除了通過 execute 添加普通的 Runnable 任務外, 我們還可以通過調用 eventLoop.scheduleXXX 之類的方法來添加一個定時任務.
EventLoop 中實作任務隊列的功能在超類
SingleThreadEventExecutor
實作的, 而 schedule 功能的實作是在
SingleThreadEventExecutor
的父類, 即
AbstractScheduledEventExecutor
中實作的.
在
AbstractScheduledEventExecutor
中, 有以 scheduledTaskQueue 字段:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
scheduledTaskQueue 是一個隊列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 我們很容易猜到, 它是對 Schedule 任務的一個抽象.
我們來看一下
AbstractScheduledEventExecutor
所實作的 schedule 方法吧:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
這是其中一個重載的 schedule, 當一個 Runnable 傳遞進來後, 會被封裝為一個 ScheduledFutureTask 對象, 這個對象會記錄下這個 Runnable 在何時運作、已何種頻率運作等資訊.
當建構了 ScheduledFutureTask 後, 會繼續調用 另一個重載的 schedule 方法:
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new OneTimeTask() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
在這個方法中, ScheduledFutureTask 對象就會被添加到 scheduledTaskQueue 中了.
任務的執行
當一個任務被添加到 taskQueue 後, 它是怎麼被 EventLoop 執行的呢?
讓我們回到 NioEventLoop.run() 方法中, 在這個方法裡, 會分别調用 processSelectedKeys() 和 runAllTasks() 方法, 來進行 IO 事件的處理和 task 的處理. processSelectedKeys() 方法我們已經分析過了, 下面我們來看一下 runAllTasks() 中到底有什麼名堂吧.
runAllTasks 方法有兩個重載的方法, 一個是無參數的, 另一個有一個參數的. 首先來看一下無參數的 runAllTasks:
protected boolean runAllTasks() {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
return true;
}
}
}
我們前面已經提到過, EventLoop 可以通過調用 EventLoop.execute 來将一個 Runnable 送出到 taskQueue 中, 也可以通過調用 EventLoop.schedule 來送出一個 schedule 任務到 scheduledTaskQueue 中. 在此方法的一開始調用的 fetchFromScheduledTaskQueue() 其實就是将 scheduledTaskQueue 中已經可以執行的(即定時時間已到的 schedule 任務) 拿出來并添加到 taskQueue 中, 作為可執行的 task 等待被排程執行.
它的源碼如下:
private void fetchFromScheduledTaskQueue() {
if (hasScheduledTasks()) {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
break;
}
taskQueue.add(scheduledTask);
}
}
}
接下來 runAllTasks() 方法就會不斷調用 task = pollTask() 從 taskQueue 中擷取一個可執行的 task, 然後調用它的 run() 方法來運作此 task.
注意
, 因為 EventLoop 既需要執行 IO 操作, 又需要執行 task, 是以我們在調用 EventLoop.execute 方法送出任務時, 不要送出耗時任務, 更不能送出一些會造成阻塞的任務, 不然會導緻我們的 IO 線程得不到排程, 影響整個程式的并發量.
本文由 yongshun 發表于個人部落格, 采用 署名-相同方式共享 3.0 中國大陸許可協定.
Email: yongshun1228@gmail .com
本文标題為: Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(二)
本文連結為: https://segmentfault.com/a/1190000007403937