Netty :Netty介绍 & 实现简易多人聊天室
博主在去年二月份就介绍了
Java
原生网络编程
API
,并用它们实现了多种
I/O
网络编程模型的简易多人聊天室:
- Java网络编程-Socket编程初涉一(简易客户端-服务器)
- Java网络编程-Socket编程初涉二(基于BIO模型的简易多人聊天室)
- Java网络编程-Socket编程初涉三(伪异步I/O模型的简易多人聊天室)
- Java网络编程-Socket编程初涉四(NIO模型的简易多人聊天室)
- Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)
- Java网络编程-Socket编程初涉六(AIO模型的简易多人聊天室)
使用
Java
原生网络编程
API
还是比较繁琐的,如果需要更换
I/O
网络编程模型来适应不同的业务场景,重构工作量还是很大的,而
Netty
对
Java
原生网络编程
API
进行了封装和扩展,使得不同
I/O
网络编程模型之间的转换需要的代码改动非常少,并且性能非常高,
Netty
的实现原理留到以后再介绍,将
Netty
提供的组件使用熟练后,它的实现原理才能理解的更透彻,
Netty
的实现原理涉及到了
Linux
内核部分,比如零拷贝、
I/O
多路复用等,以及
DMA
等硬件。
Netty介绍
EventLoopGroup、EventLoop
Netty
的调度模块称为
EventLoopGroup
,
Netty
提供了
NioEventLoopGroup
、
OioEventLoopGroup
、
EpollEventLoopGroup
(在
Linux
下可用)等多种实现。
EventLoopGroup
是一组
EventLoop
的抽象,一个
EventLoopGroup
当中会包含一个或多个
EventLoop
,如下图所示(图来自
《Netty In Action》
):
Channel
EventLoop
在它的整个生命周期中只会与一个
Thread
(真正的
I/O
线程)绑定。所有由
EventLoop
处理的
I/O
事件都将在它所关联的
Thread
上进行处理。一个
Channel
在它的整个生命周期中只会注册在一个
EventLoop
上。也就是说一个
Channel
上绑定的所有方法只会由同一个线程执行。一个
EventLoop
在运行过程当中会被分配给一个或多个
Channel
。
Channel
可以有一个父
Channel
,这取决于它是如何创建的。 例如,被
ServerSocketChannel
接受的
SocketChannel
(当客户端与服务端建立连接时,在服务端创建与客户端通信的
Channel
)将在
parent()
上返回
ServerSocketChannel
作为其父
Channel
。
Channel
用于连接字节缓冲区和另一端的实体,这个实体可以是
Socket
,也可以是
File
,在
NIO
网络编程模型中,服务端和客户端进行
IO
通信的媒介就是
Channel
。
Netty
对
Java
原生的
ServerSocketChannel
进行了封装和增强,相对于原生的
Channel
,
Netty
的
Channel
增加了如下组件(但不限于这些组件):
-
:标识唯一身份信息。ChannelId
-
:处理或拦截ChannelPipeline
入站事件和出站操作的Channel
的列表。 ChannelHandler
实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及 ChannelPipeline
中的ChannelPipeline
如何互相交互。每个ChannelHandler
都有自己的Channel
,并在创建新ChannelPipeline
时自动创建。Channel
-
:用来处理EventLoop
事件的Channel I/O
。EventLoop
-
:ChannelConfig
的配置参数(例如接收缓冲区大小)。Channel
ChannelHandler
ChannelHandler
会处理
I/O
事件或拦截
I/O
操作,并将其转发到
ChannelPipeline
中的下一个
ChannelHandler
。
ChannelHandler
本身并没有提供很多方法,通常需要实现其子类型之一:
-
:处理入站ChannelInboundHandler
事件的抽象。I/O
-
:处理出站ChannelOutboundHandler
操作的抽象。I/O
为了方便,
Netty
提供了以下适配器类:
-
:处理入站ChannelInboundHandlerAdapter
事件的一种简单实现。I/O
-
:处理出站ChannelOutboundHandlerAdapter
操作的一种简单实现。I/O
Bootstrap、ServerBootStrap
Netty
的启动类分为客户端启动类和服务端启动类,分别是
BootStrap
和
ServerBootStrap
。它们都是
AbstractBootStrap
的子类,总的来说它们都是
Netty
中的辅助类,提供了链式配置方法,方便了
Channel
的引导和启动。
简易多人聊天室
博主接下来用
Netty
实现一个
NIO
网络编程模型的简易多人聊天室,来介绍
Netty
的基本使用。
首先需要导入
Netty
的依赖(博主使用
4.1.70.Final
版本):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.70.Final</version>
</dependency>
服务端
package com.kaven.netty.nio;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.atomic.AtomicReference;
public class Server {
private static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
final ServerHandler serverHandler = new ServerHandler();
// 主线程组,用于接受客户端的连接,但是不做任何处理,跟老板一样
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从线程组,主线程组会把任务丢给它,让从线程组去做相应的处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup , workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(PORT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
static class ServerHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
clientChannels.add(channel);
String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg);
}
else {
clientChannel.writeAndFlush("欢迎您上线\n");
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
if(clientChannels.contains(channel)) {
clientChannels.remove(channel);
String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
AtomicReference<String> sendMsg = new AtomicReference<>("客户[" + channel.remoteAddress() + "]消息: " + msg + "\n");
if(msg instanceof String && msg.equals("quit")) {
clientChannels.remove(channel);
channel.close();
sendMsg.set("客户[" + channel.remoteAddress() + "]下线\n");
System.out.print(sendMsg.get());
}
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg.get());
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
clientChannels.remove(channel);
String msg = cause.getMessage();
String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
}
当有客户端连接时,服务端会使用
ChannelInitializer
初始化与客户端通信的
Channel
,并且在
Channel
的
ChannelPipeline
中添加
ChannelHandler
,这里主要是添加
StringDecoder
(将接收到的
ByteBuf
解码为
String
,是一种
ChannelInboundHandlerAdapter
)、
StringEncoder
(将请求的
String
编码为
ByteBuf
,是一种
ChannelOutboundHandlerAdapter
)以及自实现的
ChannelInboundHandlerAdapter
(处理与客户端之间的通信)。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});
@ChannelHandler.Sharable
注解表示可以将该
ChannelHandler
的同一实例多次添加到一个或多个
ChannelPipeline
中,而不会出现竞争条件。如果未指定此注解,则每次将
ChannelHandler
实例添加到
ChannelPipeline
中时都必须创建一个新的实例,因为它具有成员变量等非共享状态。
ChannelGroup
是一个线程安全的
Set
,包含开放的
Channel
并提供对它们的各种批量操作。 使用
ChannelGroup
,可以将
Channel
分类为一个有意义的组(例如,基于每个服务或每个状态来分组),以便实现广播的功能,关闭的
Channel
会自动从集合中删除(博主是手动删除的,只是为了演示怎么删除
Channel
),因此无需担心
ChannelGroup
中
Channel
的生命周期。 一个
Channel
可以属于多个
ChannelGroup
。
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
客户端
package com.kaven.netty.nio;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Scanner;
public class Client {
private static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
final ClientHandler clientHandler = new ClientHandler();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(PORT))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();
Channel channel = channelFuture.channel();
//客户端发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
//通过客户端把输入内容发送到服务端
channel.writeAndFlush(msg).sync();
if(msg.equals("quit")) {
channel.close().sync();
break;
}
}
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
static class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.print(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
String msg = cause.getMessage();
System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);
}
}
}
SimpleChannelInboundHandler
抽象类继承了
ChannelInboundHandlerAdapter
类,并且默认会自动释放所有处理的消息,在这种情况下,如果需要将消息传递给
ChannelPipeline
中的下一个
ChannelHandler
,需要使用
ReferenceCountUtil.retain(Object)
(
Netty
资源管理部分,博主以后也会详细介绍)。
测试
更换I/O网络编程模型
在
Netty
中更换
I/O
网络编程模型非常方便,只需要修改服务端和客户端相应的地方即可,如下图所示:
OioEventLoopGroup OioServerSocketChannel OioSocketChannel
NioEventLoopGroup NioServerSocketChannel NioSocketChannel
EpollEventLoopGroup EpollServerSocketChannel EpollSocketChannel
KQueueEventLoopGroup KQueueServerSocketChannel KQueueSocketChannel
Oio
(
Old-Blocking-IO
)就是
BIO
网络编程模型。
Epoll
:
KQueue
: