天天看點

Netty :Netty介紹 & 實作簡易多人聊天室

Netty :Netty介紹 & 實作簡易多人聊天室

部落客在去年二月份就介紹了​

​Java​

​​原生網絡程式設計​

​API​

​​,并用它們實作了多種​

​I/O​

​網絡程式設計模型的簡易多人聊天室:

  • ​​Java網絡程式設計-Socket程式設計初涉一(簡易用戶端-伺服器)​​
  • ​​Java網絡程式設計-Socket程式設計初涉二(基于BIO模型的簡易多人聊天室)​​
  • ​​Java網絡程式設計-Socket程式設計初涉三(僞異步I/O模型的簡易多人聊天室)​​
  • ​​Java網絡程式設計-Socket程式設計初涉四(NIO模型的簡易多人聊天室)​​
  • ​​Java網絡程式設計-Socket程式設計初涉五(AIO模型的簡易用戶端-伺服器)​​
  • ​​Java網絡程式設計-Socket程式設計初涉六(AIO模型的簡易多人聊天室)​​

使用​

​Java​

​​原生網絡程式設計​

​API​

​​還是比較繁瑣的,如果需要更換​

​I/O​

​​網絡程式設計模型來适應不同的業務場景,重構工作量還是很大的,而​

​Netty​

​​對​

​Java​

​​原生網絡程式設計​

​API​

​​進行了封裝和擴充,使得不同​

​I/O​

​​網絡程式設計模型之間的轉換需要的代碼改動非常少,并且性能非常高,​

​Netty​

​​的實作原理留到以後再介紹,将​

​Netty​

​​提供的元件使用熟練後,它的實作原理才能了解的更透徹,​

​Netty​

​​的實作原理涉及到了​

​Linux​

​​核心部分,比如零拷貝、​

​I/O​

​​多路複用等,以及​

​DMA​

​等硬體。

Netty介紹

EventLoopGroup、EventLoop

​Netty​

​​的排程子產品稱為​

​EventLoopGroup​

​​,​

​Netty​

​​提供了​

​NioEventLoopGroup​

​​、​

​OioEventLoopGroup​

​​、​

​EpollEventLoopGroup​

​​(在​

​Linux​

​下可用)等多種實作。

Netty :Netty介紹 & 實作簡易多人聊天室

​EventLoopGroup​

​​是一組​

​EventLoop​

​​的抽象,一個​

​EventLoopGroup​

​​當中會包含一個或多個​

​EventLoop​

​​,如下圖所示(圖來自​

​《Netty In Action》​

​):

Netty :Netty介紹 & 實作簡易多人聊天室
Netty :Netty介紹 & 實作簡易多人聊天室

Channel

​EventLoop​

​​在它的整個生命周期中隻會與一個​

​Thread​

​​(真正的​

​I/O​

​​線程)綁定。所有由​

​EventLoop​

​​處理的​

​I/O​

​​事件都将在它所關聯的​

​Thread​

​​上進行處理。一個​

​Channel​

​​在它的整個生命周期中隻會注冊在一個​

​EventLoop​

​​上。也就是說一個​

​Channel​

​​上綁定的所有方法隻會由同一個線程執行。一個​

​EventLoop​

​​在運作過程當中會被配置設定給一個或多個​

​Channel​

​。

​Channel​

​​可以有一個父​

​Channel​

​​,這取決于它是如何建立的。 例如,被​

​ServerSocketChannel​

​​接受的​

​SocketChannel​

​​(當用戶端與服務端建立連接配接時,在服務端建立與用戶端通信的​

​Channel​

​​)将在​

​parent()​

​​上傳回​

​ServerSocketChannel​

​​作為其父​

​Channel​

​​。​

​Channel​

​​用于連接配接位元組緩沖區和另一端的實體,這個實體可以是​

​Socket​

​​,也可以是​

​File​

​​,在​

​NIO​

​​網絡程式設計模型中,服務端和用戶端進行​

​IO​

​​通信的媒介就是​

​Channel​

​​。​

​Netty​

​​對​

​Java​

​​原生的​

​ServerSocketChannel​

​​進行了封裝和增強,相對于原生的​

​Channel​

​​, ​

​Netty​

​​的​

​Channel​

​增加了如下元件(但不限于這些元件):

  • ​ChannelId​

    ​:辨別唯一身份資訊。
  • ​ChannelPipeline​

    ​​:處理或攔截​

    ​Channel​

    ​​入站事件和出站操作的​

    ​ChannelHandler​

    ​​的清單。 ​

    ​ChannelPipeline​

    ​​實作了一種進階形式的攔截過濾器模式,使使用者可以完全控制事件的處理方式以及 ​

    ​ChannelPipeline​

    ​​中的​

    ​ChannelHandler​

    ​​如何互互相動。每個​

    ​Channel​

    ​​都有自己的​

    ​ChannelPipeline​

    ​​,并在建立新​

    ​Channel​

    ​時自動建立。
  • ​EventLoop​

    ​​:用來處理​

    ​Channel I/O​

    ​​事件的​

    ​EventLoop​

    ​。
  • ​ChannelConfig​

    ​​:​

    ​Channel​

    ​的配置參數(例如接收緩沖區大小)。

ChannelHandler

​ChannelHandler​

​​會處理​

​I/O​

​​事件或攔截​

​I/O​

​​操作,并将其轉發到​

​ChannelPipeline​

​​中的下一個​

​ChannelHandler​

​​。​

​ChannelHandler​

​本身并沒有提供很多方法,通常需要實作其子類型之一:

  • ​ChannelInboundHandler​

    ​​:處理入站​

    ​I/O​

    ​事件的抽象。
  • ​ChannelOutboundHandler​

    ​​:處理出站​

    ​I/O​

    ​操作的抽象。

為了友善,​

​Netty​

​提供了以下擴充卡類:

  • ​ChannelInboundHandlerAdapter​

    ​​:處理入站​

    ​I/O​

    ​事件的一種簡單實作。
  • ​ChannelOutboundHandlerAdapter​

    ​​:處理出站​

    ​I/O​

    ​操作的一種簡單實作。

Bootstrap、ServerBootStrap

​Netty​

​​的啟動類分為用戶端啟動類和服務端啟動類,分别是​

​BootStrap​

​​和​

​ServerBootStrap​

​​。它們都是​

​AbstractBootStrap​

​​的子類,總的來說它們都是​

​Netty​

​​中的輔助類,提供了鍊式配置方法,友善了​

​Channel​

​的引導和啟動。

簡易多人聊天室

部落客接下來用​

​Netty​

​​實作一個​

​NIO​

​​網絡程式設計模型的簡易多人聊天室,來介紹​

​Netty​

​的基本使用。

首先需要導入​

​Netty​

​​的依賴(部落客使用​

​4.1.70.Final​

​版本):

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.70.Final</version>
        </dependency>      

服務端

package com.kaven.netty.nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.atomic.AtomicReference;

public class Server {

    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {
        final ServerHandler serverHandler = new ServerHandler();
        // 主線程組,用于接受用戶端的連接配接,但是不做任何處理,跟老闆一樣
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 從線程組,主線程組會把任務丢給它,讓從線程組去做相應的處理
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup , workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(PORT)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(serverHandler);
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @ChannelHandler.Sharable
    static class ServerHandler extends ChannelInboundHandlerAdapter {

        private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            clientChannels.add(channel);

            String sendMsg = "客戶[" + channel.remoteAddress() + "]上線\n";
            System.out.print(sendMsg);
            clientChannels.forEach(clientChannel -> {
                if(clientChannel != channel) {
                    clientChannel.writeAndFlush(sendMsg);
                }
                else {
                    clientChannel.writeAndFlush("歡迎您上線\n");
                }
            });
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            if(clientChannels.contains(channel)) {
                clientChannels.remove(channel);

                String sendMsg = "客戶[" + channel.remoteAddress() + "]異常下線\n";
                System.out.print(sendMsg);
                clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel channel = ctx.channel();

            AtomicReference<String> sendMsg = new AtomicReference<>("客戶[" + channel.remoteAddress() + "]消息: " + msg + "\n");
            if(msg instanceof String && msg.equals("quit")) {
                clientChannels.remove(channel);
                channel.close();
                sendMsg.set("客戶[" + channel.remoteAddress() + "]下線\n");
                System.out.print(sendMsg.get());
            }
            clientChannels.forEach(clientChannel -> {
                if(clientChannel != channel) {
                    clientChannel.writeAndFlush(sendMsg.get());
                }
            });
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel channel = ctx.channel();
            clientChannels.remove(channel);

            String msg = cause.getMessage();
            String sendMsg = "客戶[" + channel.remoteAddress() + "]異常: " + msg + "\n";
            System.out.print(sendMsg);
            clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
        }
    }
}      

當有用戶端連接配接時,服務端會使用​

​ChannelInitializer​

​​初始化與用戶端通信的​

​Channel​

​​,并且在​

​Channel​

​​的​

​ChannelPipeline​

​​ 中添加​

​ChannelHandler​

​​,這裡主要是添加​

​StringDecoder​

​​(将接收到的​

​ByteBuf​

​​解碼為​

​String​

​​,是一種​

​ChannelInboundHandlerAdapter​

​​)、​

​StringEncoder​

​​(将請求的​

​String​

​​編碼為​

​ByteBuf​

​​,是一種​

​ChannelOutboundHandlerAdapter​

​​)以及自實作的​

​ChannelInboundHandlerAdapter​

​(處理與用戶端之間的通信)。

.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(serverHandler);
                        }
                    });      

​@ChannelHandler.Sharable​

​​注解表示可以将該​

​ChannelHandler​

​​的同一執行個體多次添加到一個或多個​

​ChannelPipeline​

​​中,而不會出現競争條件。如果未指定此注解,則每次将​

​ChannelHandler​

​​執行個體添加到​

​ChannelPipeline​

​中時都必須建立一個新的執行個體,因為它具有成員變量等非共享狀态。

​ChannelGroup​

​​是一個線程安全的​

​Set​

​​ ,包含開放的​

​Channel​

​​并提供對它們的各種批量操作。 使用​

​ChannelGroup​

​​ ,可以将​

​Channel​

​​分類為一個有意義的組(例如,基于每個服務或每個狀态來分組),以便實作廣播的功能,關閉的​

​Channel​

​​會自動從集合中删除(部落客是手動删除的,隻是為了示範怎麼删除​

​Channel​

​​),是以無需擔心​

​ChannelGroup​

​​中​

​Channel​

​​的生命周期。 一個​

​Channel​

​​可以屬于多個​

​ChannelGroup​

​ 。

Netty :Netty介紹 &amp; 實作簡易多人聊天室
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);      

用戶端

package com.kaven.netty.nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.Scanner;

public class Client {

    private static final int PORT = 8080;

    public static void main(String[] args) throws InterruptedException {

        final ClientHandler clientHandler = new ClientHandler();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(PORT))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(clientHandler);
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect().sync();
            Channel channel = channelFuture.channel();
            //用戶端發送消息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String msg = scanner.nextLine();
                //通過用戶端把輸入内容發送到服務端
                channel.writeAndFlush(msg).sync();
                if(msg.equals("quit")) {
                    channel.close().sync();
                    break;
                }
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    @ChannelHandler.Sharable
    static class ClientHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.print(msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel channel = ctx.channel();
            String msg = cause.getMessage();
            System.out.print("群聊[" + channel.remoteAddress() + "]異常: " + msg);
        }
    }
}      

​SimpleChannelInboundHandler​

​​抽象類繼承了​

​ChannelInboundHandlerAdapter​

​​類,并且預設會自動釋放所有處理的消息,在這種情況下,如果需要将消息傳遞給​

​ChannelPipeline​

​​中的下一個​

​ChannelHandler​

​​,需要使用​

​ReferenceCountUtil.retain(Object)​

​​ (​

​Netty​

​資源管理部分,部落客以後也會詳細介紹)。

Netty :Netty介紹 &amp; 實作簡易多人聊天室

測試

Netty :Netty介紹 &amp; 實作簡易多人聊天室
Netty :Netty介紹 &amp; 實作簡易多人聊天室
Netty :Netty介紹 &amp; 實作簡易多人聊天室
Netty :Netty介紹 &amp; 實作簡易多人聊天室

更換I/O網絡程式設計模型

在​

​Netty​

​​中更換​

​I/O​

​網絡程式設計模型非常友善,隻需要修改服務端和用戶端相應的地方即可,如下圖所示:

Netty :Netty介紹 &amp; 實作簡易多人聊天室
Netty :Netty介紹 &amp; 實作簡易多人聊天室
OioEventLoopGroup        OioServerSocketChannel       OioSocketChannel
NioEventLoopGroup        NioServerSocketChannel       NioSocketChannel
EpollEventLoopGroup      EpollServerSocketChannel     EpollSocketChannel
KQueueEventLoopGroup     KQueueServerSocketChannel    KQueueSocketChannel      

​Oio​

​​(​

​Old-Blocking-IO​

​​)就是​

​BIO​

​網絡程式設計模型。

​Epoll​

​:

Netty :Netty介紹 &amp; 實作簡易多人聊天室

​KQueue​

​:

繼續閱讀