一、概念
早期的 Java API 隻支援由本地系統套接字庫提供所謂的阻塞函數來支援網絡程式設計。由于是阻塞 I/O ,要管理多個并發用戶端,需要為每個新的用戶端Socket 建立一個 Thread 。這将導緻一系列的問題,第一,在任何時候都可能有大量的線程處于休眠狀态(不可能每時每刻都有對應的并發數);第二,需要為每個線程的調用棧都配置設定記憶體;第三,JVM 線上程的上下文切換所帶來的開銷會帶來麻煩。
Java 在 2002 年引入了非阻塞 I/O,位于 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 實作的關鍵。它使用了事件通知以确定在一組非阻塞套接字中有哪些已經就緒能夠進行 I/O 相關的操作。因為可以在任何的時間檢查任意的讀操作或者寫操作的完成狀态,是以如圖 1-2 所示,一個單一的線程便可以處理多個并發的連接配接。

盡管可以直接使用 Java NIO API,但是在高負載下可靠和高效地處理和排程 I/O 操作是一項繁瑣而且容易出錯的任務,最好還是留給高性能的網絡程式設計專家——Netty。
Netty 是一款異步的事件驅動的網絡應用程式架構,支援快速的開發可維護的高性能的瞄向協定的服務端和用戶端。它駕馭了Java進階API的能力,并将其隐藏在一個易于使用的API之後。首先,它的基于 Java NIO 的異步的和事件驅動的實作,保證了高負載下應用程式性能的最大化和可伸縮性。其次, Netty 也包含了一組設計模式,将應用程式邏輯從網絡層解耦,簡化了開發過程, 同時也最大限度地提高了可測試性、子產品化以及代碼的可重用性。
tips:面向對象的基本概念—> 用較簡單的抽象隐藏底層實作的複雜性。
二、核心元件
- Channel
Channel是Java NIO的一個基本構造。可以看作是傳入或傳出資料的載體。是以,它可以被打開或關閉,連接配接或者斷開連接配接。以下是常用的Channel:
-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel
- 回調
當一個回調被觸發時,相應的事件可以被一個interface-ChannelHandler的實作處理。
- Future
Netty中所有的I/O操作都是異步的。因為一個操作可能不會立即傳回,是以我們需要一種在之後的某個時間點确定其結果的方法。
Future 和 回調 是互相補充的機制,提供了另一種在操作完成時通知應用程式的方式。這個對象可以看作是一個異步操作結果的占位符;它将在未來的某個時刻完成,并提供對其結果的通路。
Netty 提供了ChannelFuture,用于在執行異步操作的時候使用。每個Netty的出站I/O操作都會傳回一個ChannelFuture。ChannelFuture能夠注冊一個或者多個ChannelFutureListener 執行個體。監聽器的回調方法operationComplete(),将會在對應的操作完成時被調用。
- ChannelHandler
Netty 的主要元件是ChannelHandler,它充當了所有處理入站和出站資料的應用程式邏輯的容器。
Netty 使用不同的事件來通知我們狀态的改變或者是操作的狀态,每個事件都可以被分發給ChannelHandler類中某個使用者實作的方法。Netty提供了大量預定義的可以開箱即用的ChannelHandler實作,包括用于各種協定的ChannelHandler。
現在,事件可以被分發給ChannelHandler類中某個使用者實作的方法。那麼,如果 ChannelHandler 處理完成後不直接傳回給用戶端,而是傳遞給下一個ChannelHandler 繼續處理呢?那麼就要說到 ChannelPipeline !
ChannelPipeline 提供了 ChannelHandler鍊 的容器,并定義了用于在該鍊上傳播入站和出站事件流的API。使得事件流經 ChannelPipeline 是 ChannelHandler 的工作,它們是在應用程式的初始化或者引導階段被安裝的。這些對象接收事件、執行他們所實作的處理邏輯,并将資料傳遞給鍊中的下一個ChannelHandler:
1、一個ChannelInitializer的實作被注冊到了ServerBootstrap中。
2、當 ChannelInitializer.initChannel()方法被調用時, ChannelInitializer将在 ChannelPipeline 中安裝一組自定義的 ChannelHandler。
3、ChannelInitializer 将它自己從 ChannelPipeline 中移除。
- EventLoop
EventLoop 定義了Netty的核心抽象,用來處理連接配接的生命周期中所發生的事件,在内部,将會為每個Channel配置設定一個EventLoop。
EventLoop本身隻由一個線程驅動,其處理了一個Channel的所有I/O事件,并且在該EventLoop的整個生命周期内都不會改變。這個簡單而強大的設計消除了你可能有的在ChannelHandler實作中需要進行同步的任何顧慮。
這裡需要說到,EventLoop的管理是通過EventLoopGroup來實作的。還要一點要注意的是,用戶端引導類是 Bootstrap,隻需要一個EventLoopGroup。服務端引導類是 ServerBootstrap,通常需要兩個 EventLoopGroup,一個用來接收用戶端連接配接,一個用來處理 I/O 事件(也可以隻使用一個 EventLoopGroup,此時其将在兩個場景下共用同一個 EventLoopGroup)。
1、一個 EventLoopGroup 包含一個或者多個 EventLoop;
2、一個 EventLoop 在它的生命周期内隻和一個 Thread 綁定;
3、所有由 EventLoop 處理的 I/O 事件都将在它專有的Thread 上被處理;
4、一個 Channel 在它的生命周期内隻注冊于一個EventLoop;
5、NIO中,一個 EventLoop 配置設定給多個 Channel(面對多個Channel,一個 EventLoop 按照事件觸發,順序執行); OIO中,一個 EventLoop 配置設定給一個 Channel。
tips:Netty 應用程式的一個一般準則:盡可能的重用 EventLoop,以減少線程建立所帶來的開銷。
- Bootstrap 和 ServerBootstrap
BootStarp 和 ServerBootstrap 被稱為引導類,指對應用程式進行配置,并使他運作起來的過程。Netty處理引導的方式是使你的應用程式和網絡層相隔離。
BootStrap 是用戶端的引導類,Bootstrap 在調用 bind()(連接配接UDP)和 connect()(連接配接TCP)方法時,會新建立一個 Channel,僅建立一個單獨的、沒有父 Channel 的 Channel 來實作所有的網絡交換。
ServerBootstrap 是服務端的引導類,ServerBootstarp 在調用 bind() 方法時會建立一個 ServerChannel 來接受來自用戶端的連接配接,并且該 ServerChannel 管理了多個子 Channel 用于同用戶端之間的通信。
三、執行個體
所有的Netty服務端/用戶端都至少需要兩個部分:
1、至少一個ChannelHandler —— 該元件實作了對資料的處理。
2、引導 —— 這是配置伺服器的啟動代碼。
服務端:
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
//1、建立EventLoopGroup以進行事件的處理,如接受新連接配接以及讀/寫資料
EventLoopGroup group = new NioEventLoopGroup();
try {
//2、建立ServerBootstrap,引導和綁定伺服器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group, group)
//3、指定所使用的NIO傳輸Channel
.channel(NioServerSocketChannel.class)
//4、使用指定的端口設定套接字位址
.localAddress(new InetSocketAddress(port))
//5、添加一個 EchoServerHandler 到子 Channel的 ChannelPipeline
//當一個新的連接配接被接受時,一個新的子Channel将會被建立,而 ChannelInitializer 将會把一個你的EchoServerHandler 的執行個體添加到該 Channel 的 ChannelPipeline 中
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(serverHandler);
}
});
//6、異步地綁定伺服器,調用sync()方法阻塞等待直到綁定完成
ChannelFuture channelFuture = bootstrap.bind().sync();
System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress());
//7、擷取 Channel 的 CloseFuture,并且阻塞目前線程直到它完成
channelFuture.channel().closeFuture().sync();
} finally {
//8、關閉 EventLoopGroup 釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer(9999).start();
}
}
@ChannelHandler.Sharable //辨別一個Channel-Handler 可以被多個Channel安全的共享
public class EchoServerHandler extends ChannelHandlerAdapter {
/**
* 對于每個傳入的消息都要調用
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
//将接收到的消息寫給發送者,而不沖刷出站消息
//ChannelHandlerContext 發送消息。導緻消息向下一個ChannelHandler流動
//Channel 發送消息将會導緻消息從 ChannelPipeline的尾端開始流動
ctx.write(in);
}
/**
* 通知 ChannelHandlerAdapter 最後一次對channel-Read()的調用是目前批量讀取中的最後一條消息
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//暫存于ChannelOutboundBuffer中的消息,在下一次調用flush()或者writeAndFlush()方法時将會嘗試寫出到套接字
//将這份暫存消息沖刷到遠端節點,并且關閉該Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/**
* 在讀取操作期間,有異常抛出時會調用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoServerHandler.java
用戶端:
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
//建立Bootstrap
Bootstrap bootstrap = new Bootstrap();
//指定 EventLoopGroup 以處理用戶端事件;适應于NIO的實作
bootstrap.group(group)
//适用于NIO傳輸的Channel類型
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
//在建立Channel時,向ChannelPipeline中添加一個EchoClientHandler執行個體
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
//連接配接到遠端節點,阻塞等待直到連接配接完成
ChannelFuture channelFuture = bootstrap.connect().sync();
//阻塞,直到Channel 關閉
channelFuture.channel().closeFuture().sync();
} finally {
//關閉線程池并且釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient("127.0.0.1", 9999).start();
System.out.println("------------------------------------");
new EchoClient("127.0.0.1", 9999).start();
System.out.println("------------------------------------");
new EchoClient("127.0.0.1", 9999).start();
}
}
@ChannelHandler.Sharable //标記該類的執行個體可以被多個Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 當從伺服器接收到一條消息時被調用
*
* @param ctx
* @param msg ByteBuf (Netty 的位元組容器) 作為一個面向流的協定,TCP 保證了位元組數組将會按照伺服器發送它們的順序接收
* @throws Exception
*/
@Override
protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client" + ctx.channel().remoteAddress() + "connected");
System.out.println(msg.toString(CharsetUtil.UTF_8));
}
/**
* 在到伺服器的連接配接已經建立之後将被調用
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
}
/**
* 在處理過程中引發異常時被調用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoClientHandler.java
四、結語
帶着一陣迷糊就開始了Netty學習之旅,學到現在還是對Netty一堆專有名詞頭大!沒辦法,隻好硬着頭皮學下去了,畢竟,熟讀唐詩三百首,不會作詩也會吟嘛!
來總結下,一個Netty服務端處理用戶端連接配接的過程:
1、建立一個channel同該使用者端進行綁定;
2、channel從EventLoopGroup獲得一個EventLoop,并注冊到該EventLoop,channel生命周期内都和該EventLoop在一起(注冊時獲得selectionKey);
3、channel同使用者端進行網絡連接配接、關閉和讀寫,生成相對應的event(改變selectinKey資訊),觸發eventloop排程線程進行執行;
4、ChannelPipeline 找到對應 ChannelHandler 方法處理使用者邏輯。
我們項目中使用的 Netty 服務端啟動類:
public class NettyServer {
public static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private static Integer LISTENER_PORT = PropertiesLoader.getResourcesLoader().getInteger("nettyPort");
private int port;
EventLoopGroup boss = null;
EventLoopGroup worker = null;
ServerBootstrap serverBootstrap = null;
public static NettyServer nettyServer = null;
public static NettyServer getInstance() {
if (nettyServer == null) {
synchronized (NettyServer.class) {
if (nettyServer == null) {
nettyServer = new NettyServer(LISTENER_PORT==null?9999:LISTENER_PORT);
}
}
}
return nettyServer;
}
/**
* 構造函數
*
* @param port 端口
*/
private NettyServer(int port) {
this.port = port;
}
/**
* 綁定
*
* @throws InterruptedException
*/
public void init() throws InterruptedException {
try {
//建立兩個線程池
//目前伺服器CPU為單核8線程,調整線程為8
boss = new NioEventLoopGroup(8);
worker = new NioEventLoopGroup(8);
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);//兩個工作線程
serverBootstrap.channel(NioServerSocketChannel.class);
//重用緩沖區
serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//自動調整下一次緩沖區建立時配置設定的空間大小,避免記憶體的浪費
serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
//當伺服器請求處理線程全滿時,用于臨時存放已完成三次握手的請求的隊列的最大長度,預設值50。
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//用于啟用或關于Nagle算法。如果要求高實時性,有資料發送時就馬上發送,就将該選項設定為true關閉Nagle算法;如果要減少發送次數減少網絡互動,就設定為false等累積一定大小後再發送。預設為false。
serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
//是否啟用心跳保活機制
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//支援tcp協定
//bootstrap.childHandler(new TcpChannelInitializer());
//支援webSocket協定
serverBootstrap.childHandler(new WebSocketChannelInitializer());
ChannelFuture f = serverBootstrap.bind(port).sync();
if (f.isSuccess()) {
logger.info("netty server start...");
}
//等到服務端監聽端口關閉
f.channel().closeFuture().sync();
} finally {
//優雅釋放線程資源
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
}
}
/**
* 銷毀netty相關資源
*/
public void destroy() {
try {
if (boss != null) {
boss.shutdownGracefully();
}
if (worker != null) {
worker.shutdownGracefully();
}
if (serverBootstrap != null) {
serverBootstrap = null;
}
} catch (Exception e) {
logger.error("netty close err:" + e.getMessage(), e);
}
}
}
NettyServer.java
tips: ServerBootstrap 中增加了一個方法childHandler(),它的目的是添加 ChannelHandler ;Bootstrap 中添加 ChannelHandler 用 handler() 方法。
參考資料:《Netty IN ACTION》
示範源代碼:https://github.com/JMCuixy/NettyDemo