Netty :Netty介紹 & 實作簡易多人聊天室
部落客在去年二月份就介紹了
Java
原生網絡程式設計
API
,并用它們實作了多種
I/O
網絡程式設計模型的簡易多人聊天室:
- Java網絡程式設計-Socket程式設計初涉一(簡易用戶端-伺服器)
- Java網絡程式設計-Socket程式設計初涉二(基于BIO模型的簡易多人聊天室)
- Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)
- Java網絡程式設計-Socket程式設計初涉四(NIO模型的簡易多人聊天室)
- Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)
- Java網絡程式設計-Socket程式設計初涉六(AIO模型的簡易多人聊天室)
使用
Java
原生網絡程式設計
API
還是比較繁瑣的,如果需要更換
I/O
網絡程式設計模型來适應不同的業務場景,重構工作量還是很大的,而
Netty
對
Java
原生網絡程式設計
API
進行了封裝和擴充,使得不同
I/O
網絡程式設計模型之間的轉換需要的代碼改動非常少,并且性能非常高,
Netty
的實作原理留到以後再介紹,将
Netty
提供的元件使用熟練後,它的實作原理才能了解的更透徹,
Netty
的實作原理涉及到了
Linux
核心部分,比如零拷貝、
I/O
多路複用等,以及
DMA
等硬體。
Netty介紹
EventLoopGroup、EventLoop
Netty
的排程子產品稱為
EventLoopGroup
,
Netty
提供了
NioEventLoopGroup
、
OioEventLoopGroup
、
EpollEventLoopGroup
(在
Linux
下可用)等多種實作。
EventLoopGroup
是一組
EventLoop
的抽象,一個
EventLoopGroup
當中會包含一個或多個
EventLoop
,如下圖所示(圖來自
《Netty In Action》
):
Channel
EventLoop
在它的整個生命周期中隻會與一個
Thread
(真正的
I/O
線程)綁定。所有由
EventLoop
處理的
I/O
事件都将在它所關聯的
Thread
上進行處理。一個
Channel
在它的整個生命周期中隻會注冊在一個
EventLoop
上。也就是說一個
Channel
上綁定的所有方法隻會由同一個線程執行。一個
EventLoop
在運作過程當中會被配置設定給一個或多個
Channel
。
Channel
可以有一個父
Channel
,這取決于它是如何建立的。 例如,被
ServerSocketChannel
接受的
SocketChannel
(當用戶端與服務端建立連接配接時,在服務端建立與用戶端通信的
Channel
)将在
parent()
上傳回
ServerSocketChannel
作為其父
Channel
。
Channel
用于連接配接位元組緩沖區和另一端的實體,這個實體可以是
Socket
,也可以是
File
,在
NIO
網絡程式設計模型中,服務端和用戶端進行
IO
通信的媒介就是
Channel
。
Netty
對
Java
原生的
ServerSocketChannel
進行了封裝和增強,相對于原生的
Channel
,
Netty
的
Channel
增加了如下元件(但不限于這些元件):
-
:辨別唯一身份資訊。ChannelId
-
:處理或攔截ChannelPipeline
入站事件和出站操作的Channel
的清單。 ChannelHandler
實作了一種進階形式的攔截過濾器模式,使使用者可以完全控制事件的處理方式以及 ChannelPipeline
中的ChannelPipeline
如何互互相動。每個ChannelHandler
都有自己的Channel
,并在建立新ChannelPipeline
時自動建立。Channel
-
:用來處理EventLoop
事件的Channel I/O
。EventLoop
-
:ChannelConfig
的配置參數(例如接收緩沖區大小)。Channel
ChannelHandler
ChannelHandler
會處理
I/O
事件或攔截
I/O
操作,并将其轉發到
ChannelPipeline
中的下一個
ChannelHandler
。
ChannelHandler
本身并沒有提供很多方法,通常需要實作其子類型之一:
-
:處理入站ChannelInboundHandler
事件的抽象。I/O
-
:處理出站ChannelOutboundHandler
操作的抽象。I/O
為了友善,
Netty
提供了以下擴充卡類:
-
:處理入站ChannelInboundHandlerAdapter
事件的一種簡單實作。I/O
-
:處理出站ChannelOutboundHandlerAdapter
操作的一種簡單實作。I/O
Bootstrap、ServerBootStrap
Netty
的啟動類分為用戶端啟動類和服務端啟動類,分别是
BootStrap
和
ServerBootStrap
。它們都是
AbstractBootStrap
的子類,總的來說它們都是
Netty
中的輔助類,提供了鍊式配置方法,友善了
Channel
的引導和啟動。
簡易多人聊天室
部落客接下來用
Netty
實作一個
NIO
網絡程式設計模型的簡易多人聊天室,來介紹
Netty
的基本使用。
首先需要導入
Netty
的依賴(部落客使用
4.1.70.Final
版本):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.70.Final</version>
</dependency>
服務端
package com.kaven.netty.nio;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.atomic.AtomicReference;
public class Server {
private static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
final ServerHandler serverHandler = new ServerHandler();
// 主線程組,用于接受用戶端的連接配接,但是不做任何處理,跟老闆一樣
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 從線程組,主線程組會把任務丢給它,讓從線程組去做相應的處理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup , workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(PORT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
static class ServerHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
clientChannels.add(channel);
String sendMsg = "客戶[" + channel.remoteAddress() + "]上線\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg);
}
else {
clientChannel.writeAndFlush("歡迎您上線\n");
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
if(clientChannels.contains(channel)) {
clientChannels.remove(channel);
String sendMsg = "客戶[" + channel.remoteAddress() + "]異常下線\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
AtomicReference<String> sendMsg = new AtomicReference<>("客戶[" + channel.remoteAddress() + "]消息: " + msg + "\n");
if(msg instanceof String && msg.equals("quit")) {
clientChannels.remove(channel);
channel.close();
sendMsg.set("客戶[" + channel.remoteAddress() + "]下線\n");
System.out.print(sendMsg.get());
}
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg.get());
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
clientChannels.remove(channel);
String msg = cause.getMessage();
String sendMsg = "客戶[" + channel.remoteAddress() + "]異常: " + msg + "\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
}
當有用戶端連接配接時,服務端會使用
ChannelInitializer
初始化與用戶端通信的
Channel
,并且在
Channel
的
ChannelPipeline
中添加
ChannelHandler
,這裡主要是添加
StringDecoder
(将接收到的
ByteBuf
解碼為
String
,是一種
ChannelInboundHandlerAdapter
)、
StringEncoder
(将請求的
String
編碼為
ByteBuf
,是一種
ChannelOutboundHandlerAdapter
)以及自實作的
ChannelInboundHandlerAdapter
(處理與用戶端之間的通信)。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});
@ChannelHandler.Sharable
注解表示可以将該
ChannelHandler
的同一執行個體多次添加到一個或多個
ChannelPipeline
中,而不會出現競争條件。如果未指定此注解,則每次将
ChannelHandler
執行個體添加到
ChannelPipeline
中時都必須建立一個新的執行個體,因為它具有成員變量等非共享狀态。
ChannelGroup
是一個線程安全的
Set
,包含開放的
Channel
并提供對它們的各種批量操作。 使用
ChannelGroup
,可以将
Channel
分類為一個有意義的組(例如,基于每個服務或每個狀态來分組),以便實作廣播的功能,關閉的
Channel
會自動從集合中删除(部落客是手動删除的,隻是為了示範怎麼删除
Channel
),是以無需擔心
ChannelGroup
中
Channel
的生命周期。 一個
Channel
可以屬于多個
ChannelGroup
。
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
用戶端
package com.kaven.netty.nio;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Scanner;
public class Client {
private static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
final ClientHandler clientHandler = new ClientHandler();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(PORT))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();
Channel channel = channelFuture.channel();
//用戶端發送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
//通過用戶端把輸入内容發送到服務端
channel.writeAndFlush(msg).sync();
if(msg.equals("quit")) {
channel.close().sync();
break;
}
}
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
static class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.print(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
String msg = cause.getMessage();
System.out.print("群聊[" + channel.remoteAddress() + "]異常: " + msg);
}
}
}
SimpleChannelInboundHandler
抽象類繼承了
ChannelInboundHandlerAdapter
類,并且預設會自動釋放所有處理的消息,在這種情況下,如果需要将消息傳遞給
ChannelPipeline
中的下一個
ChannelHandler
,需要使用
ReferenceCountUtil.retain(Object)
(
Netty
資源管理部分,部落客以後也會詳細介紹)。
測試
更換I/O網絡程式設計模型
在
Netty
中更換
I/O
網絡程式設計模型非常友善,隻需要修改服務端和用戶端相應的地方即可,如下圖所示:
OioEventLoopGroup OioServerSocketChannel OioSocketChannel
NioEventLoopGroup NioServerSocketChannel NioSocketChannel
EpollEventLoopGroup EpollServerSocketChannel EpollSocketChannel
KQueueEventLoopGroup KQueueServerSocketChannel KQueueSocketChannel
Oio
(
Old-Blocking-IO
)就是
BIO
網絡程式設計模型。
Epoll
:
KQueue
: