天天看點

使用Netty編寫一個簡單的群聊系統

概述

具體業務跟下面文章的一樣,隻是下面文章用的是NIO,這裡使用Netty改造。

https://blog.csdn.net/qq_40837310/article/details/111248486

目的是為了熟悉Netty基本API使用。

服務端編碼:

public class ServerGroupChatServer {


    private  int port;

    public ServerGroupChatServer(int port){
        this.port = port;
    }


    public void start() throws InterruptedException {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();

        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //設定TCP等待隊列為128
                    .option(ChannelOption.SO_BACKLOG,128)
                    //長連接配接
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                        	//添加handler
                            socketChannel.pipeline().addLast(new MyNettyChannelHandler());
                        }
                    });


            ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", port);
            //添加一個監聽器,監聽綁定情況
            bindFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    boolean success = future.isSuccess();
                    if (success)
                        System.out.println("服務綁定成功");
                    else{
                        System.out.println("服務綁定失敗");
                    }
                }
            });

            bindFuture.channel().closeFuture().sync();

        }finally {
        	//關閉
            bossGroup.shutdownGracefully();
            //關閉
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        ServerGroupChatServer serverGroupChatServer = new ServerGroupChatServer(9898);
        serverGroupChatServer.start();
    }
}
           

服務端的Handler:

public class MyNettyChannelHandler extends ChannelInboundHandlerAdapter {

    private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //進入這裡表示channel已經處于非激活狀态,關閉,在這裡提示該用戶端下線。
        System.out.println("用戶端:" + ctx.channel().remoteAddress() + "已離線");
        channels.remove(ctx.channel());
        super.channelInactive(ctx);
    }

    //READ事件觸發時調用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //按照原業務,讀取到消息就進行轉發
        for (Channel channel:channels){
            //排除自身
            if (channel == ctx.channel())
                continue;
            ByteBuf byteBuf = (ByteBuf) msg;
            byteBuf.retain();
            //發送
            channel.writeAndFlush(byteBuf);
        }

        ctx.fireChannelRead(msg);
    }



    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //進入這裡就證明連接配接以建立,提醒其他使用者此用戶端已上線
        //ChannelGroup的寫是裡面所有channel 的寫
        channels.writeAndFlush(Unpooled.copiedBuffer("用戶端:" + ctx.channel().remoteAddress() + " 已上線。", CharsetUtil.UTF_8));
        //把目前通道加到集合中
        channels.add(ctx.channel());
        super.channelActive(ctx);
    }

    //觸發異常關閉用戶端
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireChannelActive();
    }
}

           

用戶端代碼:

public class NettyGroupChatClient {

    private  int serverPort;

    private String clientName;

    private SocketChannel socketChannel;

    public NettyGroupChatClient(int serverPort,String clientName){
        this.clientName = clientName;
        this.serverPort = serverPort;
    }

    public void start() throws InterruptedException {
        NioEventLoopGroup worker = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();

        try {
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new MyNettyChannelHandlerClient());
                            //sc.pipeline().addLast(new OUT1());
                            //sc.pipeline().addLast(new OUT2());
                            socketChannel = sc;
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", serverPort).sync();
            //添加一個監聽器,當連接配接成功時就給服務端每三秒發送一次資料
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()){
                        System.out.println("連接配接成功");
                        new Thread(()->{
                            while (true) {
                                socketChannel.writeAndFlush(Unpooled.copiedBuffer("Hello, I am " + clientName, CharsetUtil.UTF_8));
                                try {
                                    TimeUnit.SECONDS.sleep(3);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }).start();
                     }
                     else
                        System.out.println("連接配接失敗");
                    }


            });
            channelFuture.channel().closeFuture().sync();
        }finally {
            worker.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        NettyGroupChatClient Mike = new NettyGroupChatClient(9898,"John");
        Mike.start();


    }

}
           

用戶端的Handler

public class MyNettyChannelHandlerClient extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

           

運作結果:

用戶端1:

使用Netty編寫一個簡單的群聊系統

用戶端2:

使用Netty編寫一個簡單的群聊系統

用戶端3:

使用Netty編寫一個簡單的群聊系統

都能過接收其他兩個用戶端發的消息。

把三個用戶端關閉後:

服務端控制台:

使用Netty編寫一個簡單的群聊系統

繼續閱讀