天天看点

Netty :Netty介绍 & 实现简易多人聊天室

Netty :Netty介绍 & 实现简易多人聊天室

博主在去年二月份就介绍了​

​Java​

​​原生网络编程​

​API​

​​,并用它们实现了多种​

​I/O​

​网络编程模型的简易多人聊天室:

  • ​​Java网络编程-Socket编程初涉一(简易客户端-服务器)​​
  • ​​Java网络编程-Socket编程初涉二(基于BIO模型的简易多人聊天室)​​
  • ​​Java网络编程-Socket编程初涉三(伪异步I/O模型的简易多人聊天室)​​
  • ​​Java网络编程-Socket编程初涉四(NIO模型的简易多人聊天室)​​
  • ​​Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)​​
  • ​​Java网络编程-Socket编程初涉六(AIO模型的简易多人聊天室)​​

使用​

​Java​

​​原生网络编程​

​API​

​​还是比较繁琐的,如果需要更换​

​I/O​

​​网络编程模型来适应不同的业务场景,重构工作量还是很大的,而​

​Netty​

​​对​

​Java​

​​原生网络编程​

​API​

​​进行了封装和扩展,使得不同​

​I/O​

​​网络编程模型之间的转换需要的代码改动非常少,并且性能非常高,​

​Netty​

​​的实现原理留到以后再介绍,将​

​Netty​

​​提供的组件使用熟练后,它的实现原理才能理解的更透彻,​

​Netty​

​​的实现原理涉及到了​

​Linux​

​​内核部分,比如零拷贝、​

​I/O​

​​多路复用等,以及​

​DMA​

​等硬件。

Netty介绍

EventLoopGroup、EventLoop

​Netty​

​​的调度模块称为​

​EventLoopGroup​

​​,​

​Netty​

​​提供了​

​NioEventLoopGroup​

​​、​

​OioEventLoopGroup​

​​、​

​EpollEventLoopGroup​

​​(在​

​Linux​

​下可用)等多种实现。

Netty :Netty介绍 & 实现简易多人聊天室

​EventLoopGroup​

​​是一组​

​EventLoop​

​​的抽象,一个​

​EventLoopGroup​

​​当中会包含一个或多个​

​EventLoop​

​​,如下图所示(图来自​

​《Netty In Action》​

​):

Netty :Netty介绍 & 实现简易多人聊天室
Netty :Netty介绍 & 实现简易多人聊天室

Channel

​EventLoop​

​​在它的整个生命周期中只会与一个​

​Thread​

​​(真正的​

​I/O​

​​线程)绑定。所有由​

​EventLoop​

​​处理的​

​I/O​

​​事件都将在它所关联的​

​Thread​

​​上进行处理。一个​

​Channel​

​​在它的整个生命周期中只会注册在一个​

​EventLoop​

​​上。也就是说一个​

​Channel​

​​上绑定的所有方法只会由同一个线程执行。一个​

​EventLoop​

​​在运行过程当中会被分配给一个或多个​

​Channel​

​。

​Channel​

​​可以有一个父​

​Channel​

​​,这取决于它是如何创建的。 例如,被​

​ServerSocketChannel​

​​接受的​

​SocketChannel​

​​(当客户端与服务端建立连接时,在服务端创建与客户端通信的​

​Channel​

​​)将在​

​parent()​

​​上返回​

​ServerSocketChannel​

​​作为其父​

​Channel​

​​。​

​Channel​

​​用于连接字节缓冲区和另一端的实体,这个实体可以是​

​Socket​

​​,也可以是​

​File​

​​,在​

​NIO​

​​网络编程模型中,服务端和客户端进行​

​IO​

​​通信的媒介就是​

​Channel​

​​。​

​Netty​

​​对​

​Java​

​​原生的​

​ServerSocketChannel​

​​进行了封装和增强,相对于原生的​

​Channel​

​​, ​

​Netty​

​​的​

​Channel​

​增加了如下组件(但不限于这些组件):

  • ​ChannelId​

    ​:标识唯一身份信息。
  • ​ChannelPipeline​

    ​​:处理或拦截​

    ​Channel​

    ​​入站事件和出站操作的​

    ​ChannelHandler​

    ​​的列表。 ​

    ​ChannelPipeline​

    ​​实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及 ​

    ​ChannelPipeline​

    ​​中的​

    ​ChannelHandler​

    ​​如何互相交互。每个​

    ​Channel​

    ​​都有自己的​

    ​ChannelPipeline​

    ​​,并在创建新​

    ​Channel​

    ​时自动创建。
  • ​EventLoop​

    ​​:用来处理​

    ​Channel I/O​

    ​​事件的​

    ​EventLoop​

    ​。
  • ​ChannelConfig​

    ​​:​

    ​Channel​

    ​的配置参数(例如接收缓冲区大小)。

ChannelHandler

​ChannelHandler​

​​会处理​

​I/O​

​​事件或拦截​

​I/O​

​​操作,并将其转发到​

​ChannelPipeline​

​​中的下一个​

​ChannelHandler​

​​。​

​ChannelHandler​

​本身并没有提供很多方法,通常需要实现其子类型之一:

  • ​ChannelInboundHandler​

    ​​:处理入站​

    ​I/O​

    ​事件的抽象。
  • ​ChannelOutboundHandler​

    ​​:处理出站​

    ​I/O​

    ​操作的抽象。

为了方便,​

​Netty​

​提供了以下适配器类:

  • ​ChannelInboundHandlerAdapter​

    ​​:处理入站​

    ​I/O​

    ​事件的一种简单实现。
  • ​ChannelOutboundHandlerAdapter​

    ​​:处理出站​

    ​I/O​

    ​操作的一种简单实现。

Bootstrap、ServerBootStrap

​Netty​

​​的启动类分为客户端启动类和服务端启动类,分别是​

​BootStrap​

​​和​

​ServerBootStrap​

​​。它们都是​

​AbstractBootStrap​

​​的子类,总的来说它们都是​

​Netty​

​​中的辅助类,提供了链式配置方法,方便了​

​Channel​

​的引导和启动。

简易多人聊天室

博主接下来用​

​Netty​

​​实现一个​

​NIO​

​​网络编程模型的简易多人聊天室,来介绍​

​Netty​

​的基本使用。

首先需要导入​

​Netty​

​​的依赖(博主使用​

​4.1.70.Final​

​版本):

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.70.Final</version>
        </dependency>      

服务端

package com.kaven.netty.nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.atomic.AtomicReference;

public class Server {

    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        final ServerHandler serverHandler = new ServerHandler();
        // 主线程组,用于接受客户端的连接,但是不做任何处理,跟老板一样
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组,主线程组会把任务丢给它,让从线程组去做相应的处理
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup , workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(PORT)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(serverHandler);
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @ChannelHandler.Sharable
    static class ServerHandler extends ChannelInboundHandlerAdapter {

        private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            clientChannels.add(channel);

            String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";
            System.out.print(sendMsg);
            clientChannels.forEach(clientChannel -> {
                if(clientChannel != channel) {
                    clientChannel.writeAndFlush(sendMsg);
                }
                else {
                    clientChannel.writeAndFlush("欢迎您上线\n");
                }
            });
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            if(clientChannels.contains(channel)) {
                clientChannels.remove(channel);

                String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";
                System.out.print(sendMsg);
                clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel channel = ctx.channel();

            AtomicReference<String> sendMsg = new AtomicReference<>("客户[" + channel.remoteAddress() + "]消息: " + msg + "\n");
            if(msg instanceof String && msg.equals("quit")) {
                clientChannels.remove(channel);
                channel.close();
                sendMsg.set("客户[" + channel.remoteAddress() + "]下线\n");
                System.out.print(sendMsg.get());
            }
            clientChannels.forEach(clientChannel -> {
                if(clientChannel != channel) {
                    clientChannel.writeAndFlush(sendMsg.get());
                }
            });
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel channel = ctx.channel();
            clientChannels.remove(channel);

            String msg = cause.getMessage();
            String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";
            System.out.print(sendMsg);
            clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
        }
    }
}      

当有客户端连接时,服务端会使用​

​ChannelInitializer​

​​初始化与客户端通信的​

​Channel​

​​,并且在​

​Channel​

​​的​

​ChannelPipeline​

​​ 中添加​

​ChannelHandler​

​​,这里主要是添加​

​StringDecoder​

​​(将接收到的​

​ByteBuf​

​​解码为​

​String​

​​,是一种​

​ChannelInboundHandlerAdapter​

​​)、​

​StringEncoder​

​​(将请求的​

​String​

​​编码为​

​ByteBuf​

​​,是一种​

​ChannelOutboundHandlerAdapter​

​​)以及自实现的​

​ChannelInboundHandlerAdapter​

​(处理与客户端之间的通信)。

.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(serverHandler);
                        }
                    });      

​@ChannelHandler.Sharable​

​​注解表示可以将该​

​ChannelHandler​

​​的同一实例多次添加到一个或多个​

​ChannelPipeline​

​​中,而不会出现竞争条件。如果未指定此注解,则每次将​

​ChannelHandler​

​​实例添加到​

​ChannelPipeline​

​中时都必须创建一个新的实例,因为它具有成员变量等非共享状态。

​ChannelGroup​

​​是一个线程安全的​

​Set​

​​ ,包含开放的​

​Channel​

​​并提供对它们的各种批量操作。 使用​

​ChannelGroup​

​​ ,可以将​

​Channel​

​​分类为一个有意义的组(例如,基于每个服务或每个状态来分组),以便实现广播的功能,关闭的​

​Channel​

​​会自动从集合中删除(博主是手动删除的,只是为了演示怎么删除​

​Channel​

​​),因此无需担心​

​ChannelGroup​

​​中​

​Channel​

​​的生命周期。 一个​

​Channel​

​​可以属于多个​

​ChannelGroup​

​ 。

Netty :Netty介绍 &amp; 实现简易多人聊天室
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);      

客户端

package com.kaven.netty.nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.Scanner;

public class Client {

    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {

        final ClientHandler clientHandler = new ClientHandler();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(PORT))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(clientHandler);
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect().sync();
            Channel channel = channelFuture.channel();
            //客户端发送消息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String msg = scanner.nextLine();
                //通过客户端把输入内容发送到服务端
                channel.writeAndFlush(msg).sync();
                if(msg.equals("quit")) {
                    channel.close().sync();
                    break;
                }
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @ChannelHandler.Sharable
    static class ClientHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.print(msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel channel = ctx.channel();
            String msg = cause.getMessage();
            System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);
        }
    }
}      

​SimpleChannelInboundHandler​

​​抽象类继承了​

​ChannelInboundHandlerAdapter​

​​类,并且默认会自动释放所有处理的消息,在这种情况下,如果需要将消息传递给​

​ChannelPipeline​

​​中的下一个​

​ChannelHandler​

​​,需要使用​

​ReferenceCountUtil.retain(Object)​

​​ (​

​Netty​

​资源管理部分,博主以后也会详细介绍)。

Netty :Netty介绍 &amp; 实现简易多人聊天室

测试

Netty :Netty介绍 &amp; 实现简易多人聊天室
Netty :Netty介绍 &amp; 实现简易多人聊天室
Netty :Netty介绍 &amp; 实现简易多人聊天室
Netty :Netty介绍 &amp; 实现简易多人聊天室

更换I/O网络编程模型

在​

​Netty​

​​中更换​

​I/O​

​网络编程模型非常方便,只需要修改服务端和客户端相应的地方即可,如下图所示:

Netty :Netty介绍 &amp; 实现简易多人聊天室
Netty :Netty介绍 &amp; 实现简易多人聊天室
OioEventLoopGroup        OioServerSocketChannel       OioSocketChannel
NioEventLoopGroup        NioServerSocketChannel       NioSocketChannel
EpollEventLoopGroup      EpollServerSocketChannel     EpollSocketChannel
KQueueEventLoopGroup     KQueueServerSocketChannel    KQueueSocketChannel      

​Oio​

​​(​

​Old-Blocking-IO​

​​)就是​

​BIO​

​网络编程模型。

​Epoll​

​:

Netty :Netty介绍 &amp; 实现简易多人聊天室

​KQueue​

​: