SpringBoot項目使用Netty實作WebSocket
最終效果圖
- 使用者AB對話效果
- 使用者上下線通知
- 曆史消息記錄可以滾動
項目Gitee位址
參考過的文章
搭建項目
- 建立一個SpringBoot項目:spring-boot-websocket-demo1
- 項目的pom.xml檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.study.websocket</groupId>
<artifactId>spring-boot-websocket-demo1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-websocket-demo1</name>
<description>spring-boot-websocket-demo1</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring-boot-starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- netty-all(Netty依賴) -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
搭建項目-前端
- 前端代碼在項目的spring-boot-websocket-demo1\webapp路徑下,這裡不再較長的描述前端
啟動類
- 給啟動類添加@ComponentScan注解掃描元件
java複制代碼package com.study.websocket.demo1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("com.study.websocket")
public class SpringBootWebsocketDemo1Application {
public static void main(String[] args) {
SpringApplication.run(SpringBootWebsocketDemo1Application.class, args);
}
}
建立伺服器的入口:WebSocketServer
- 在這裡從配置檔案中讀取WebSocket伺服器的端口和請求路徑
WebSocketServer代碼
java複制代碼package com.study.websocket.demo1.server;
import com.study.websocket.demo1.handler.WebSocketServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Objects;
/**
* @author 啟年
* @date 2023-05-12 19:56
*/
@Slf4j
@Component
public class WebSocketServer {
/**
* Netty 服務的端口号
*/
@Value("${websocket.netty.port:19999}")
public int port;
@Value("${websocket.netty.path:/websocket}")
public String webSocketPath;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
/**
* 啟動WebSocket伺服器
*/
private void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
Channel channel = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer(webSocketPath))
.bind(port)
.sync()
.channel();
log.debug("服務端啟動成功,端口号:{}", port);
channel
.closeFuture()
.sync();
}
/**
* 釋放資源
* PreDestroy注解:在容器銷毀該元件之前被調用
* 注解使用前提:該類的執行個體必須是由容器建立和管理的,如 Spring、JavaEE 容器等。
*/
@PreDestroy
public void destroy() {
if (Objects.nonNull(bossGroup)) {
bossGroup.shutdownGracefully();
}
if (Objects.nonNull(workerGroup)) {
bossGroup.shutdownGracefully();
}
}
/**
* 初始化WebSocketServer(調用init())
* PostConstruct注解:用于訓示一個方法在容器建立該元件之後立即調用
* 注解使用前提:該類的執行個體必須是由容器建立和管理的,如 Spring、JavaEE 容器等。
*/
@PostConstruct
public void init() {
//這裡要新開一個線程,否則會阻塞原本的controller等業務
new Thread(() -> {
try {
start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
建立伺服器的初始化類:WebSocketServerInitializer
WebSocketServerInitializer代碼
java複制代碼package com.study.websocket.demo1.handler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* @author 啟年
* @date 2023-05-12 22:55
*/
@Slf4j
@AllArgsConstructor
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
/**
* WebSocket 服務的接口位址
*/
public String webSocketPath;
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
log.debug("服務的接口位址:{}", webSocketPath);
ChannelPipeline pipeline = ch.pipeline();
//自定義的Handler-心跳檢測
pipeline.addLast(new WebSocketIdleStateHandler());
//HTTP協定編解碼器,用于處理HTTP請求和響應的編碼和解碼。其主要作用是将HTTP請求和響應消息轉換為Netty的ByteBuf對象,并将其傳遞到下一個處理器進行處理。
pipeline.addLast(new HttpServerCodec());
//用于HTTP服務端,将來自用戶端的HTTP請求和響應消息聚合成一個完整的消息,以便後續的處理。
pipeline.addLast(new HttpObjectAggregator(65536));
//用于對WebSocket消息進行壓縮和解壓縮操作。
pipeline.addLast(new WebSocketServerCompressionHandler());
//可以對整個WebSocker通信進行初始化(當Http請求中有更新為WebSocker的請求時),以及握手處理
pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, null, true));
//自定義的Handler-處理WebSocket文本類型的消息
pipeline.addLast(new WebSocketTextHandler());
}
}
建立管理連接配接的配置類-NettyConfig
java複制代碼建立
- 使用ConcurrentMap<String, Channel>存儲線上連接配接的userId與channel的對應管理
java複制代碼package com.study.websocket.demo1.config;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Netty配置類
*
* @author 啟年
* @date 2023-05-12 19:32
*/
@Configuration
public class NettyConfig {
/**
* 存儲所有線上的用戶端Channel
*/
private static final ChannelGroup onlineChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 存儲所有線上的UserId與之對應的Channel
*/
private static final ConcurrentMap<String, Channel> onlineUserChannelMap = new ConcurrentHashMap<>();
/**
* 擷取所有線上的用戶端Channel
*/
public static ChannelGroup getOnlineChannelGroup() {
return onlineChannelGroup;
}
/**
* 擷取所有線上的UserId與之對應的Channel
*/
public static ConcurrentMap<String, Channel> getOnlineUserChannelMap() {
return onlineUserChannelMap;
}
}
建立Handler-心跳檢測:WebSocketIdleStateHandler
- 用于檢測用戶端指定時間内未發送心跳資料,則強制關閉用戶端的連接配接
WebSocketIdleStateHandler代碼
java複制代碼package com.study.websocket.demo1.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* WebSocket服務端的心跳檢測
*
* @author 啟年
* @date 2023-05-14 10:05
*/
@Slf4j
public class WebSocketIdleStateHandler extends IdleStateHandler {
/**
* 預設的讀空閑時間
*/
private static final int DEFAULT_READER_IDLE_TIME = 5;
/**
* 預設30秒讀空閑斷開用戶端
*/
public WebSocketIdleStateHandler() {
super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
/**
* 指定心跳時間(秒)
*
* @param readerIdleTimeSeconds 讀空閑時間
* @param writerIdleTimeSeconds 寫空閑時間
* @param allIdleTimeSeconds 讀寫空閑時間
*/
public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
}
/**
* 指定心跳時間及時間機關
*
* @param readerIdleTime 讀空閑時間
* @param writerIdleTime 寫空閑時間
* @param allIdleTime 讀寫空閑時間
* @param unit 時間機關
*/
public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
super(readerIdleTime, writerIdleTime, allIdleTime, unit);
}
/**
* 當空閑事件觸發時執行
*/
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
//如果是讀空閑
if (evt.state().equals(IdleState.READER_IDLE)) {
Channel channel = ctx.channel();
log.debug("服務端未檢測到用戶端【{}】的心跳包,強制關閉用戶端!", channel.id());
channel.close();
}
super.channelIdle(ctx,evt);
}
}
建立Handler-WebSocket文本消息處理:WebSocketTextHandler
- 在這裡處理用戶端的連接配接、斷開以及心跳、用戶端連接配接、消息發送等事件
WebSocketTextHandler代碼
java複制代碼package com.study.websocket.demo1.handler;
import com.alibaba.fastjson.JSON;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.HeartbeatMessage;
import com.study.websocket.demo1.entity.RegisterMessage;
import com.study.websocket.demo1.entity.TextMessage;
import com.study.websocket.demo1.enums.MessageTypeEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Objects;
/**
* 處理WbeSocket的文本消息
*
* @author 啟年
* @date 2023-05-12 20:04
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 存儲在 Channel 中的 attr 屬性名(存儲使用者Id)
*/
public static final String USER_ID = "userId";
public WebSocketTextHandler() {
log.debug("WebSocketTextHandler 啟動...");
}
/**
* 在新的 Channel 被添加到 ChannelPipeline 中時被調用。這通常發生在連接配接建立時,即 Channel 已經被成功綁定并注冊到 EventLoop 中。
* 在連接配接建立時被調用一次
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将新連接配接的用戶端Channel存儲起來
NettyConfig.getOnlineChannelGroup().add(channel);
log.debug("新用戶端建立連結 --> {},線上使用者數量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
}
/**
* 在 WebSocket 連接配接斷開時,Netty 會自動觸發 channelInactive 事件,并将該事件交給事件處理器進行處理。
* 在 channelInactive 事件的處理過程中,會調用 handlerRemoved 方法,用于進行一些資源釋放等操作,確定 WebSocket 連接配接正常斷開。
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//移除斷開的用戶端的Channel
Channel channel = ctx.channel();
cleanChannel(channel);
log.debug("用戶端斷開連結 --> {},線上使用者數量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
}
/**
* 處理用戶端非正常斷開(WebSocket 連接配接發生異常時調用)
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//擷取斷開連接配接的用戶端的Channel
Channel channel = ctx.channel();
//移除斷開的用戶端的Channel
cleanChannel(channel);
log.debug("用戶端異常斷開 --> {},線上使用者數量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
//當發生異常時,手動關閉Channel
channel.close();
}
/**
* 當 Channel 的連接配接建立并準備好接收資料時被調用。這意味着連接配接已經成功建立,可以開始發送和接收資料了。
* 在每次連接配接激活時被調用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 當接收到前端發送的WebSocket時處理
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//接收到的文本資訊的二進制形式
ByteBuf content = msg.content();
//接收到的文本資訊
String text = msg.text();
//擷取到該條消息的辨別,前端的字段必須後端的枚舉名大小寫一緻
//根據消息類型進行不同業務處理
String type = JSON.parseObject(text).get("type").toString();
MessageTypeEnum messageTypeEnum = MessageTypeEnum.valueOf(type);
//普通文本消息
if (MessageTypeEnum.TEXT.compareTo(messageTypeEnum) == 0) {
//發送消息
sendMsg(text);
} else if (MessageTypeEnum.HEARTBEAT.compareTo(messageTypeEnum) == 0) {
HeartbeatMessage heartbeatMessage = JSON.parseObject(text, HeartbeatMessage.class);
String userId = heartbeatMessage.getUserId();
//接收到用戶端的心跳
log.debug("來自【{}】的心跳", userId);
} else if (MessageTypeEnum.REGISTER.compareTo(messageTypeEnum) == 0) {
//注冊
register(ctx, text);
}
}
/**
* 将連接配接的用戶端注冊到服務端中
*
* @param ctx
* @param text
*/
private void register(ChannelHandlerContext ctx, String text) {
RegisterMessage registerMessage = JSON.parseObject(text, RegisterMessage.class);
String userId = registerMessage.getUserId();
//注冊用戶端
//給 Channel 綁定一個存儲 UserId 的 AttributeKey
Channel channel = ctx.channel();
//設定一個名為 userId 的 AttributeKey
AttributeKey<Object> userIdKey = AttributeKey.valueOf("userId");
//将 Channel 的 attr 設定一個名為 userId
channel
//在 Channel 中尋找名為 userIdKey 的 AttributeKey
.attr(userIdKey)
//給這個 AttributeKey 設定值
.set(userId);
//當自定義屬性在屬性集合中不存在時才進行添加
//.setIfAbsent(userId);
//将UserId與Channel建立聯系
NettyConfig.getOnlineUserChannelMap().put(userId, channel);
log.debug("線上使用者 --> {}", NettyConfig.getOnlineUserChannelMap().keySet());
//通知所有使用者都上線了
NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
"使用者:【" + userId + "】上線啦!"
));
}
/**
* 給指定的使用者發送消息
*
* @param textMessageJson
*/
private void sendMsg(String textMessageJson) {
//擷取接收到的消息的實體類
TextMessage textMessage = JSON.parseObject(textMessageJson, TextMessage.class);
String userId = textMessage.getUserId();
String userMsg = textMessage.getMsg();
String receiver = textMessage.getReceiver();
//給指定的使用者發送消息
Channel receiverChannel = NettyConfig.getOnlineUserChannelMap().get(receiver);
if (Objects.nonNull(receiverChannel)) {
//TODO 這裡可以設計為結構化的資料,以傳回JSON資料便于解析
receiverChannel.writeAndFlush(new TextWebSocketFrame(userId + ":" + userMsg));
}
log.debug("使用者【{}】給【{}】發送的消息:{}", userId, receiver, userMsg);
//TODO 服務端給用戶端回複消息(可以設計為失敗時傳回)
//channel.writeAndFlush(new TextWebSocketFrame("服務端已接收到消息"));
}
/**
* 删除斷開連接配接的用戶端在程式中的資料
*
* @param channel 斷開連接配接的用戶端的 Channel
*/
private void cleanChannel(Channel channel) {
//擷取用戶端 Channel 中存儲的名為 userId 的 Attribute
Attribute<String> userIdKey = channel.attr(AttributeKey.valueOf(USER_ID));
String userId = userIdKey.get();
//從 ChannelGroup 中移除斷開的 Channel
NettyConfig.getOnlineChannelGroup().remove(channel);
//從 Map 中移除 UserId 與 Channel 的對照關系
NettyConfig.getOnlineUserChannelMap().remove(userId);
//通知所有使用者某使用者下線了
NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
"使用者:【" + userId + "】下線啦!"
));
}
/**
* 檢查給定的字元串是否不是空串、空格、null
*
* @param strs 需要檢查的字元串
*/
private boolean checkHasText(String... strs) {
return Arrays.stream(strs).sequential().allMatch(StringUtils::hasText);
}
}
建立實體類
HeartbeatMessage
java複制代碼package com.study.websocket.demo1.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @author 啟年
* @date 2023-05-14 13:39
*/
@Data
public class HeartbeatMessage implements Serializable {
private static final long serialVersionUID = 1290124171105321491L;
/**
* 發送心跳消息的使用者Id
*/
private String userId;
}
RegisterMessage
java複制代碼package com.study.websocket.demo1.entity;
import lombok.Data;
import java.io.Serializable;
/**
* @author 啟年
* @date 2023-05-14 13:40
*/
@Data
public class RegisterMessage implements Serializable {
private static final long serialVersionUID = -4953615574208683170L;
/**
* 注冊到服務端的使用者Id
*/
private String userId;
}
TextMessage
java複制代碼package com.study.websocket.demo1.entity;
import lombok.Data;
import java.io.Serializable;
/**
* 文本消息實體類
*
* @author 啟年
* @date 2023-05-12 20:16
*/
@Data
public class TextMessage implements Serializable {
private static final long serialVersionUID = -4851870722684661727L;
/**
* 發送消息的使用者Id
*/
private String userId;
/**
* 消息的接收者
*/
private String receiver;
/**
* 使用者發送的消息
*/
private String msg;
}
User
java複制代碼package com.study.websocket.demo1.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author 啟年
* @date 2023-05-13 12:47
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
private static final long serialVersionUID = 3147392908880170895L;
/**
* 使用者Id
*/
private String userId;
/**
* 使用者名
*/
private String username;
}
建立枚舉-MessageTypeEnum
java複制代碼package com.study.websocket.demo1.enums;
/**
* 消息類型枚舉
*
* @author 啟年
* @date 2023-05-14 13:36
*/
public enum MessageTypeEnum {
TEXT("普通文本消息"),
HEARTBEAT("心跳資料"),
REGISTER("注冊資料");
MessageTypeEnum(String desc) {
}
}
建立ISendMessageService
ISendMessageService代碼
java複制代碼package com.study.websocket.demo1.service;
/**
* @author 啟年
* @date 2023-05-12
*/
public interface ISendMessageService {
/**
* 根據 UserId 将資訊發送給指定的使用者
*
* @param userId 發送消息的使用者Id
* @param receiver 接收消息的使用者Id
* @param msg 要發送的消息
*/
void sendMsgToUserByUserId(String userId, String receiver, String msg);
/**
* 給所有的線上使用者發送消息
*
* @param msg 要發送的消息
*/
void sendMsgToGroup(String msg);
}
建立ISendMessageService實作類:SendMessageService
java複制代碼package com.study.websocket.demo1.service.impl;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.service.ISendMessageService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
* @author 啟年
* @date 2023-05-12 23:31
*/
@Slf4j
@Service
public class SendMessageService implements ISendMessageService {
@Override
public void sendMsgToUserByUserId(String userId, String receiver, String msg) {
//根據UserId擷取對應的Channel
Channel channel = NettyConfig.getOnlineUserChannelMap().get(receiver);
if (Objects.isNull(channel)) {
throw new RuntimeException("UserId:" + receiver + "不存在");
}
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(userId + ":" + msg);
//将消息發送給指定的使用者
channel.writeAndFlush(textWebSocketFrame);
log.debug(textWebSocketFrame.text());
}
@Override
public void sendMsgToGroup(String msg) {
//給所有線上的使用者發送消息
NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
}
}
控制器類-消息發送-SendMsgController
java複制代碼package com.study.websocket.demo1.controller;
import com.study.websocket.demo1.service.ISendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;
/**
* @author 啟年
* @date 2023-05-12 23:33
*/
@RestController
@RequestMapping("/send")
public class SendMsgController {
@Autowired
private ISendMessageService sendMessageService;
/**
* 單發消息:根據UserId給某個使用者發送消息
*/
@PostMapping("/user")
public Map<String, Object> sendMsgToUserByUserId(
@RequestParam("userId") String userId,
@RequestParam("receiver") String receiver,
@RequestParam("msg") String msg) {
sendMessageService.sendMsgToUserByUserId(userId, receiver,msg);
Map<String, Object> response = new HashMap<>();
response.put("code", HttpServletResponse.SC_OK);
response.put("msg", "給" + userId + "的消息發送成功");
return response;
}
/**
* 群發消息:給所有線上的用戶端發送消息
*/
@PostMapping("/group")
public Map<String, Object> sendMsgToGroup(@RequestParam("msg") String msg) {
sendMessageService.sendMsgToGroup(msg);
Map<String, Object> response = new HashMap<>();
response.put("code", HttpServletResponse.SC_OK);
response.put("msg", "群發消息成功");
return response;
}
}
控制器類-使用者資料-UserController
java複制代碼package com.study.websocket.demo1.controller;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.User;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author 啟年
* @date 2023-05-13 12:45
*/
@RestController
@RequestMapping("/user")
public class UserController {
/**
* 傳回線上的UserId
*/
@CrossOrigin(originPatterns = {"http://localhost:8081","http://sso.server.com:9999","http://10.40.129.179:8081"})
@GetMapping("/online/list")
public Map<String, Object> onlineList() {
Map<String, Object> response = new HashMap<>();
List<User> list = new ArrayList<>();
NettyConfig.getOnlineUserChannelMap().forEach((key, value) -> {
User user = new User(key, key);
list.add(user);
});
response.put("code", 200);
response.put("msg", "success");
response.put("data", list);
return response;
}
}
結束
- 此時可以運作前端項目,進行對話
- 也可以使用API接口對某個使用者進行消息推送
連結:https://juejin.cn/post/7232905822278729783