文章目錄
-
- 一、 Netty 模型代碼解析
- 二、 Netty 案例伺服器端代碼
-
- 1 . 伺服器主程式
- 2 . 伺服器自定義 Handler 處理者
- 三、 Netty 案例用戶端代碼
-
- 1 . 用戶端主程式
- 2 . 用戶端自定義 Handler 處理者
- 四、 Netty 案例運作
一、 Netty 模型代碼解析
1 . 線程池 NioEventLoopGroup :
① NioEventLoopGroup 線程池使用場景 : Netty 模型中的 BossGroup 和 WorkerGroup 都是 NioEventLoopGroup 類型的線程池 ;
② NioEventLoopGroup 預設線程個數 : 系統預設每個線程池中的 NioEventLoop 線程數是 CPU 核數
×
\times
× 2 , 下面的代碼可以擷取運作 Netty 程式的裝置的 CPU 核數 ;
// 擷取裝置的 CPU 核數
NettyRuntime.availableProcessors()
③ 指定 NioEventLoopGroup 線程個數 : 如果不想使用 Netty 線程池的預設線程個數 , 可以在 NioEventLoopGroup 構造函數中子星設定線程數 ;
// BossGroup 線程池 : 負責用戶端的連接配接
// 指定線程個數 : 用戶端個數很少, 不用很多線程維護, 這裡指定線程池中線程個數為 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2 . NioEventLoopGroup 線程池線程配置設定 :
以用戶端連接配接完成後 , 資料讀寫場景舉例 ;
在 雙核 CPU 的伺服器上 , NioEventLoopGroup 預設有
4
4
4 個線程 ; 按照順序循環配置設定 , 為第
n
n
n 個用戶端配置設定第
n
%
4
n \% 4
n%4 個 NioEventLoop 線程 ;
-
-
用戶端
0 與伺服器進行資料互動在 NioEventLoop
0 線程中 ;
-
用戶端
1
1
1 與伺服器進行資料互動在 NioEventLoop
1
1
1 線程中 ;
-
用戶端
2
2
2 與伺服器進行資料互動在 NioEventLoop
2
2
2 線程中 ;
-
用戶端
3
3
3 與伺服器進行資料互動在 NioEventLoop
3
3
3 線程中 ;
-
用戶端
4
4
4 與伺服器進行資料互動在 NioEventLoop
0 線程中 ;
-
用戶端
5
5
5 與伺服器進行資料互動在 NioEventLoop
1
1
1 線程中 ;
-
用戶端
6
6
6 與伺服器進行資料互動在 NioEventLoop
2
2
2 線程中 ;
-
用戶端
7
7
7 與伺服器進行資料互動在 NioEventLoop
3
3
3 線程中 ;
3 . NioEventLoopGroup 線程池封裝内容 :
① NioEventLoopGroup 中的若幹個 NioEventLoop 線程都封裝在 children 中 , 線程個數是 CPU 核數 2 倍 ;
② 每個 NioEventLoop 線程中封裝了如下内容 :
-
- 選擇器 ( Selector ) , 用于監聽用戶端的讀寫 IO 事件 ;
- 任務隊列 ( taskQueue ) , 用于存儲事件對應的業務邏輯任務 ;
- 線程執行器 ( executor ) , 用于執行線程 ;
4 . ChannelHandlerContext 通道處理者上下文對象封裝内容 :
① 使用者自定義的 處理者 ( Handler ) , 這裡指的是 伺服器端的 ServerHandr ( 自定義 ) , 用戶端的 ClientHandler ( 自定義 ) ;
② 管道 ( ChannelPipeline ) : 其本質是雙向連結清單 , 該 ChannelHandlerContext 可以擷取該連結清單的前一個 ( prev ) , 後一個管道對象 ( next ) ;
③ 管道 與 通道 :
-
- 二者都可以通過 通道處理者上下文 ( ChannelHandlerContext ) 擷取 ;
- 管道 與 通道 都可以互相從對方擷取 ;
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
channel = pipeline.channel();
pipeline = channel.pipeline();
④ 管道 ( Pipeline ) 與 通道 ( Channel ) 關聯 : 通過管道可以擷取通道 , 通過通道也可以擷取其對應的管道 ;
5 . 處理者 ( Handler ) :
① 設定 Handler : 給 WorkerGroup 線程池中的 EventLoop 線程對應的管道設定 處理器 ( Handler ) ;
② 自定義 Handler : 一般這個 Handler 都是使用者自定義的類 , 繼承 ChannelInboundHandlerAdapter 類 ;
③ 運作機制 : 在 BossGroup 中連接配接用戶端成功後 , 将 NioSocketChannel 注冊給 WorkerGroup 中的 EventLoop 中的 選擇器 ( Selector ) , 如果監聽到用戶端資料 IO 事件 , 就會調用 管道 ( Pipeline ) 處理該事件 , 管道 ( Pipeline ) 中調用 處理器 ( Handler ) 處理相應的事件 , 該 處理器 ( Handler ) 可以是 Netty 提供的 , 也可以是開發者自定義的 ;
特别注意 : 自定義 Handler 中 , 重寫的 ChannelInboundHandlerAdapter 方法 , 将 super() 語句都删除 ;
二、 Netty 案例伺服器端代碼
1 . 伺服器主程式
package kim.hsl.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty 案例伺服器端
*/
public class Server {
public static void main(String[] args) {
// 1. 建立 BossGroup 線程池 和 WorkerGroup 線程池, 其中維護 NioEventLoop 線程
// NioEventLoop 線程中執行無限循環操作
// BossGroup 線程池 : 負責用戶端的連接配接
// 指定線程個數 : 用戶端個數很少, 不用很多線程維護, 這裡指定線程池中線程個數為 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// WorkerGroup 線程池 : 負責用戶端連接配接的資料讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 2. 伺服器啟動對象, 需要為該對象配置各種參數
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 設定 主從 線程組 , 分别對應 主 Reactor 和 從 Reactor
.channel(NioServerSocketChannel.class) // 設定 NIO 網絡套接字通道類型
.option(ChannelOption.SO_BACKLOG, 128) // 設定線程隊列維護的連接配接個數
.childOption(ChannelOption.SO_KEEPALIVE, true) // 設定連接配接狀态行為, 保持連接配接狀态
.childHandler( // 為 WorkerGroup 線程池對應的 NioEventLoop 設定對應的事件 處理器 Handler
new ChannelInitializer<SocketChannel>() {// 建立通道初始化對象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 為 管道 Pipeline 設定處理器 Hanedler
ch.pipeline().addLast(new ServerHandr());
}
}
);
System.out.println("伺服器準備完畢 ...");
ChannelFuture cf = null;
try {
// 綁定本地端口, 進行同步操作 , 并傳回 ChannelFuture
cf = bootstrap.bind(8888).sync();
System.out.println("伺服器開始監聽 8888 端口 ...");
// 關閉通道 , 開始監聽操作
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 出現異常後, 優雅的關閉
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2 . 伺服器自定義 Handler 處理者
package kim.hsl.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
/**
* Handler 處理者, 是 NioEventLoop 線程中處理業務邏輯的類
*
* 繼承 : 該業務邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類
* 才可以設定給 NioEventLoop 線程
*
* 規範 : 該 Handler 類中需要按照業務邏輯處理規範進行開發
*/
public class ServerHandr extends ChannelInboundHandlerAdapter {
/**
* 讀取資料 : 在伺服器端讀取用戶端發送的資料
* @param ctx
* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 用戶端位址資訊
* 管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler
* 通道 ( Channel ) : 注重資料讀寫
* @param msg
* 用戶端上傳的資料
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 檢視 ChannelHandlerContext 中封裝的内容
System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);
// 将用戶端上傳的資料轉為 ByteBuffer
// 這裡注意該類是 Netty 中的 io.netty.buffer.ByteBuf 類
// 不是 NIO 中的 ByteBuffer
// io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
ByteBuf byteBuf = (ByteBuf) msg;
// 将 ByteBuf 緩沖區資料轉為字元串, 列印出來
System.out.println(ctx.channel().remoteAddress() + " 接收到用戶端發送的資料 : " +
byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 伺服器端讀取資料完畢後回調的方法
* @param ctx
* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 用戶端位址資訊
* * 管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler
* * 通道 ( Channel ) : 注重資料讀寫
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 資料編碼 : 将字元串編碼, 存儲到 io.netty.buffer.ByteBuf 緩沖區中
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8);
// 寫出并重新整理操作 : 寫出資料到通道的緩沖區 ( write ), 并執行重新整理操作 ( flush )
ctx.writeAndFlush(byteBuf);
}
/**
* 異常處理 , 上面的方法中都抛出了 Exception 異常, 在該方法中進行異常處理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("通道異常, 關閉通道");
//如果出現異常, 就關閉該通道
ctx.close();
}
}
三、 Netty 案例用戶端代碼
1 . 用戶端主程式
package kim.hsl.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) {
// 用戶端隻需要一個 時間循環組 , 即 NioEventLoopGroup 線程池
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
// 用戶端啟動對象
Bootstrap bootstrap = new Bootstrap();
// 設定相關參數
bootstrap.group(eventLoopGroup) // 設定用戶端的線程池
.channel(NioSocketChannel.class) // 設定用戶端網絡套接字通道類型
.handler( // 設定用戶端的線程池對應的 NioEventLoop 設定對應的事件處理器 Handler
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandr());
}
}
);
try {
// 開始連接配接伺服器, 并進行同步操作
// ChannelFuture 類分析 , Netty 異步模型
// sync 作用是該方法不會再次阻塞
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
System.out.println("用戶端連接配接伺服器成功 ...");
// 關閉通道, 開始監聽
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 優雅的關閉
eventLoopGroup.shutdownGracefully();
}
}
}
2 . 用戶端自定義 Handler 處理者
package kim.hsl.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* Handler 處理者, 是 NioEventLoop 線程中處理業務邏輯的類
*
* 繼承 : 該業務邏輯處理者 ( Handler ) 必須繼承 Netty 中的 ChannelInboundHandlerAdapter 類
* 才可以設定給 NioEventLoop 線程
*
* 規範 : 該 Handler 類中需要按照業務邏輯處理規範進行開發
*/
public class ClientHandr extends ChannelInboundHandlerAdapter {
/**
* 通道就緒後觸發該方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 檢視 ChannelHandlerContext 中封裝的内容
System.out.println("channelActive : ChannelHandlerContext ctx = " + ctx);
// 資料編碼 : 将字元串編碼, 存儲到 io.netty.buffer.ByteBuf 緩沖區中
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8);
// 寫出并重新整理操作 : 寫出資料到通道的緩沖區 ( write ), 并執行重新整理操作 ( flush )
ctx.writeAndFlush(byteBuf);
System.out.println("用戶端向伺服器端發送 Hello Server 成功");
}
/**
* 讀取資料 : 在伺服器端讀取用戶端發送的資料
* @param ctx
* 通道處理者上下文對象 : 封裝了 管道 ( Pipeline ) , 通道 ( Channel ), 用戶端位址資訊
* 管道 ( Pipeline ) : 注重業務邏輯處理 , 可以關聯很多 Handler
* 通道 ( Channel ) : 注重資料讀寫
* @param msg
* 伺服器傳回的資料
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 檢視 ChannelHandlerContext 中封裝的内容
System.out.println("channelRead : ChannelHandlerContext ctx = " + ctx);
// 将伺服器下發的資料轉為 ByteBuffer
// 這裡注意該類是 Netty 中的 io.netty.buffer.ByteBuf 類
// 不是 NIO 中的 ByteBuffer
// io.netty.buffer.ByteBuf 性能高于 java.nio.ByteBuffer
ByteBuf byteBuf = (ByteBuf) msg;
// 将 ByteBuf 緩沖區資料轉為字元串, 列印出來
System.out.println(ctx.channel().remoteAddress() + " 伺服器傳回的資料 : " +
byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 異常處理 , 上面的方法中都抛出了 Exception 異常, 在該方法中進行異常處理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("通道異常, 關閉通道");
//如果出現異常, 就關閉該通道
ctx.close();
}
}
四、 Netty 案例運作
1 . 運作伺服器端 : 伺服器啟動 , 監聽 8888 端口 ;
2 . 運作用戶端 : 用戶端連接配接伺服器的 8888 端口 , 并向伺服器端寫出 Hello Server 字元串 , 之後便接到伺服器端回送的 Hello Client 字元串資訊 ;
3 . 檢視用戶端 : 伺服器端接收到用戶端資訊 , 向用戶端寫出 Hello Client 字元串 ;