異步化的 Netty
Netty 在官網首頁有這麼一句話介紹自己
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
異步的特性甚至還擺在事件驅動之前,可見其重要性。Netty 的異步操作在代碼中随處可見,幾個比較重要的地方傳回都是
ChannelFuture
接口。先來重溫下在什麼地方會遇到異步接口。
第一處,也是最為常見,在服務端引導程式綁定監聽端口的地方,代碼如下
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class);
ChannelFuture sync = serverBootstrap.bind(2323).sync();
bind
方法傳回的
ChannelFuture
對象有兩種使用方式:
- 第一種,在允許阻塞的上下文中,可以直接使用
或者sync
方法等待異步任務完成。await
- 第二種,目前上下文不能阻塞的情況,可以調用
的ChannelFuture
方法注冊一個回調函數。該回調函數會被異步任務被完成後觸發。addListener
第二處使用傳回異步任務的地方則是緊随監聽端口綁定成功之後,為了不讓main方法退出,需要去等待服務端程式的關閉,代碼如下
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class);
ChannelFuture sync = serverBootstrap.bind(2323).sync();
sync.channel().closeFuture().sync();
通過
sync.channel()
的調用獲得了綁定監聽端口成功的服務端通道。而後通過
closeFuture
方法獲得了該服務端通道的關閉異步任務。隻有在服務端通道關閉後,該異步任務才會完成。通常而言,服務端通道關閉就意味着整個網絡服務應用的下線。是以在這裡等待通道的關閉實質就是等待整體應用的結束。
這裡的等待是有着實質的重要作用的,一般而言,我們在初始化
ServerBootstrap
都會傳入工作線程池,也就是
EventLoopGroup
對象。這些線程池在服務端通道關閉後,其内部的任務隊列可能還剩餘一些任務沒有完成。此時為了資料的正确性考慮,不能強制關閉整個程式,否則就可能造成資料不一緻或其他異常。是以需要在
EventLoopGroup
上執行優雅關閉,也就是調用
shutdownGracefully
方法。該方法會首先切換
EventLoopGroup
到關閉狀态進而拒絕新的任務的加入,然後在任務隊列的任務都處理完成後,停止線程的運作。進而確定整體應用是在正常有序的狀态下退出的。
一般而言,在服務端的代碼中我們的寫法都是:
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
ChannelFuture bind = serverBootstrap.bind(2356);
bind.sync();
Channel serverChannel = bind.channel();
serverChannel.closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
如果沒有
serverChannel.closeFuture().sync();
就會直接結束
main
方法,然後執行
finally
中的内容,這會導緻運作中的應用中斷。根據上文的介紹,除了使用
sync
等待,還可以添加監聽器,在監聽器中進行線程池的優雅關閉。不過相對來說,
sync
等待這種寫法會比較常見和簡潔一些。
第三處則是在資料寫出的地方,先看執行個體代碼
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
final AtomicInteger count = new AtomicInteger();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ChannelInboundHandlerAdapter()
{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ChannelFuture future = ctx.write(msg);
future.addListener(new ChannelFutureListener()
{
@Override
public void operationComplete(ChannelFuture future) throws Exception
{
//消息數量統計
count.incrementAndGet();
}
});
}
});
}
});
ChannelFuture bind = serverBootstrap.bind(2356);
bind.sync();
Channel serverChannel = bind.channel();
serverChannel.closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
這個例子中我們實作簡單的消息發出的總數的功能。可以注意到,我們将計數的增加放在了任務的監聽器之中實作。
這是因為執行
io.netty.channel.ChannelOutboundInvoker#write(java.lang.Object)
方法,該方法是一個異步方法,直接傳回了
ChannelFuture
執行個體,當方法傳回的時候,消息可能還沒有寫入到Socket發送緩沖區。如果在方法傳回的時候就進行累加,累加的結果就和實際情況存在偏差了。
而在異步任務的監聽器中進行累加,當方法
operationComplete
被調用時,資料已經被寫入socket發送緩存區。此時進行計數累加的結果就是真正的消息發出的總數了(不考慮 TCP 通道中斷的情況下)。
異步的好處顯而易見,不讓線程阻塞在 IO 操作上,可以盡可能的利用CPU 資源。不過異步并不是“免費午餐”,支援異步實作需要背後高效合理的線程模式設計。這也是下文要分析的内容。
從《Scalable IO in Java》看線程模型
在作業系統支援 IO 多路複用能力後,針對這種能力,衍生了專門使用其的程式設計模型,也就是
Reactor pattern
。網絡上的翻譯都是反應堆模式,但是覺得一點都不達意,也沒有找到好的翻譯,是以下文就直接稱呼為 reactor 模式。
在 Java1.4 支援 NIO 後,并發界的大佬 Doug Lea 發了一個ppt,《Scalable IO In Java》。在其中闡述了使用如何将reactor 模式應用在 NIO 的程式設計上。一口吃不成胖子,一步步來看下線程模型是如何變化的。
早期的時候,隻有 BIO 模式,也就是一個線程服務一個用戶端的模型。使用圖來表達的話,就類似于
一個服務端線程阻塞在 ServerSocket 的
accept
方法,一旦方法傳回,有用戶端連結建立,則建立一個 handler 處理這個連接配接的資料讀取,解碼,業務計算,編碼,響應資料發送。通常而言,一個 handler 運作在一個獨立的線程中。
簡單粗暴好了解,唯一的問題就是這種模式擴充性很差,随着用戶端數量的增多,建立的線程也越來越多,而線程的建立消耗記憶體資源,線程的排程和上下文儲存更是消耗許多 CPU 資源的。一旦線程建立的太多了,甚至會有個拐點,處理效率斷崖式下跌。
這種模型在 JDK1.4 之前是唯一的選擇。在 JDK 提供了 NIO 之後,情況有了徹底的改觀。Reactor 模式也開始登場。首先來看下,基礎reactor 模式,如下圖
在之前的文章我們介紹過,基于 IO 複用能力,一個
Selector
可以監控數以千計的用戶端連接配接。基礎 Reactor 模式也是如此,使用一個多路同步監控器來監控多個連接配接上的 IO 事件。這些 IO 事件可以包括連接配接的接口和建立(accept),連接配接可讀(readready),連接配接可寫(writeready)。是以這個多路同步監控器可以監控服務端通道以及在接受用戶端後建立的用戶端通道。
當多路同步監控器監控到 IO 事件發生時,則會将事件傳遞給派發器。而派發器則會将事件傳遞給合适的事件處理器執行處理,也就是handler,具體仍然是處理讀取,解碼,計算,編碼,發送等邏輯。
基礎 Reactor 模式中,多路同步監控器,派發器,事件處理器全部運作在同一個線程中,這個線程稱之為 Reactor 線程。隻不過由于 IO 多路複用的能力,是以一個線程也可以支撐數以千計的連接配接。這個模式當中,多路同步監控器這個角色由 NIO 中的
selector
來承擔,而派發器和事件處理器則是使用者自行實作的。
顯然,基礎 Reactor 模式無法有效利用多核 CPU。由于 IO 複用和非阻塞式 IO 的存在,使得基于 Reactor 模式下,io 事件的處理不再是阻塞式,可以有效的利用 CPU。但是解碼,計算和編碼則無法預計。為此,可以将非 IO 動作:解碼、計算、編碼這三個動作從 handler 中剝離,使用單獨的 Processor 處理。并且讓 Processor 運作在獨立的線程中,以此來提高 reactor 線程的運作效率。通常來說, processor 是運作線上程池中,doug lea 給這個起了個名字,worker thread pools。
演進後的模型如下圖
随着連接配接數的增多,僅僅依靠一個 Reactor 處理讀寫事件也會顯得效率不夠以及對 CPU 的利用不充分了。此時,可以将reactor線程擴充。考慮到隻有一個服務端通道,且其 IO 事件隻有用戶端的連接配接事件;而用戶端通道的事件主要是讀事件和寫事件,與服務端通道存在明顯的區分。是以将 Reactor 區分為 2 類:執行服務端通道的接入類和執行用戶端通道的讀寫類。細化來說,此時存在 2 組 reactor 線程:
- 主 Reactor 線程,隻有一個,負責處理服務端通道上的 IO 事件,也就是用戶端的接入。
- 子 Reactor 線程,通常多個,負責處理用戶端通道上的 IO 事件,也就是用戶端連結的讀寫就緒。
簡單而言,就是主 Reactor 在收到用戶端接入時,選擇一個子 Reactor 線程,将用戶端連結分發給它,進行後續的讀寫處理。而子Reactor 線程在遇到非 IO 工作時,繼續分發給 Worker thread pool 處理。
使用圖來表達這個模式就是
在 Doug lea 的 PPT 中将隻增加了 Worker thread pools 的模式和多線程 Reactor 模式統稱為 Reactor 模式的多線程版本。但是在大部分的中文部落格中将前者稱之為多線程 Reactor 模式,将後者稱之為主從 Reactor模式,未能查找到這種起名的來源,不過後文會沿用這種傳統,将上述三種模式稱之為:單線程 Reactor 模式,多線程 Reactor 模式,主從 Reactor 模式。
Netty 的線程模型
Netty 可以通過配置,來實作不同的線程模型。而且需要改動的代碼相當的少。首先來看第一種,單線程 Reactor 模式,對應的代碼如下
class HelloWorld
{
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new DecoderHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(2323).sync();
sync.channel().closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
}
}
}
在
main
方法的第一行中,我們将 Boss 線程組的大小設定為 1,這意味着該
NioEventLoopGroup
中的線程隻有 1 個。而後續 Netty的服務引導程式的 Group 配置中,我們隻傳遞了該 Group。這使得在Netty 發生的所有操作都是運作在這個線程上。此時,Netty 的線程模式就是單線程 Reactor 模式。當然,這種配置方式比較少出現在實踐中。
更正常的配置方式是建立兩個
EventLoopGroup
,并且将之配置到
ServerBootStrap
。如下
class HelloWorld
{
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new DecoderHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(2323).sync();
sync.channel().closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
比第一個程式多了一個
worker
的
EventLoopGroup
。預設情況下,
NioEventLoopGroup
的線程數是核心數的 2 倍。在配置的時候也與第一個不同,同時傳遞了 2 個進去
serverBootstrap.group(boss, worker)
。Boss 組用于服務端通道處理用戶端接入就緒事件,Worker 組用于處理用戶端通道讀寫就緒事件。簡單而言,就是 Boss 組線程監聽着服務端的接入就緒事件,并且在處理成功後将接入的用戶端通道分發給 Worker 組。之後worker組就監控在其上的用戶端通道的讀寫就緒事件。
此時在用戶端通道上的讀寫,編解碼,計算都是運作在 Worker 組的線程中。為了避免并發問題,一個通道隻會綁定在一個線程上。Netty 将這種方式稱之為串行化設計。在這種配置模式下,串行化設計可以了解為一個通道上的所有 ChannelHandler 都運作同一個線程上,避免了上下文切換,減少了同步的損耗,同時應用整體又是并行的。實踐證明,這種模式的性能是十分高效的。
每一個
NioEventLoopGroup
都管理着一定數量的
NioEventLoop
線程,而一個
NioEventLoop
都會持有一個
Selector
對象,也就是
NioEventLoop
線程實際上就是reactor線程。是以上述的這種配置模式下,Netty 此時的模式比較接近于沒有使用 Worker thread Pool 的主從 reactor 模式。
當然,Netty 也提供了 Worker thread pool 模式的支援。但是這種方式比較少用,Netty 官網不能提到,社群中也沒有描述。具體的代碼如下
class HelloWorld
{
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
final EventLoopGroup childWorker = new NioEventLoopGroup();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(childWorker,new DecoderHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(2323).sync();
sync.channel().closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
childWorker.shutdownGracefully();
}
}
}
代碼主要的改變就是增加了一個
childWorker
組。并且在用戶端通道的管道對象添加ChannelHandler時,選擇關聯一個
EventExecutorGroup
。這意味對應的
ChannelHandler
運作在關聯的這個
EventExecutorGroup
的某個線程中(這個關聯關系是在add類方法中被确定的)。
如果每一個處理器都被額外的
EventExecutorGroup
關聯,那麼一個通道上除了讀寫調用工作在通道關聯的 Reactor 線程上,剩餘的
ChannelHandler
都可以工作在自定義線程上。此種情況,就是《Scalable IO In Java》提到的 Worker thread pools 模式。更貼近于多線程 Reactor 模式。在這種模式下,串行化則有了另外一種含義,那就是:一個
Channel
上的某個具體的
ChannelHandler
總是運作在一個固定的線程中,不會被并發,所有對該
Channelhandler
的調用都是串行的。
綜述
上面讨論了 reactor 模式及其多線程版本,以及 Netty 不同的設定對應的不同模式。在 Netty 中有一個設計原則就是避免對一個通道的并發操作,甚至于避免對一個通道上的一個具體的
Channelhandler
的并發操作。對
ChannelHandler
的調用都是串行執行的,是以使用者在實作業務代碼的時候就需要考慮并發安全的問題,簡化了代碼的處理。為了實作這個串行設計的目标,Netty 中的通道和 ChannelHandler 都被綁定到一個具體的線程上。在沒有顯示綁定的情況,
ChannelHandler
會被綁定到其關聯的通道綁定的線程上。
了解了這一點,對于為什麼 Netty 許多操作都是傳回一個異步任務對象就很容易了。因為如果目前線程不是需要操作的通道或者
ChannelHandler
綁定的線程,則 Netty 都會為目前操作生成一個對象,投入到其綁定的線程的任務隊列,讓線程自行取出并且執行。而投入完畢的時候任務并不會馬上完成,是以隻能傳回一個異步任務對象給調用者。而如果操作線程就是目前通道或者
ChannelHandler
綁定的線程則可以執行具體的操作而不用将操作包裝為任務進行投遞。但是為了接口的統一,此時也是傳回一個異步任務對象。隻不過這個傳回的異步任務對象,在傳回的時候就已經是已完成的狀态了。
總結與思考
本文讨論了《Scalable IO In Java》中提到的幾種在 NIO 使用場景下的線程模式變種,詳細分析了其變化和演進的思路和修改點。并且以Netty 自身的支援為切入,分析了 Netty 的線程模型,以及 Netty 如何通過參數變化來支援不同的線程模型。對線程模型的了解,也就能了解Netty中的一些并發安全保證和異步化接口背後的原理。
關于 Netty 還有一塊很重要的内容,也是其主要的 API 來源,就是事件驅動。Netty 在官網對自己的描述就是一個事件驅動的架構。下一篇文章,我們就會來詳細的講解 Netty 中的事件究竟是個怎麼回事以及如何在基于事件的模型下開發 Netty 程式。