天天看点

SpringBoot项目使用Netty实现WebSocket服务器+Vue前端聊天

SpringBoot项目使用Netty实现WebSocket

最终效果图

  • 用户AB对话效果
    • 用户A的界面
    • 用户B的界面
  • 用户上下线通知
  • 历史消息记录可以滚动
SpringBoot项目使用Netty实现WebSocket服务器+Vue前端聊天
SpringBoot项目使用Netty实现WebSocket服务器+Vue前端聊天
  • 客户端发送心跳

项目Gitee地址

  • gitee.com/lyzya/netty…

参考过的文章

  • juejin.cn/post/684490…

搭建项目

  • 创建一个SpringBoot项目:spring-boot-websocket-demo1
  • 项目的pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.11</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.study.websocket</groupId>
    <artifactId>spring-boot-websocket-demo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-websocket-demo1</name>
    <description>spring-boot-websocket-demo1</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- spring-boot-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- netty-all(Netty依赖) -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.28.Final</version>
        </dependency>

        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

           

搭建项目-前端

  • 前端代码在项目的spring-boot-websocket-demo1\webapp路径下,这里不再详细描述前端

启动类

  • 给启动类添加@ComponentScan注解扫描组件
java复制代码package com.study.websocket.demo1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.study.websocket")
public class SpringBootWebsocketDemo1Application {
  
  public static void main(String[] args) {
    SpringApplication.run(SpringBootWebsocketDemo1Application.class, args);
  }
  
}

           

创建服务器的入口:WebSocketServer

  • 在这里从配置文件中读取WebSocket服务器的端口和请求路径

WebSocketServer代码

java复制代码package com.study.websocket.demo1.server;

import com.study.websocket.demo1.handler.WebSocketServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Objects;

/**
 * @author 启年
 * @date 2023-05-12 19:56
 */
@Slf4j
@Component
public class WebSocketServer {
  
  /**
   * Netty 服务的端口号
   */
  @Value("${websocket.netty.port:19999}")
  public int port;
  
  @Value("${websocket.netty.path:/websocket}")
  public String webSocketPath;
  
  private EventLoopGroup bossGroup;
  
  private EventLoopGroup workerGroup;
  
  /**
   * 启动WebSocket服务器
   */
  private void start() throws InterruptedException {
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();
    
    Channel channel = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new WebSocketServerInitializer(webSocketPath))
        .bind(port)
        .sync()
        .channel();
    
    log.debug("服务端启动成功,端口号:{}", port);
    
    channel
        .closeFuture()
        .sync();
  }
  
  /**
   * 释放资源
   * PreDestroy注解:在容器销毁该组件之前被调用
   * 注解使用前提:该类的实例必须是由容器创建和管理的,如 Spring、JavaEE 容器等。
   */
  @PreDestroy
  public void destroy() {
    if (Objects.nonNull(bossGroup)) {
      bossGroup.shutdownGracefully();
    }
    
    if (Objects.nonNull(workerGroup)) {
      bossGroup.shutdownGracefully();
    }
  }
  
  /**
   * 初始化WebSocketServer(调用init())
   * PostConstruct注解:用于指示一个方法在容器创建该组件之后立即调用
   * 注解使用前提:该类的实例必须是由容器创建和管理的,如 Spring、JavaEE 容器等。
   */
  @PostConstruct
  public void init() {
    //这里要新开一个线程,否则会阻塞原本的controller等业务
    new Thread(() -> {
      try {
        start();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();
    
  }
  
}

           

创建服务器的初始化类:WebSocketServerInitializer

  • 用于给初始化Netty的Handler

WebSocketServerInitializer代码

java复制代码package com.study.websocket.demo1.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * @author 启年
 * @date 2023-05-12 22:55
 */
@Slf4j
@AllArgsConstructor
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
  
  /**
   * WebSocket 服务的接口地址
   */
  public String webSocketPath;
  
  @Override
  protected void initChannel(NioSocketChannel ch) throws Exception {
    log.debug("服务的接口地址:{}", webSocketPath);
    ChannelPipeline pipeline = ch.pipeline();
    //自定义的Handler-心跳检测
    pipeline.addLast(new WebSocketIdleStateHandler());
    //HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。
    pipeline.addLast(new HttpServerCodec());
    //用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。
    pipeline.addLast(new HttpObjectAggregator(65536));
    //用于对WebSocket消息进行压缩和解压缩操作。
    pipeline.addLast(new WebSocketServerCompressionHandler());
    //可以对整个WebSocker通信进行初始化(当Http请求中有升级为WebSocker的请求时),以及握手处理
    pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, null, true));
    //自定义的Handler-处理WebSocket文本类型的消息
    pipeline.addLast(new WebSocketTextHandler());
  }
  
}


           

创建管理连接的配置类-NettyConfig

java复制代码创建
           
  • 使用ConcurrentMap<String, Channel>存储在线连接的userId与channel的对应管理
java复制代码package com.study.websocket.demo1.config;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Netty配置类
 *
 * @author 启年
 * @date 2023-05-12 19:32
 */
@Configuration
public class NettyConfig {
  /**
   * 存储所有在线的客户端Channel
   */
  private static final ChannelGroup onlineChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  
  /**
   * 存储所有在线的UserId与之对应的Channel
   */
  private static final ConcurrentMap<String, Channel> onlineUserChannelMap = new ConcurrentHashMap<>();
  
  /**
   * 获取所有在线的客户端Channel
   */
  public static ChannelGroup getOnlineChannelGroup() {
    return onlineChannelGroup;
  }
  
  /**
   * 获取所有在线的UserId与之对应的Channel
   */
  public static ConcurrentMap<String, Channel> getOnlineUserChannelMap() {
    return onlineUserChannelMap;
  }
  
}

           

创建Handler-心跳检测:WebSocketIdleStateHandler

  • 用于检测客户端指定时间内未发送心跳数据,则强制关闭客户端的连接

WebSocketIdleStateHandler代码

java复制代码package com.study.websocket.demo1.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * WebSocket服务端的心跳检测
 *
 * @author 启年
 * @date 2023-05-14 10:05
 */
@Slf4j
public class WebSocketIdleStateHandler extends IdleStateHandler {
  
  /**
   * 默认的读空闲时间
   */
  private static final int DEFAULT_READER_IDLE_TIME = 5;
  
  /**
   * 默认30秒读空闲断开客户端
   */
  public WebSocketIdleStateHandler() {
    super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
  }
  
  /**
   * 指定心跳时间(秒)
   *
   * @param readerIdleTimeSeconds 读空闲时间
   * @param writerIdleTimeSeconds 写空闲时间
   * @param allIdleTimeSeconds    读写空闲时间
   */
  public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
  }
  
  /**
   * 指定心跳时间及时间单位
   *
   * @param readerIdleTime 读空闲时间
   * @param writerIdleTime 写空闲时间
   * @param allIdleTime    读写空闲时间
   * @param unit           时间单位
   */
  public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    super(readerIdleTime, writerIdleTime, allIdleTime, unit);
  }
  
  /**
   * 当空闲事件触发时执行
   */
  @Override
  protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    //如果是读空闲
    if (evt.state().equals(IdleState.READER_IDLE)) {
      Channel channel = ctx.channel();
      log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", channel.id());
      channel.close();
    }
    
    super.channelIdle(ctx,evt);
  }
  
}

           

创建Handler-WebSocket文本消息处理:WebSocketTextHandler

  • 在这里处理客户端的连接、断开以及心跳、客户端连接、消息发送等事件

WebSocketTextHandler代码

java复制代码package com.study.websocket.demo1.handler;

import com.alibaba.fastjson.JSON;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.HeartbeatMessage;
import com.study.websocket.demo1.entity.RegisterMessage;
import com.study.websocket.demo1.entity.TextMessage;
import com.study.websocket.demo1.enums.MessageTypeEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Objects;

/**
 * 处理WbeSocket的文本消息
 *
 * @author 启年
 * @date 2023-05-12 20:04
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  
  /**
   * 存储在 Channel 中的 attr 属性名(存储用户Id)
   */
  public static final String USER_ID = "userId";
  
  public WebSocketTextHandler() {
    log.debug("WebSocketTextHandler 启动...");
  }
  
  /**
   * 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。
   * 在连接建立时被调用一次
   */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    //将新连接的客户端Channel存储起来
    NettyConfig.getOnlineChannelGroup().add(channel);
    log.debug("新客户端建立链接 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
    
  }
  
  /**
   * 在 WebSocket 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理。
   * 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法,用于进行一些资源释放等操作,确保 WebSocket 连接正常断开。
   */
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    //移除断开的客户端的Channel
    Channel channel = ctx.channel();
    cleanChannel(channel);
    log.debug("客户端断开链接 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
  }
  
  /**
   * 处理客户端非正常断开(WebSocket 连接发生异常时调用)
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    //获取断开连接的客户端的Channel
    Channel channel = ctx.channel();
    //移除断开的客户端的Channel
    cleanChannel(channel);
    log.debug("客户端异常断开 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
    //当发生异常时,手动关闭Channel
    channel.close();
  }
  
  /**
   * 当 Channel 的连接建立并准备好接收数据时被调用。这意味着连接已经成功建立,可以开始发送和接收数据了。
   * 在每次连接激活时被调用
   */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
  }
  
  /**
   * 当接收到前端发送的WebSocket时处理
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    
    //接收到的文本信息的二进制形式
    ByteBuf content = msg.content();
    
    //接收到的文本信息
    String text = msg.text();
    //获取到该条消息的标识,前端的字段必须后端的枚举名大小写一致
    //根据消息类型进行不同业务处理
    String type = JSON.parseObject(text).get("type").toString();
    MessageTypeEnum messageTypeEnum = MessageTypeEnum.valueOf(type);
    
    //普通文本消息
    if (MessageTypeEnum.TEXT.compareTo(messageTypeEnum) == 0) {
      //发送消息
      sendMsg(text);
    } else if (MessageTypeEnum.HEARTBEAT.compareTo(messageTypeEnum) == 0) {
      HeartbeatMessage heartbeatMessage = JSON.parseObject(text, HeartbeatMessage.class);
      String userId = heartbeatMessage.getUserId();
      //接收到客户端的心跳
      log.debug("来自【{}】的心跳", userId);
    } else if (MessageTypeEnum.REGISTER.compareTo(messageTypeEnum) == 0) {
      //注册
      register(ctx, text);
    }
    
  }
  
  /**
   * 将连接的客户端注册到服务端中
   *
   * @param ctx
   * @param text
   */
  private void register(ChannelHandlerContext ctx, String text) {
    RegisterMessage registerMessage = JSON.parseObject(text, RegisterMessage.class);
    String userId = registerMessage.getUserId();
    //注册客户端
    //给 Channel 绑定一个存储 UserId 的 AttributeKey
    Channel channel = ctx.channel();
    //设置一个名为 userId 的 AttributeKey
    AttributeKey<Object> userIdKey = AttributeKey.valueOf("userId");
    //将 Channel 的 attr 设置一个名为 userId
    channel
        //在 Channel 中寻找名为 userIdKey 的 AttributeKey
        .attr(userIdKey)
        //给这个 AttributeKey 设置值
        .set(userId);
    //当自定义属性在属性集合中不存在时才进行添加
    //.setIfAbsent(userId);
    
    //将UserId与Channel建立联系
    NettyConfig.getOnlineUserChannelMap().put(userId, channel);
    log.debug("在线用户 --> {}", NettyConfig.getOnlineUserChannelMap().keySet());
    
    //通知所有用户都上线了
    NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
        "用户:【" + userId + "】上线啦!"
    ));
  }
  
  /**
   * 给指定的用户发送消息
   *
   * @param textMessageJson
   */
  private void sendMsg(String textMessageJson) {
    //获取接收到的消息的实体类
    TextMessage textMessage = JSON.parseObject(textMessageJson, TextMessage.class);
    String userId = textMessage.getUserId();
    String userMsg = textMessage.getMsg();
    String receiver = textMessage.getReceiver();
    //给指定的用户发送消息
    Channel receiverChannel = NettyConfig.getOnlineUserChannelMap().get(receiver);
    if (Objects.nonNull(receiverChannel)) {
      //TODO 这里可以设计为结构化的数据,以返回JSON数据便于解析
      receiverChannel.writeAndFlush(new TextWebSocketFrame(userId + ":" + userMsg));
    }
    log.debug("用户【{}】给【{}】发送的消息:{}", userId, receiver, userMsg);
    //TODO 服务端给客户端回复消息(可以设计为失败时返回)
    //channel.writeAndFlush(new TextWebSocketFrame("服务端已接收到消息"));
  }
  
  /**
   * 删除断开连接的客户端在程序中的数据
   *
   * @param channel 断开连接的客户端的 Channel
   */
  private void cleanChannel(Channel channel) {
    //获取客户端 Channel 中存储的名为 userId 的 Attribute
    Attribute<String> userIdKey = channel.attr(AttributeKey.valueOf(USER_ID));
    String userId = userIdKey.get();
    //从 ChannelGroup 中移除断开的 Channel
    NettyConfig.getOnlineChannelGroup().remove(channel);
    //从 Map 中移除 UserId 与 Channel 的对照关系
    NettyConfig.getOnlineUserChannelMap().remove(userId);
    
    //通知所有用户某用户下线了
    NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
        "用户:【" + userId + "】下线啦!"
    ));
  }
  
  /**
   * 检查给定的字符串是否不是空串、空格、null
   *
   * @param strs 需要检查的字符串
   */
  private boolean checkHasText(String... strs) {
    return Arrays.stream(strs).sequential().allMatch(StringUtils::hasText);
  }
  
}

           

创建实体类

HeartbeatMessage

java复制代码package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * @author 启年
 * @date 2023-05-14 13:39
 */
@Data
public class HeartbeatMessage implements Serializable {
  
  private static final long serialVersionUID = 1290124171105321491L;
  
  
  /**
   * 发送心跳消息的用户Id
   */
  private String userId;

}

           

RegisterMessage

java复制代码package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * @author 启年
 * @date 2023-05-14 13:40
 */
@Data
public class RegisterMessage implements Serializable {
  
  private static final long serialVersionUID = -4953615574208683170L;
  /**
   * 注册到服务端的用户Id
   */
  private String userId;
}

           

TextMessage

java复制代码package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * 文本消息实体类
 *
 * @author 启年
 * @date 2023-05-12 20:16
 */
@Data
public class TextMessage implements Serializable {
  
  private static final long serialVersionUID = -4851870722684661727L;
  
  /**
   * 发送消息的用户Id
   */
  private String userId;
  
  /**
   * 消息的接收者
   */
  private String receiver;
  
  /**
   * 用户发送的消息
   */
  private String msg;
  
}

           

User

java复制代码package com.study.websocket.demo1.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * @author 启年
 * @date 2023-05-13 12:47
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
  
  private static final long serialVersionUID = 3147392908880170895L;
  
  /**
   * 用户Id
   */
  private String userId;
  
  /**
   * 用户名
   */
  private String username;
  
}

           

创建枚举-MessageTypeEnum

java复制代码package com.study.websocket.demo1.enums;

/**
 * 消息类型枚举
 *
 * @author 启年
 * @date 2023-05-14 13:36
 */
public enum MessageTypeEnum {
  TEXT("普通文本消息"),
  HEARTBEAT("心跳数据"),
  REGISTER("注册数据");
  
  MessageTypeEnum(String desc) {
  }
}

           

创建ISendMessageService

  • 该接口主要提供对Controller层的功能支持

ISendMessageService代码

java复制代码package com.study.websocket.demo1.service;

/**
 * @author 启年
 * @date 2023-05-12
 */
public interface ISendMessageService {
  
  /**
   * 根据 UserId 将信息发送给指定的用户
   *
   * @param userId   发送消息的用户Id
   * @param receiver 接收消息的用户Id
   * @param msg      要发送的消息
   */
  void sendMsgToUserByUserId(String userId, String receiver, String msg);
  
  /**
   * 给所有的在线用户发送消息
   *
   * @param msg 要发送的消息
   */
  void sendMsgToGroup(String msg);
  
}

           

创建ISendMessageService实现类:SendMessageService

  • 具体的实现了对某个用户推送消息以及群发消息
java复制代码package com.study.websocket.demo1.service.impl;

import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.service.ISendMessageService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Objects;

/**
 * @author 启年
 * @date 2023-05-12 23:31
 */
@Slf4j
@Service
public class SendMessageService implements ISendMessageService {
  
  @Override
  public void sendMsgToUserByUserId(String userId, String receiver, String msg) {
    //根据UserId获取对应的Channel
    Channel channel = NettyConfig.getOnlineUserChannelMap().get(receiver);
    
    if (Objects.isNull(channel)) {
      throw new RuntimeException("UserId:" + receiver + "不存在");
    }
    TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(userId + ":" + msg);
    //将消息发送给指定的用户
    channel.writeAndFlush(textWebSocketFrame);
    log.debug(textWebSocketFrame.text());
  }
  
  @Override
  public void sendMsgToGroup(String msg) {
    //给所有在线的用户发送消息
    NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
  }
  
}

           

控制器类-消息发送-SendMsgController

java复制代码package com.study.websocket.demo1.controller;

import com.study.websocket.demo1.service.ISendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;

/**
 * @author 启年
 * @date 2023-05-12 23:33
 */
@RestController
@RequestMapping("/send")
public class SendMsgController {
  
  @Autowired
  private ISendMessageService sendMessageService;
  
  /**
   * 单发消息:根据UserId给某个用户发送消息
   */
  @PostMapping("/user")
  public Map<String, Object> sendMsgToUserByUserId(
      @RequestParam("userId") String userId,
      @RequestParam("receiver") String receiver,
      @RequestParam("msg") String msg) {
    sendMessageService.sendMsgToUserByUserId(userId, receiver,msg);
    Map<String, Object> response = new HashMap<>();
    response.put("code", HttpServletResponse.SC_OK);
    response.put("msg", "给" + userId + "的消息发送成功");
    return response;
  }
  
  /**
   * 群发消息:给所有在线的客户端发送消息
   */
  @PostMapping("/group")
  public Map<String, Object> sendMsgToGroup(@RequestParam("msg") String msg) {
    sendMessageService.sendMsgToGroup(msg);
    Map<String, Object> response = new HashMap<>();
    response.put("code", HttpServletResponse.SC_OK);
    response.put("msg", "群发消息成功");
    return response;
  }
  
}

           

控制器类-用户数据-UserController

java复制代码package com.study.websocket.demo1.controller;

import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.User;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author 启年
 * @date 2023-05-13 12:45
 */
@RestController
@RequestMapping("/user")
public class UserController {
  
  /**
   * 返回在线的UserId
   */
  @CrossOrigin(originPatterns = {"http://localhost:8081","http://sso.server.com:9999","http://10.40.129.179:8081"})
  @GetMapping("/online/list")
  public Map<String, Object> onlineList() {
    Map<String, Object> response = new HashMap<>();
    
    List<User> list = new ArrayList<>();
    NettyConfig.getOnlineUserChannelMap().forEach((key, value) -> {
      User user = new User(key, key);
      list.add(user);
    });
    response.put("code", 200);
    response.put("msg", "success");
    response.put("data", list);
    return response;
  }
  
}

           

结束

  • 此时可以运行前端项目,进行对话
  • 也可以使用API接口对某个用户进行消息推送
链接:https://juejin.cn/post/7232905822278729783