3.2 Netty 服務端開發
- TimeServer.java
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服務端的NIO線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
// 綁定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服務端監聽端口關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler
extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServeHandler());
}
}
public static void main(String[] args) throws Exception{
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用預設值
}
}
new TimeServer().bind(port);
}
}
我 們 從 bind 方 法 開 始 學 習 , 在 代 碼 第 5~6 行 創 建 了 兩 個 NioEventLoopGroup 實 例 。NioEventLoopGroup 是 個 線 程 組 , 它 包 含 了 一 組 NIO 線 程 , 專 門 用 于 網 絡 事 件 的 處 理 , 實際 上 它 們 就 是 Reactor 線 程 組 。 這 裡 創 建 兩 個 的 原 因 是 一 個 用 于 服 務 端 接 受 客 戶 端 的 連 接 ,另 一 個 用 于 進 行 SocketChanneI 的 網 絡 讀 寫 。 第 8行 創 建 ServerBootstrap 對 象 , 它 是 Netty用 于 啟 動 NIO 服 務 端 的 輔 助 啟 動 類 , 目 的 是 降 低 服 務 端 的 開 發 複 雜 度 。 第 9行 調 用ServerBootstrap 的 group 方 法 , 将 兩 個 NIO 線 程 組 當 作 入 參 傳 遞 到 ServerBootstrap 中 。 接着 設 置 創 建 的 Channel 為 NioServerSocketChannel, 它 的 功 能 對 應 于 JDK NIO 類 庫中的ServerSocketChannel 類 。 然 後 配 置 NioServerSocketCh annel 的 TC P 參 數 , 此 處 将 它 的 backlog
設 置 為 1024 , 最 後 綁 定 I/O 事 件 的 處 理 類 ChildChannelHandler , 它 的 作 用 類 似 于 Reactor模 式 中 的 Handler 類 , 主 要 用 于 處 理 網 絡 1 / 0 事 件 , 例 如 記 錄 日 志 、 對 消 息 進 行 編 解 碼 等 。
服 務 端 啟 動 輔 助 類 配 置 完 成 之 後 , 調 用 它 的 bind 方 法 綁 定 監 聽 端 口 , 随 後 , 調 用 它的 同 步 阻 塞 方 法 sync 等 待 綁 定 操 作 完 成 。 完 成 之 後 Netty 會 返 回 一 個 ChannelFuture , 它的 功 能 類 似 于 JDK 的
java.util.concurrent.Future
, 主 要 用 于 異 步 操 作 的 通 知 回 調 。
第 16行 使 用
f.channel().closeFuture().sync()
方 法 進 行 阻 塞 , 等 待 服 務 端 鲢 路 關 閉 之 後main 函 數 才 退 出 。
第 19~20 行 調 用 NIO 線 程 組 的 shutdownGracefully 進 行 優 雅 退 出 , 它 會 釋放跟shutdownGracefuIIy 相 關 聯 的 資 源 。
- TimeServeHandler.java
public class TimeServeHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime =
"QUERY TIME ORDER".equalsIgnoreCase(body)
? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
第 5 行 做 類 型 轉 換 , 将 msg 轉 換 成 Netty 的 ByteBuf 對 象 。 ByteBuf 類 似 于 JDK 中 的java.nio.ByteBuffer 對 象 , 不 過 它 提 供 了 更 加 強 大 和 靈 活 的 功 能 。 通 過 ByteBuf 的readabl e Bytes 方 法 可 以 獲 取 緩 沖 區 可 讀 的 字 節 數 , 根 據 可 讀 的 字 節 數 創 建 byte 數 組 , 通 過ByteBuf 的 read Bytes 方 法 将 緩 沖 區 中 的 字 節 數 組 複 制 到 新 建 的 byte 數 組 中 , 最 後 通 過 new
string 構 造 函 數 獲 取 請 求 消 息 。 這 時 對 請 求 消 息 進 行 判 斷 , 如 果 是 "QUERY TIME ORDER"則 創 建 應 答 消 息 , 通 過 ChanneIHandIerContext 的 write 方 法 異 步 發 送 應 答 消 息 給 客 戶 端 。
第 20 行 我 們 發 現 還 調 用 了
ChannelHandlerContext
的 flush 方 法 , 它 的 作 用 是 将 消 息發 送 隊 列 中 的 消 息 寫 入 到 SocketChannel 中 發 送 給 對 方 。 從 性 能 角 度 考 慮 , 為 了 防 止 頻 繁地 喚 醒 SeIector 進 行 消 息 發 送 , Netty 的 write 方 法 并 不 直 接 将 消 息 寫 入 SocketChanneI 中 ,調 用 write 方 法 隻 是 把 待 發 送 的 消 息 放 到 發 送 緩 沖 數 組 中 , 再 通 過 調 用 flush 方 法 , 将 發 送緩 沖 區 中 的 消 息 全 部 寫 到 SocketChannel 中 。
第 25 行 , 當 發 生 異 常 時 , 關 閉 ChanneIHandIerContext , 釋 放 和 ChanneIHandIerContext相 關 聯 的 句 柄 等 資 源 。通 過 對 代 碼 進 行 統 計 分 析 可 以 看 出 , 不 到 30 行 的 業 務 邏 輯 代 碼 , 即 完 成 了 NIO 服 務端 的 丌 發 , 相 比 于 傳 統 基 于 JDK NIO 原 生 類 庫 的 服 務 端 , 代 碼 量 大 大 減 少 , 開發難 度 也降 低 了 很 多 。
3.3 Netty 用戶端開發
- TimeClient.java
public class TimeClient {
public void connect(int port, String host) throws Exception {
// 配置用戶端NIO 線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
// 發起異步連接配接操作
ChannelFuture f = b.connect(host, port).sync();
// 當代用戶端鍊路關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放NIO線程組
group.shutdownGracefully();
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用預設值
}
}
new TimeClient().connect(port, "127.0.0.1");
}
}
我 們 從 connect 方 法 講 起 , 在 第 5行 首 先 創 建 客 戶 端 處 理 I/O 讀 寫 的 NioEventLoopGroup 線 程 組 , 然 後 繼 續 創 建 客 戶 端 輔 助 啟 動 類 Bootstrap , 随 後 需 要 對 其 進 行 配 置 。 與 服務 端 不 同 的 是 , 它 的 ChanneI 需 要 設 置 為 N ioSocketChanneI , 然 後 為 其 添 加 HandIer. 此 處為 了 簡 單 直 接 創 建 匿 名 内 部 類 , 實 現 initChannel 方 法 , 其 作 用 是 當 創 建 NioSocketChanneI
成 功 之 後 , 在 進 行 初 始 化 時 , 将 它 的 ChannelHandler 設 置 到 ChannelPipeline 中 , 用 于 處 理網 絡 I/O 事 件 。
客 戶 端 啟 動 輔 助 類 設 置 完 成 之 後 , 調 用 connect 方 法 發 起 異 步 連 接 , 然 後 調 用 同 步 方法 等 待 連 接 成 功 。
最 後 , 當 客 戶 端 連 接 關 閉 之 後 , 客 戶 端 主 函 數 退 出 , 退 出 之 前 釋 放 NIO 線 程 組 的 資 源 。
- TimeClientHandler.java
public class TimeClientHandler extends ChannelHandlerAdapter {
private static final Logger logger =
Logger.getLogger(TimeClientHandler.class.getName());
private final ByteBuf firstMessage;
// Creates a client-side handler.
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
}
}
這 裡 重 點 關 注 三 個 方 法 : channelActive 、 channelRead 和 exceptionCaught0 當 客 戶 端和 服 務 端 TCP 鍊 路 建 立 成 功 之 後 , Netty 的 NIO 線 程 會 調 用 channelActive 方 法 , 發 送 查詢 時 間 的 指 令 給 服 務 端 , 調 用 ChannelHandlerContext 的 writeAndFlush 方 法 将 請 求 消 息 發送 給 服 務 端 。
當 服 務 端 返 回 應 答 消 息 時 , channelRead 方 法 被 調 用 , 第 39 ~ 43 行 從 Netty 的 ByteBuf中 讀 取 并 打 印 應 答 消 息 。
第 21~26 行 , 當 發 生 異 常 時 , 打 印 異 常 日 志 , 釋 放 客 戶 端 資 源 。