這裡寫目錄标題
- Netty 源碼解讀
- EventLoop源碼
- 1.執行個體化NioEventLoopGroup
- 2.eventLoop的執行邏輯
- 關注我日常更新分享Java程式設計技術!希望大家都能早日走上人生巅峰!

可以添加小助手vx:xiehuangbao1123 領取java全套學習資料
Netty 源碼解讀
EventLoop源碼
1.執行個體化NioEventLoopGroup
我們丢棄服務的代碼直接使用的無參的構造函數進行執行個體化的,那麼我們就從這個構造函數開始看吧,見io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
//看看這個地方建立了一個SelectorProvider那你是不是想到了Java NIO中的Selector了呢
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
往super裡跟,看看還幹了什麼,見:io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object…)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//看這個地方,如果用的無參構造函數這個地方傳進來的都是0,但最終這個線程數為DEFAULT_EVENT_LOOP_THREADS
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
可以看到如果不傳線程數預設的線程數是DEFAULT_EVENT_LOOP_THREADS,那這個又是多少呢?點進去看定義可以看到是計算機核數的2倍:
Runtime.getRuntime().availableProcessors() * 2
。
繼續往super裡面跟進,見io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…)
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//執行個體化executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventExecutor數組
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//執行個體化數組中的成員,我們跟進去看看
children[i] = newChild(executor, args);
success = true;
}
...
}
//給這個evetLoop建立一個選擇器,選擇器你可以了解為有多個eventLoop時,Channel選擇運作在哪個eventLoop的線程上,感興趣的話後續我們也寫個專題講講。
chooser = chooserFactory.newChooser(children);
}
我們繼續跟進
newChild
方法,見,io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//這個地方就執行個體化了一個eventLoop,我們繼續跟進構造函數看看
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
跟進構造函數,見,io.netty.channel.nio.NioEventLoop#NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//這個super我們就不繼續跟進了,你看傳參executor就可以想到把executor儲存成成員變量為後續給這個eventLoop啟動一個線程做鋪墊了
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//我們主要看這,可以看到Java NIO的selector在這裡被建立并且成為了eventLoop的成員變量了
selector = openSelector();
selectStrategy = strategy;
}
綜上,我們回顧一下執行個體化NioEventLoopGroup都幹了什麼:建立了一個大小為計算機核數2倍的eventLoop數組并執行個體化了每個eventLoop,最核心的就是建立了eventLoop的成員變量:executor和selector
2.eventLoop的執行邏輯
還記得在上述章節我們講了會在建立完ServerSocketChannel的之後會注冊到其中一個eventLoop上,見io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//在這個地方eventLoop開始執行,我們跟進去看
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
我們跟進這個execute方法,見io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//看這個地方,因為目前我們再main線程裡,是以執行這個方法啟用一個新的線程
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
//上面調的是這個方法
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
//繼續跟進
doStartThread();
}
}
}
//調的這裡
private void doStartThread() {
assert thread == null;
//最後是通過這個ThreadPerTaskExecutor來啟動的新的線程
executor.execute(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
//首先把目前的這個線程儲存在目前這個eventLoop的成員變臉thread中,看這樣就完成了一個eventLoop和一個thread的綁定
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//接下來我們主要看這個地方到底幹了什麼
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
我們繼續跟進這個
SingleThreadEventExecutor.this.run()
;見io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//(1)這個select有沒有很眼熟呢?回想一下Java NIO
select(wakenUp.getAndSet(false));
...
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//(2)這裡是不是也眼熟?
processSelectedKeys();
} finally {
//(3)我們也看看這裡幹了什麼
runAllTasks();
}
}
...
}
}
- select
我們跟進這個方法見io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
//拿到Java NIO的selector
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
//這段是不是很熟悉,就是通過for循環不斷輪詢是否有時間發生
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
...
}
}
}
上邊的代碼我們先隻關心Netty其實就是調用了Java NIO的selector開始輪詢是否有時間發生。之是以它寫的比較複雜是為了避免一個比較有名的一個空輪詢的bug,那在這裡我們先不讨論,以後可以成立獨立的專題來講述一下。
- processSelectedKeys()
一旦有事件發生我們就可以通過
processSelectedKeys()
處理事件了,跟進見:io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
//有些人可能會好奇為啥這個selectedKeys不是在selector的成員變量被Neety單拿出來了,其實這個地方Netty針對這個selector的selectedKeys做了一些優化,我們也不細說,也可以單獨成立個專題說,我們還是先串流程。
if (selectedKeys != null) {
//執行這裡
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
//這個地方和Java NIO依次處理發生的事件也同樣一樣
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//因為我們建立了NioServerSocketChannel是以我們主要看這裡,要處理事件了
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
...
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//得到目前AbstractNioChannel中的unsafe成員變量
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
//下面的代碼是不是又很熟悉了?就是Java NIO處理各種不同僚件的代碼了
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//這個地方我們還記的在Java NIO中幹了什麼嗎?建立SocketChannel并注冊讀事件對不對?我們會在下一章讨論這裡,即Netty是如何處理新連接配接接入的
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
綜上,eventLoop的執行已經講解完,我們回顧上一章并聯合Java NIO想想都做了什麼?首先建立了NioServerSocketChannel,然後開啟selector輪詢事件,最後在selector上注冊了OP_ACCEPT事件和Java NIO我們寫的流程差不多呢?這樣記是不是好記多了呢?