概述
具體業務跟下面文章的一樣,只是下面文章用的是NIO,這裡使用Netty改造。
https://blog.csdn.net/qq_40837310/article/details/111248486
目的是為了熟悉Netty基本API使用。
服務端編碼:
/**
 * Netty-based group chat server.
 *
 * <p>Binds to {@code 127.0.0.1} on the configured port, installs a
 * {@code MyNettyChannelHandler} on every accepted connection, and blocks until the
 * server channel is closed.
 */
public class ServerGroupChatServer {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind on the loopback address
     */
    public ServerGroupChatServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are shut down gracefully on the way out.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for the server channel to close
     */
    public void start() throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ChannelFuture bindFuture = newBootstrap(bossGroup, workerGroup).bind("127.0.0.1", port);
            // Report the outcome of the bind once it completes.
            bindFuture.addListener(future ->
                    System.out.println(future.isSuccess() ? "服務綁定成功" : "服務綁定失敗"));
            // Park here until the server channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Builds the server bootstrap: NIO transport, socket options and the per-connection pipeline. */
    private ServerBootstrap newBootstrap(NioEventLoopGroup bossGroup, NioEventLoopGroup workerGroup) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // Accept backlog of 128 pending connections.
                .option(ChannelOption.SO_BACKLOG, 128)
                // Enable TCP keep-alive on accepted connections.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Each connection gets its own chat handler instance.
                        socketChannel.pipeline().addLast(new MyNettyChannelHandler());
                    }
                });
        return serverBootstrap;
    }

    /** Launches the server on port 9898. */
    public static void main(String[] args) throws InterruptedException {
        new ServerGroupChatServer(9898).start();
    }
}
服務端的Handler:
/**
 * Server-side inbound handler for the group chat.
 *
 * <p>Tracks every connected client in a shared {@link ChannelGroup}, announces new
 * clients to those already connected, and relays each inbound message to all clients
 * except the sender.
 */
public class MyNettyChannelHandler extends ChannelInboundHandlerAdapter {

    /** Connected client channels, shared by all handler instances. */
    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Called when the channel becomes inactive: logs that the client went offline and
     * removes it from the group.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("用戶端:" + ctx.channel().remoteAddress() + "已離線");
        channels.remove(ctx.channel());
        super.channelInactive(ctx);
    }

    /**
     * Relays an inbound message to every other connected client, then passes the
     * original message on down the pipeline.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) msg;
            for (Channel channel : channels) {
                // Skip the sender.
                if (channel == ctx.channel()) {
                    continue;
                }
                // Each recipient gets its own duplicate so reader indices are not shared
                // between channels; retain() balances the release done when the write
                // to that channel completes.
                channel.writeAndFlush(byteBuf.duplicate().retain());
            }
        }
        // Hand the original reference to the next handler in the pipeline.
        ctx.fireChannelRead(msg);
    }

    /**
     * Called when the connection is established: notifies the clients already in the
     * group that this client is online, then adds this channel to the group.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Writing to the group writes to every channel it currently contains; the new
        // channel is added afterwards so it does not receive its own announcement.
        channels.writeAndFlush(Unpooled.copiedBuffer("用戶端:" + ctx.channel().remoteAddress() + " 已上線。", CharsetUtil.UTF_8));
        channels.add(ctx.channel());
        super.channelActive(ctx);
    }

    /**
     * Reports the error and closes the client connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
用戶端代碼:
/**
 * Netty-based group chat client.
 *
 * <p>Connects to the chat server on {@code 127.0.0.1}, sends a greeting every three
 * seconds while the connection is open, and blocks until the connection closes.
 */
public class NettyGroupChatClient {

    /** Port of the chat server on the loopback address. */
    private final int serverPort;

    /** Name included in every greeting this client sends. */
    private final String clientName;

    /**
     * @param serverPort port of the chat server on the loopback address
     * @param clientName name included in the greeting messages
     */
    public NettyGroupChatClient(int serverPort, String clientName) {
        this.clientName = clientName;
        this.serverPort = serverPort;
    }

    /**
     * Connects to the server and blocks the calling thread until the connection closes.
     * The event loop group is shut down gracefully on the way out.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *         for the connect or for the channel to close
     */
    public void start() throws InterruptedException {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new MyNettyChannelHandlerClient());
                        }
                    });

            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", serverPort);
            // Register the listener before waiting, so the failure branch can actually
            // run: sync() rethrows on a failed connect.
            connectFuture.addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("連接配接成功");
                    scheduleGreeting(connectFuture);
                } else {
                    System.out.println("連接配接失敗");
                }
            });
            connectFuture.sync();
            connectFuture.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }

    /**
     * Sends the greeting immediately and then every three seconds on the channel's own
     * event loop, and cancels the task once the channel closes.
     */
    private void scheduleGreeting(ChannelFuture connectFuture) {
        Future<?> greeting = connectFuture.channel().eventLoop().scheduleWithFixedDelay(
                () -> connectFuture.channel().writeAndFlush(
                        Unpooled.copiedBuffer("Hello, I am " + clientName, CharsetUtil.UTF_8)),
                0, 3, TimeUnit.SECONDS);
        // Stop sending once the connection is gone.
        connectFuture.channel().closeFuture().addListener(closed -> greeting.cancel(false));
    }

    /** Launches a client named "John" against the server on port 9898. */
    public static void main(String[] args) throws InterruptedException {
        NettyGroupChatClient client = new NettyGroupChatClient(9898, "John");
        client.start();
    }
}
用戶端的Handler
/**
 * Client-side inbound handler: prints every message received from the server.
 */
public class MyNettyChannelHandlerClient extends ChannelInboundHandlerAdapter {

    /**
     * Prints the received bytes decoded as UTF-8, then releases the buffer.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            // The message is consumed here and not passed on, so this handler must
            // release it to avoid leaking the buffer.
            byteBuf.release();
        }
    }

    /**
     * Prints the stack trace of any error raised in the pipeline.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}
運作結果:
用戶端1:
用戶端2:
用戶端3:
都能夠接收其他兩個用戶端發的消息。
把三個用戶端關閉後:
服務端控制台: