SpringBoot项目使用Netty实现WebSocket
最终效果图
- 用户AB对话效果
- 用户上下线通知
- 历史消息记录可以滚动
项目Gitee地址
参考过的文章
搭建项目
- 创建一个SpringBoot项目:spring-boot-websocket-demo1
- 项目的pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.study.websocket</groupId>
<artifactId>spring-boot-websocket-demo1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-websocket-demo1</name>
<description>spring-boot-websocket-demo1</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring-boot-starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- netty-all(Netty依赖) -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
搭建项目-前端
- 前端代码在项目的spring-boot-websocket-demo1\webapp路径下,这里不再详细描述前端
启动类
- 给启动类添加@ComponentScan注解扫描组件
java复制代码package com.study.websocket.demo1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.study.websocket")
public class SpringBootWebsocketDemo1Application {
public static void main(String[] args) {
SpringApplication.run(SpringBootWebsocketDemo1Application.class, args);
}
}
创建服务器的入口:WebSocketServer
- 在这里从配置文件中读取WebSocket服务器的端口和请求路径
WebSocketServer代码
java复制代码package com.study.websocket.demo1.server;
import com.study.websocket.demo1.handler.WebSocketServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Objects;
/**
* @author 启年
* @date 2023-05-12 19:56
*/
@Slf4j
@Component
public class WebSocketServer {
/**
* Netty 服务的端口号
*/
@Value("${websocket.netty.port:19999}")
public int port;
@Value("${websocket.netty.path:/websocket}")
public String webSocketPath;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
/**
* 启动WebSocket服务器
*/
private void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
Channel channel = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer(webSocketPath))
.bind(port)
.sync()
.channel();
log.debug("服务端启动成功,端口号:{}", port);
channel
.closeFuture()
.sync();
}
/**
* 释放资源
* PreDestroy注解:在容器销毁该组件之前被调用
* 注解使用前提:该类的实例必须是由容器创建和管理的,如 Spring、JavaEE 容器等。
*/
@PreDestroy
public void destroy() {
if (Objects.nonNull(bossGroup)) {
bossGroup.shutdownGracefully();
}
if (Objects.nonNull(workerGroup)) {
bossGroup.shutdownGracefully();
}
}
/**
* 初始化WebSocketServer(调用init())
* PostConstruct注解:用于指示一个方法在容器创建该组件之后立即调用
* 注解使用前提:该类的实例必须是由容器创建和管理的,如 Spring、JavaEE 容器等。
*/
@PostConstruct
public void init() {
//这里要新开一个线程,否则会阻塞原本的controller等业务
new Thread(() -> {
try {
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
创建服务器的初始化类:WebSocketServerInitializer
WebSocketServerInitializer代码
java复制代码package com.study.websocket.demo1.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* @author 启年
* @date 2023-05-12 22:55
*/
@Slf4j
@AllArgsConstructor
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
/**
* WebSocket 服务的接口地址
*/
public String webSocketPath;
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
log.debug("服务的接口地址:{}", webSocketPath);
ChannelPipeline pipeline = ch.pipeline();
//自定义的Handler-心跳检测
pipeline.addLast(new WebSocketIdleStateHandler());
//HTTP协议编解码器,用于处理HTTP请求和响应的编码和解码。其主要作用是将HTTP请求和响应消息转换为Netty的ByteBuf对象,并将其传递到下一个处理器进行处理。
pipeline.addLast(new HttpServerCodec());
//用于HTTP服务端,将来自客户端的HTTP请求和响应消息聚合成一个完整的消息,以便后续的处理。
pipeline.addLast(new HttpObjectAggregator(65536));
//用于对WebSocket消息进行压缩和解压缩操作。
pipeline.addLast(new WebSocketServerCompressionHandler());
//可以对整个WebSocker通信进行初始化(当Http请求中有升级为WebSocker的请求时),以及握手处理
pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, null, true));
//自定义的Handler-处理WebSocket文本类型的消息
pipeline.addLast(new WebSocketTextHandler());
}
}
创建管理连接的配置类-NettyConfig
java复制代码创建
- 使用ConcurrentMap<String, Channel>存储在线连接的userId与channel的对应管理
java复制代码package com.study.websocket.demo1.config;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Netty配置类
*
* @author 启年
* @date 2023-05-12 19:32
*/
@Configuration
public class NettyConfig {
/**
* 存储所有在线的客户端Channel
*/
private static final ChannelGroup onlineChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存储所有在线的UserId与之对应的Channel
*/
private static final ConcurrentMap<String, Channel> onlineUserChannelMap = new ConcurrentHashMap<>();
/**
* 获取所有在线的客户端Channel
*/
public static ChannelGroup getOnlineChannelGroup() {
return onlineChannelGroup;
}
/**
* 获取所有在线的UserId与之对应的Channel
*/
public static ConcurrentMap<String, Channel> getOnlineUserChannelMap() {
return onlineUserChannelMap;
}
}
创建Handler-心跳检测:WebSocketIdleStateHandler
- 用于检测客户端指定时间内未发送心跳数据,则强制关闭客户端的连接
WebSocketIdleStateHandler代码
java复制代码package com.study.websocket.demo1.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* WebSocket服务端的心跳检测
*
* @author 启年
* @date 2023-05-14 10:05
*/
@Slf4j
public class WebSocketIdleStateHandler extends IdleStateHandler {
/**
* 默认的读空闲时间
*/
private static final int DEFAULT_READER_IDLE_TIME = 5;
/**
* 默认30秒读空闲断开客户端
*/
public WebSocketIdleStateHandler() {
super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
/**
* 指定心跳时间(秒)
*
* @param readerIdleTimeSeconds 读空闲时间
* @param writerIdleTimeSeconds 写空闲时间
* @param allIdleTimeSeconds 读写空闲时间
*/
public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
}
/**
* 指定心跳时间及时间单位
*
* @param readerIdleTime 读空闲时间
* @param writerIdleTime 写空闲时间
* @param allIdleTime 读写空闲时间
* @param unit 时间单位
*/
public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
super(readerIdleTime, writerIdleTime, allIdleTime, unit);
}
/**
* 当空闲事件触发时执行
*/
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
//如果是读空闲
if (evt.state().equals(IdleState.READER_IDLE)) {
Channel channel = ctx.channel();
log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", channel.id());
channel.close();
}
super.channelIdle(ctx,evt);
}
}
创建Handler-WebSocket文本消息处理:WebSocketTextHandler
- 在这里处理客户端的连接、断开以及心跳、客户端连接、消息发送等事件
WebSocketTextHandler代码
java复制代码package com.study.websocket.demo1.handler;
import com.alibaba.fastjson.JSON;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.HeartbeatMessage;
import com.study.websocket.demo1.entity.RegisterMessage;
import com.study.websocket.demo1.entity.TextMessage;
import com.study.websocket.demo1.enums.MessageTypeEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Objects;
/**
* 处理WbeSocket的文本消息
*
* @author 启年
* @date 2023-05-12 20:04
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 存储在 Channel 中的 attr 属性名(存储用户Id)
*/
public static final String USER_ID = "userId";
public WebSocketTextHandler() {
log.debug("WebSocketTextHandler 启动...");
}
/**
* 在新的 Channel 被添加到 ChannelPipeline 中时被调用。这通常发生在连接建立时,即 Channel 已经被成功绑定并注册到 EventLoop 中。
* 在连接建立时被调用一次
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将新连接的客户端Channel存储起来
NettyConfig.getOnlineChannelGroup().add(channel);
log.debug("新客户端建立链接 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
}
/**
* 在 WebSocket 连接断开时,Netty 会自动触发 channelInactive 事件,并将该事件交给事件处理器进行处理。
* 在 channelInactive 事件的处理过程中,会调用 handlerRemoved 方法,用于进行一些资源释放等操作,确保 WebSocket 连接正常断开。
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//移除断开的客户端的Channel
Channel channel = ctx.channel();
cleanChannel(channel);
log.debug("客户端断开链接 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
}
/**
* 处理客户端非正常断开(WebSocket 连接发生异常时调用)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//获取断开连接的客户端的Channel
Channel channel = ctx.channel();
//移除断开的客户端的Channel
cleanChannel(channel);
log.debug("客户端异常断开 --> {},在线用户数量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
//当发生异常时,手动关闭Channel
channel.close();
}
/**
* 当 Channel 的连接建立并准备好接收数据时被调用。这意味着连接已经成功建立,可以开始发送和接收数据了。
* 在每次连接激活时被调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 当接收到前端发送的WebSocket时处理
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//接收到的文本信息的二进制形式
ByteBuf content = msg.content();
//接收到的文本信息
String text = msg.text();
//获取到该条消息的标识,前端的字段必须后端的枚举名大小写一致
//根据消息类型进行不同业务处理
String type = JSON.parseObject(text).get("type").toString();
MessageTypeEnum messageTypeEnum = MessageTypeEnum.valueOf(type);
//普通文本消息
if (MessageTypeEnum.TEXT.compareTo(messageTypeEnum) == 0) {
//发送消息
sendMsg(text);
} else if (MessageTypeEnum.HEARTBEAT.compareTo(messageTypeEnum) == 0) {
HeartbeatMessage heartbeatMessage = JSON.parseObject(text, HeartbeatMessage.class);
String userId = heartbeatMessage.getUserId();
//接收到客户端的心跳
log.debug("来自【{}】的心跳", userId);
} else if (MessageTypeEnum.REGISTER.compareTo(messageTypeEnum) == 0) {
//注册
register(ctx, text);
}
}
/**
* 将连接的客户端注册到服务端中
*
* @param ctx
* @param text
*/
private void register(ChannelHandlerContext ctx, String text) {
RegisterMessage registerMessage = JSON.parseObject(text, RegisterMessage.class);
String userId = registerMessage.getUserId();
//注册客户端
//给 Channel 绑定一个存储 UserId 的 AttributeKey
Channel channel = ctx.channel();
//设置一个名为 userId 的 AttributeKey
AttributeKey<Object> userIdKey = AttributeKey.valueOf("userId");
//将 Channel 的 attr 设置一个名为 userId
channel
//在 Channel 中寻找名为 userIdKey 的 AttributeKey
.attr(userIdKey)
//给这个 AttributeKey 设置值
.set(userId);
//当自定义属性在属性集合中不存在时才进行添加
//.setIfAbsent(userId);
//将UserId与Channel建立联系
NettyConfig.getOnlineUserChannelMap().put(userId, channel);
log.debug("在线用户 --> {}", NettyConfig.getOnlineUserChannelMap().keySet());
//通知所有用户都上线了
NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
"用户:【" + userId + "】上线啦!"
));
}
/**
* 给指定的用户发送消息
*
* @param textMessageJson
*/
private void sendMsg(String textMessageJson) {
//获取接收到的消息的实体类
TextMessage textMessage = JSON.parseObject(textMessageJson, TextMessage.class);
String userId = textMessage.getUserId();
String userMsg = textMessage.getMsg();
String receiver = textMessage.getReceiver();
//给指定的用户发送消息
Channel receiverChannel = NettyConfig.getOnlineUserChannelMap().get(receiver);
if (Objects.nonNull(receiverChannel)) {
//TODO 这里可以设计为结构化的数据,以返回JSON数据便于解析
receiverChannel.writeAndFlush(new TextWebSocketFrame(userId + ":" + userMsg));
}
log.debug("用户【{}】给【{}】发送的消息:{}", userId, receiver, userMsg);
//TODO 服务端给客户端回复消息(可以设计为失败时返回)
//channel.writeAndFlush(new TextWebSocketFrame("服务端已接收到消息"));
}
/**
* 删除断开连接的客户端在程序中的数据
*
* @param channel 断开连接的客户端的 Channel
*/
private void cleanChannel(Channel channel) {
//获取客户端 Channel 中存储的名为 userId 的 Attribute
Attribute<String> userIdKey = channel.attr(AttributeKey.valueOf(USER_ID));
String userId = userIdKey.get();
//从 ChannelGroup 中移除断开的 Channel
NettyConfig.getOnlineChannelGroup().remove(channel);
//从 Map 中移除 UserId 与 Channel 的对照关系
NettyConfig.getOnlineUserChannelMap().remove(userId);
//通知所有用户某用户下线了
NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
"用户:【" + userId + "】下线啦!"
));
}
/**
* 检查给定的字符串是否不是空串、空格、null
*
* @param strs 需要检查的字符串
*/
private boolean checkHasText(String... strs) {
return Arrays.stream(strs).sequential().allMatch(StringUtils::hasText);
}
}
创建实体类
HeartbeatMessage
java复制代码package com.study.websocket.demo1.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @author 启年
* @date 2023-05-14 13:39
*/
@Data
public class HeartbeatMessage implements Serializable {
private static final long serialVersionUID = 1290124171105321491L;
/**
* 发送心跳消息的用户Id
*/
private String userId;
}
RegisterMessage
java复制代码package com.study.websocket.demo1.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @author 启年
* @date 2023-05-14 13:40
*/
@Data
public class RegisterMessage implements Serializable {
private static final long serialVersionUID = -4953615574208683170L;
/**
* 注册到服务端的用户Id
*/
private String userId;
}
TextMessage
java复制代码package com.study.websocket.demo1.entity;
import lombok.Data;
import java.io.Serializable;
/**
* 文本消息实体类
*
* @author 启年
* @date 2023-05-12 20:16
*/
@Data
public class TextMessage implements Serializable {
private static final long serialVersionUID = -4851870722684661727L;
/**
* 发送消息的用户Id
*/
private String userId;
/**
* 消息的接收者
*/
private String receiver;
/**
* 用户发送的消息
*/
private String msg;
}
User
java复制代码package com.study.websocket.demo1.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author 启年
* @date 2023-05-13 12:47
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private static final long serialVersionUID = 3147392908880170895L;
/**
* 用户Id
*/
private String userId;
/**
* 用户名
*/
private String username;
}
创建枚举-MessageTypeEnum
java复制代码package com.study.websocket.demo1.enums;
/**
* 消息类型枚举
*
* @author 启年
* @date 2023-05-14 13:36
*/
public enum MessageTypeEnum {
TEXT("普通文本消息"),
HEARTBEAT("心跳数据"),
REGISTER("注册数据");
MessageTypeEnum(String desc) {
}
}
创建ISendMessageService
ISendMessageService代码
java复制代码package com.study.websocket.demo1.service;
/**
* @author 启年
* @date 2023-05-12
*/
public interface ISendMessageService {
/**
* 根据 UserId 将信息发送给指定的用户
*
* @param userId 发送消息的用户Id
* @param receiver 接收消息的用户Id
* @param msg 要发送的消息
*/
void sendMsgToUserByUserId(String userId, String receiver, String msg);
/**
* 给所有的在线用户发送消息
*
* @param msg 要发送的消息
*/
void sendMsgToGroup(String msg);
}
创建ISendMessageService实现类:SendMessageService
java复制代码package com.study.websocket.demo1.service.impl;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.service.ISendMessageService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
* @author 启年
* @date 2023-05-12 23:31
*/
@Slf4j
@Service
public class SendMessageService implements ISendMessageService {
@Override
public void sendMsgToUserByUserId(String userId, String receiver, String msg) {
//根据UserId获取对应的Channel
Channel channel = NettyConfig.getOnlineUserChannelMap().get(receiver);
if (Objects.isNull(channel)) {
throw new RuntimeException("UserId:" + receiver + "不存在");
}
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(userId + ":" + msg);
//将消息发送给指定的用户
channel.writeAndFlush(textWebSocketFrame);
log.debug(textWebSocketFrame.text());
}
@Override
public void sendMsgToGroup(String msg) {
//给所有在线的用户发送消息
NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
}
}
控制器类-消息发送-SendMsgController
java复制代码package com.study.websocket.demo1.controller;
import com.study.websocket.demo1.service.ISendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;
/**
* @author 启年
* @date 2023-05-12 23:33
*/
@RestController
@RequestMapping("/send")
public class SendMsgController {
@Autowired
private ISendMessageService sendMessageService;
/**
* 单发消息:根据UserId给某个用户发送消息
*/
@PostMapping("/user")
public Map<String, Object> sendMsgToUserByUserId(
@RequestParam("userId") String userId,
@RequestParam("receiver") String receiver,
@RequestParam("msg") String msg) {
sendMessageService.sendMsgToUserByUserId(userId, receiver,msg);
Map<String, Object> response = new HashMap<>();
response.put("code", HttpServletResponse.SC_OK);
response.put("msg", "给" + userId + "的消息发送成功");
return response;
}
/**
* 群发消息:给所有在线的客户端发送消息
*/
@PostMapping("/group")
public Map<String, Object> sendMsgToGroup(@RequestParam("msg") String msg) {
sendMessageService.sendMsgToGroup(msg);
Map<String, Object> response = new HashMap<>();
response.put("code", HttpServletResponse.SC_OK);
response.put("msg", "群发消息成功");
return response;
}
}
控制器类-用户数据-UserController
java复制代码package com.study.websocket.demo1.controller;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.User;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author 启年
* @date 2023-05-13 12:45
*/
@RestController
@RequestMapping("/user")
public class UserController {
/**
* 返回在线的UserId
*/
@CrossOrigin(originPatterns = {"http://localhost:8081","http://sso.server.com:9999","http://10.40.129.179:8081"})
@GetMapping("/online/list")
public Map<String, Object> onlineList() {
Map<String, Object> response = new HashMap<>();
List<User> list = new ArrayList<>();
NettyConfig.getOnlineUserChannelMap().forEach((key, value) -> {
User user = new User(key, key);
list.add(user);
});
response.put("code", 200);
response.put("msg", "success");
response.put("data", list);
return response;
}
}
结束
- 此时可以运行前端项目,进行对话
- 也可以使用API接口对某个用户进行消息推送
链接:https://juejin.cn/post/7232905822278729783