概述
流經網絡的資料總是具有相同的類型:位元組,這些位元組如何傳輸主要取決于我們所說的網絡傳輸。使用者并不關心傳輸的細節,隻在乎位元組是否被可靠地發送和接收
如果使用 Java 網絡程式設計,你會發現,某些時候當你需要支援高并發連接配接,随後你嘗試将阻塞傳輸切換為非阻塞傳輸,那麼你會因為這兩種 API 的截然不同而遇到問題。Netty 提供了一個通用的 API,這使得轉換更加簡單。
傳統的傳輸方式
這裡介紹僅使用 JDK API 來實作應用程式的阻塞(OIO)和非阻塞版本(NIO)
阻塞網絡程式設計如下:
public class PlainOioServer {
public void server(int port) throws IOException {
// 将伺服器綁定到指定端口
final ServerSocket socket = new ServerSocket(port);
try {
while (true) {
// 接收連接配接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
// 建立一個新的線程來處理連接配接
new Thread(() -> {
OutputStream out;
try {
out = clientSocket.getOutputStream();
// 将消息寫給已連接配接的用戶端
out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8));
out.flush();
// 關閉連接配接x
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
這段代碼可以進行中等數量的并發用戶端,但随着并發連接配接的增多,你決定改用異步網絡程式設計,但異步的 API 是完全不同的
非阻塞版本如下:
public class PlainNioServer {
public void server(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ssocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
// 将伺服器綁定到標明的端口
ssocket.bind(address);
// 打開 Selector 來處理 Channel
Selector selector = Selector.open();
// 将 ServerSocket 注冊到 Selector 以接受連接配接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes());
while (true) {
try {
// 等待需要處理的新事件,阻塞将一直持續到下一個傳入事件
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
Set<SelectionKey> readKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
// 檢查事件是否是一個新的已經就緒可以被接受的連接配接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
// 接受用戶端,并将它注冊到選擇器
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
// 檢查套接字是否已經準備好寫資料
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
// 将資料寫到已連接配接的用戶端
if (client.write(buffer) == 0) {
break;
}
}
client.close();
}
} catch (IOException exception) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
cex.printStackTrace();
}
}
}
}
}
}
可以看到,阻塞和非阻塞的代碼是截然不同的。如果為了實作非阻塞而完全重寫程式,無疑十分困難
基于 Netty 的傳輸
使用 Netty 的阻塞網絡處理如下:
public class NettyOioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi\n\r", StandardCharsets.UTF_8));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
// 使用阻塞模式
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new SimpleChannelInboundHandler<>() {
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(buf.duplicate())
.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
而非阻塞版本和阻塞版本幾乎一模一樣,隻需要改動兩處地方
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class);
傳輸 API
傳輸 API 的核心是 interface Channel,它被用于所有的 IO 操作。每個 Channel 都将被配置設定一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的所有配置設定,ChannelPipeline 持有所有将應用于入站和出站資料以及事件的 ChannelHandler 執行個體
除了通路所配置設定的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法
方法名 | 描述 |
---|---|
eventLoop | 傳回配置設定給 Channel 的 EventLoop |
pipeline | 傳回配置設定給 Channel 的 ChannelPipeline |
isActive | 如果 Channel 活動的,傳回 true |
localAddress | 傳回本地的 SocketAddress |
remoteAddress | 傳回遠端的 SocketAddress |
write | 将資料寫到遠端節點 |
flush | 将之前已寫的資料沖刷到底層傳輸 |
writeAndFlush | 等同于調用 write() 并接着調用 flush() |
内置的傳輸
Netty 内置了一些可開箱即用的傳輸,但它們所支援的協定不盡相同,是以你必須選擇一個和你的應用程式所使用協定相容的傳輸
名稱 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作為基礎 |
Epoll | io.netty.channel.epoll | 由 JNI 驅動的 epoll() 和非阻塞 IO,可支援隻有在 Linux 上可用的多種特性,比 NIO 傳輸更快,且完全非阻塞 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作為基礎 |
Local | io.netty.channel.local | 可以在 VM 内部通過管道進行通信的本地傳輸 |
Embedded | io.netty.channel.embedded | Embedded 傳輸,允許使用 ChannelHandler 而不需要一個真正的基于網絡的傳輸,主要用于測試 |