天天看點

SpringBoot項目使用Netty實作WebSocket伺服器+Vue前端聊天

作者:Java小熊

SpringBoot項目使用Netty實作WebSocket

最終效果圖

  • 使用者AB對話效果
    • 使用者A的界面
    • 使用者B的界面
  • 使用者上下線通知
  • 曆史消息記錄可以滾動
SpringBoot項目使用Netty實作WebSocket伺服器+Vue前端聊天
SpringBoot項目使用Netty實作WebSocket伺服器+Vue前端聊天
  • 用戶端發送心跳

項目Gitee位址

  • gitee.com/lyzya/netty…

參考過的文章

  • juejin.cn/post/684490…

搭建項目

  • 建立一個SpringBoot項目:spring-boot-websocket-demo1
  • 項目的pom.xml檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.11</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.study.websocket</groupId>
    <artifactId>spring-boot-websocket-demo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-websocket-demo1</name>
    <description>spring-boot-websocket-demo1</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- spring-boot-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- netty-all(Netty依賴) -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.28.Final</version>
        </dependency>

        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

           

搭建項目-前端

  • 前端代碼在項目的spring-boot-websocket-demo1\webapp路徑下,這裡不再較長的描述前端

啟動類

  • 給啟動類添加@ComponentScan注解掃描元件
java複制代碼package com.study.websocket.demo1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.study.websocket")
public class SpringBootWebsocketDemo1Application {
  
  public static void main(String[] args) {
    SpringApplication.run(SpringBootWebsocketDemo1Application.class, args);
  }
  
}

           

建立伺服器的入口:WebSocketServer

  • 在這裡從配置檔案中讀取WebSocket伺服器的端口和請求路徑

WebSocketServer代碼

java複制代碼package com.study.websocket.demo1.server;

import com.study.websocket.demo1.handler.WebSocketServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Objects;

/**
 * @author 啟年
 * @date 2023-05-12 19:56
 */
@Slf4j
@Component
public class WebSocketServer {
  
  /**
   * Netty 服務的端口号
   */
  @Value("${websocket.netty.port:19999}")
  public int port;
  
  @Value("${websocket.netty.path:/websocket}")
  public String webSocketPath;
  
  private EventLoopGroup bossGroup;
  
  private EventLoopGroup workerGroup;
  
  /**
   * 啟動WebSocket伺服器
   */
  private void start() throws InterruptedException {
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();
    
    Channel channel = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new WebSocketServerInitializer(webSocketPath))
        .bind(port)
        .sync()
        .channel();
    
    log.debug("服務端啟動成功,端口号:{}", port);
    
    channel
        .closeFuture()
        .sync();
  }
  
  /**
   * 釋放資源
   * PreDestroy注解:在容器銷毀該元件之前被調用
   * 注解使用前提:該類的執行個體必須是由容器建立和管理的,如 Spring、JavaEE 容器等。
   */
  @PreDestroy
  public void destroy() {
    if (Objects.nonNull(bossGroup)) {
      bossGroup.shutdownGracefully();
    }
    
    if (Objects.nonNull(workerGroup)) {
      bossGroup.shutdownGracefully();
    }
  }
  
  /**
   * 初始化WebSocketServer(調用init())
   * PostConstruct注解:用于訓示一個方法在容器建立該元件之後立即調用
   * 注解使用前提:該類的執行個體必須是由容器建立和管理的,如 Spring、JavaEE 容器等。
   */
  @PostConstruct
  public void init() {
    //這裡要新開一個線程,否則會阻塞原本的controller等業務
    new Thread(() -> {
      try {
        start();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();
    
  }
  
}

           

建立伺服器的初始化類:WebSocketServerInitializer

  • 用于給初始化Netty的Handler

WebSocketServerInitializer代碼

java複制代碼package com.study.websocket.demo1.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * @author 啟年
 * @date 2023-05-12 22:55
 */
@Slf4j
@AllArgsConstructor
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
  
  /**
   * WebSocket 服務的接口位址
   */
  public String webSocketPath;
  
  @Override
  protected void initChannel(NioSocketChannel ch) throws Exception {
    log.debug("服務的接口位址:{}", webSocketPath);
    ChannelPipeline pipeline = ch.pipeline();
    //自定義的Handler-心跳檢測
    pipeline.addLast(new WebSocketIdleStateHandler());
    //HTTP協定編解碼器,用于處理HTTP請求和響應的編碼和解碼。其主要作用是将HTTP請求和響應消息轉換為Netty的ByteBuf對象,并将其傳遞到下一個處理器進行處理。
    pipeline.addLast(new HttpServerCodec());
    //用于HTTP服務端,将來自用戶端的HTTP請求和響應消息聚合成一個完整的消息,以便後續的處理。
    pipeline.addLast(new HttpObjectAggregator(65536));
    //用于對WebSocket消息進行壓縮和解壓縮操作。
    pipeline.addLast(new WebSocketServerCompressionHandler());
    //可以對整個WebSocker通信進行初始化(當Http請求中有更新為WebSocker的請求時),以及握手處理
    pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, null, true));
    //自定義的Handler-處理WebSocket文本類型的消息
    pipeline.addLast(new WebSocketTextHandler());
  }
  
}


           

建立管理連接配接的配置類-NettyConfig

java複制代碼建立
           
  • 使用ConcurrentMap<String, Channel>存儲線上連接配接的userId與channel的對應管理
java複制代碼package com.study.websocket.demo1.config;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Netty配置類
 *
 * @author 啟年
 * @date 2023-05-12 19:32
 */
@Configuration
public class NettyConfig {
  /**
   * 存儲所有線上的用戶端Channel
   */
  private static final ChannelGroup onlineChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  
  /**
   * 存儲所有線上的UserId與之對應的Channel
   */
  private static final ConcurrentMap<String, Channel> onlineUserChannelMap = new ConcurrentHashMap<>();
  
  /**
   * 擷取所有線上的用戶端Channel
   */
  public static ChannelGroup getOnlineChannelGroup() {
    return onlineChannelGroup;
  }
  
  /**
   * 擷取所有線上的UserId與之對應的Channel
   */
  public static ConcurrentMap<String, Channel> getOnlineUserChannelMap() {
    return onlineUserChannelMap;
  }
  
}

           

建立Handler-心跳檢測:WebSocketIdleStateHandler

  • 用于檢測用戶端指定時間内未發送心跳資料,則強制關閉用戶端的連接配接

WebSocketIdleStateHandler代碼

java複制代碼package com.study.websocket.demo1.handler;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * WebSocket服務端的心跳檢測
 *
 * @author 啟年
 * @date 2023-05-14 10:05
 */
@Slf4j
public class WebSocketIdleStateHandler extends IdleStateHandler {
  
  /**
   * 預設的讀空閑時間
   */
  private static final int DEFAULT_READER_IDLE_TIME = 5;
  
  /**
   * 預設30秒讀空閑斷開用戶端
   */
  public WebSocketIdleStateHandler() {
    super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
  }
  
  /**
   * 指定心跳時間(秒)
   *
   * @param readerIdleTimeSeconds 讀空閑時間
   * @param writerIdleTimeSeconds 寫空閑時間
   * @param allIdleTimeSeconds    讀寫空閑時間
   */
  public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
  }
  
  /**
   * 指定心跳時間及時間機關
   *
   * @param readerIdleTime 讀空閑時間
   * @param writerIdleTime 寫空閑時間
   * @param allIdleTime    讀寫空閑時間
   * @param unit           時間機關
   */
  public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    super(readerIdleTime, writerIdleTime, allIdleTime, unit);
  }
  
  /**
   * 當空閑事件觸發時執行
   */
  @Override
  protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    //如果是讀空閑
    if (evt.state().equals(IdleState.READER_IDLE)) {
      Channel channel = ctx.channel();
      log.debug("服務端未檢測到用戶端【{}】的心跳包,強制關閉用戶端!", channel.id());
      channel.close();
    }
    
    super.channelIdle(ctx,evt);
  }
  
}

           

建立Handler-WebSocket文本消息處理:WebSocketTextHandler

  • 在這裡處理用戶端的連接配接、斷開以及心跳、用戶端連接配接、消息發送等事件

WebSocketTextHandler代碼

java複制代碼package com.study.websocket.demo1.handler;

import com.alibaba.fastjson.JSON;
import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.HeartbeatMessage;
import com.study.websocket.demo1.entity.RegisterMessage;
import com.study.websocket.demo1.entity.TextMessage;
import com.study.websocket.demo1.enums.MessageTypeEnum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.Objects;

/**
 * 處理WbeSocket的文本消息
 *
 * @author 啟年
 * @date 2023-05-12 20:04
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  
  /**
   * 存儲在 Channel 中的 attr 屬性名(存儲使用者Id)
   */
  public static final String USER_ID = "userId";
  
  public WebSocketTextHandler() {
    log.debug("WebSocketTextHandler 啟動...");
  }
  
  /**
   * 在新的 Channel 被添加到 ChannelPipeline 中時被調用。這通常發生在連接配接建立時,即 Channel 已經被成功綁定并注冊到 EventLoop 中。
   * 在連接配接建立時被調用一次
   */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    //将新連接配接的用戶端Channel存儲起來
    NettyConfig.getOnlineChannelGroup().add(channel);
    log.debug("新用戶端建立連結 --> {},線上使用者數量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
    
  }
  
  /**
   * 在 WebSocket 連接配接斷開時,Netty 會自動觸發 channelInactive 事件,并将該事件交給事件處理器進行處理。
   * 在 channelInactive 事件的處理過程中,會調用 handlerRemoved 方法,用于進行一些資源釋放等操作,確定 WebSocket 連接配接正常斷開。
   */
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    //移除斷開的用戶端的Channel
    Channel channel = ctx.channel();
    cleanChannel(channel);
    log.debug("用戶端斷開連結 --> {},線上使用者數量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
  }
  
  /**
   * 處理用戶端非正常斷開(WebSocket 連接配接發生異常時調用)
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    //擷取斷開連接配接的用戶端的Channel
    Channel channel = ctx.channel();
    //移除斷開的用戶端的Channel
    cleanChannel(channel);
    log.debug("用戶端異常斷開 --> {},線上使用者數量:{}", channel.id(), NettyConfig.getOnlineChannelGroup().size());
    //當發生異常時,手動關閉Channel
    channel.close();
  }
  
  /**
   * 當 Channel 的連接配接建立并準備好接收資料時被調用。這意味着連接配接已經成功建立,可以開始發送和接收資料了。
   * 在每次連接配接激活時被調用
   */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
  }
  
  /**
   * 當接收到前端發送的WebSocket時處理
   */
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    
    //接收到的文本資訊的二進制形式
    ByteBuf content = msg.content();
    
    //接收到的文本資訊
    String text = msg.text();
    //擷取到該條消息的辨別,前端的字段必須後端的枚舉名大小寫一緻
    //根據消息類型進行不同業務處理
    String type = JSON.parseObject(text).get("type").toString();
    MessageTypeEnum messageTypeEnum = MessageTypeEnum.valueOf(type);
    
    //普通文本消息
    if (MessageTypeEnum.TEXT.compareTo(messageTypeEnum) == 0) {
      //發送消息
      sendMsg(text);
    } else if (MessageTypeEnum.HEARTBEAT.compareTo(messageTypeEnum) == 0) {
      HeartbeatMessage heartbeatMessage = JSON.parseObject(text, HeartbeatMessage.class);
      String userId = heartbeatMessage.getUserId();
      //接收到用戶端的心跳
      log.debug("來自【{}】的心跳", userId);
    } else if (MessageTypeEnum.REGISTER.compareTo(messageTypeEnum) == 0) {
      //注冊
      register(ctx, text);
    }
    
  }
  
  /**
   * 将連接配接的用戶端注冊到服務端中
   *
   * @param ctx
   * @param text
   */
  private void register(ChannelHandlerContext ctx, String text) {
    RegisterMessage registerMessage = JSON.parseObject(text, RegisterMessage.class);
    String userId = registerMessage.getUserId();
    //注冊用戶端
    //給 Channel 綁定一個存儲 UserId 的 AttributeKey
    Channel channel = ctx.channel();
    //設定一個名為 userId 的 AttributeKey
    AttributeKey<Object> userIdKey = AttributeKey.valueOf("userId");
    //将 Channel 的 attr 設定一個名為 userId
    channel
        //在 Channel 中尋找名為 userIdKey 的 AttributeKey
        .attr(userIdKey)
        //給這個 AttributeKey 設定值
        .set(userId);
    //當自定義屬性在屬性集合中不存在時才進行添加
    //.setIfAbsent(userId);
    
    //将UserId與Channel建立聯系
    NettyConfig.getOnlineUserChannelMap().put(userId, channel);
    log.debug("線上使用者 --> {}", NettyConfig.getOnlineUserChannelMap().keySet());
    
    //通知所有使用者都上線了
    NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
        "使用者:【" + userId + "】上線啦!"
    ));
  }
  
  /**
   * 給指定的使用者發送消息
   *
   * @param textMessageJson
   */
  private void sendMsg(String textMessageJson) {
    //擷取接收到的消息的實體類
    TextMessage textMessage = JSON.parseObject(textMessageJson, TextMessage.class);
    String userId = textMessage.getUserId();
    String userMsg = textMessage.getMsg();
    String receiver = textMessage.getReceiver();
    //給指定的使用者發送消息
    Channel receiverChannel = NettyConfig.getOnlineUserChannelMap().get(receiver);
    if (Objects.nonNull(receiverChannel)) {
      //TODO 這裡可以設計為結構化的資料,以傳回JSON資料便于解析
      receiverChannel.writeAndFlush(new TextWebSocketFrame(userId + ":" + userMsg));
    }
    log.debug("使用者【{}】給【{}】發送的消息:{}", userId, receiver, userMsg);
    //TODO 服務端給用戶端回複消息(可以設計為失敗時傳回)
    //channel.writeAndFlush(new TextWebSocketFrame("服務端已接收到消息"));
  }
  
  /**
   * 删除斷開連接配接的用戶端在程式中的資料
   *
   * @param channel 斷開連接配接的用戶端的 Channel
   */
  private void cleanChannel(Channel channel) {
    //擷取用戶端 Channel 中存儲的名為 userId 的 Attribute
    Attribute<String> userIdKey = channel.attr(AttributeKey.valueOf(USER_ID));
    String userId = userIdKey.get();
    //從 ChannelGroup 中移除斷開的 Channel
    NettyConfig.getOnlineChannelGroup().remove(channel);
    //從 Map 中移除 UserId 與 Channel 的對照關系
    NettyConfig.getOnlineUserChannelMap().remove(userId);
    
    //通知所有使用者某使用者下線了
    NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(
        "使用者:【" + userId + "】下線啦!"
    ));
  }
  
  /**
   * 檢查給定的字元串是否不是空串、空格、null
   *
   * @param strs 需要檢查的字元串
   */
  private boolean checkHasText(String... strs) {
    return Arrays.stream(strs).sequential().allMatch(StringUtils::hasText);
  }
  
}

           

建立實體類

HeartbeatMessage

java複制代碼package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * @author 啟年
 * @date 2023-05-14 13:39
 */
@Data
public class HeartbeatMessage implements Serializable {
  
  private static final long serialVersionUID = 1290124171105321491L;
  
  
  /**
   * 發送心跳消息的使用者Id
   */
  private String userId;

}

           

RegisterMessage

java複制代碼package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * @author 啟年
 * @date 2023-05-14 13:40
 */
@Data
public class RegisterMessage implements Serializable {
  
  private static final long serialVersionUID = -4953615574208683170L;
  /**
   * 注冊到服務端的使用者Id
   */
  private String userId;
}

           

TextMessage

java複制代碼package com.study.websocket.demo1.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * 文本消息實體類
 *
 * @author 啟年
 * @date 2023-05-12 20:16
 */
@Data
public class TextMessage implements Serializable {
  
  private static final long serialVersionUID = -4851870722684661727L;
  
  /**
   * 發送消息的使用者Id
   */
  private String userId;
  
  /**
   * 消息的接收者
   */
  private String receiver;
  
  /**
   * 使用者發送的消息
   */
  private String msg;
  
}

           

User

java複制代碼package com.study.websocket.demo1.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * @author 啟年
 * @date 2023-05-13 12:47
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
  
  private static final long serialVersionUID = 3147392908880170895L;
  
  /**
   * 使用者Id
   */
  private String userId;
  
  /**
   * 使用者名
   */
  private String username;
  
}

           

建立枚舉-MessageTypeEnum

java複制代碼package com.study.websocket.demo1.enums;

/**
 * 消息類型枚舉
 *
 * @author 啟年
 * @date 2023-05-14 13:36
 */
public enum MessageTypeEnum {
  TEXT("普通文本消息"),
  HEARTBEAT("心跳資料"),
  REGISTER("注冊資料");
  
  MessageTypeEnum(String desc) {
  }
}

           

建立ISendMessageService

  • 該接口主要提供對Controller層的功能支援

ISendMessageService代碼

java複制代碼package com.study.websocket.demo1.service;

/**
 * @author 啟年
 * @date 2023-05-12
 */
public interface ISendMessageService {
  
  /**
   * 根據 UserId 将資訊發送給指定的使用者
   *
   * @param userId   發送消息的使用者Id
   * @param receiver 接收消息的使用者Id
   * @param msg      要發送的消息
   */
  void sendMsgToUserByUserId(String userId, String receiver, String msg);
  
  /**
   * 給所有的線上使用者發送消息
   *
   * @param msg 要發送的消息
   */
  void sendMsgToGroup(String msg);
  
}

           

建立ISendMessageService實作類:SendMessageService

  • 具體的實作了對某個使用者推送消息以及群發消息
java複制代碼package com.study.websocket.demo1.service.impl;

import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.service.ISendMessageService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Objects;

/**
 * @author 啟年
 * @date 2023-05-12 23:31
 */
@Slf4j
@Service
public class SendMessageService implements ISendMessageService {
  
  @Override
  public void sendMsgToUserByUserId(String userId, String receiver, String msg) {
    //根據UserId擷取對應的Channel
    Channel channel = NettyConfig.getOnlineUserChannelMap().get(receiver);
    
    if (Objects.isNull(channel)) {
      throw new RuntimeException("UserId:" + receiver + "不存在");
    }
    TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(userId + ":" + msg);
    //将消息發送給指定的使用者
    channel.writeAndFlush(textWebSocketFrame);
    log.debug(textWebSocketFrame.text());
  }
  
  @Override
  public void sendMsgToGroup(String msg) {
    //給所有線上的使用者發送消息
    NettyConfig.getOnlineChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
  }
  
}

           

控制器類-消息發送-SendMsgController

java複制代碼package com.study.websocket.demo1.controller;

import com.study.websocket.demo1.service.ISendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.Map;

/**
 * @author 啟年
 * @date 2023-05-12 23:33
 */
@RestController
@RequestMapping("/send")
public class SendMsgController {
  
  @Autowired
  private ISendMessageService sendMessageService;
  
  /**
   * 單發消息:根據UserId給某個使用者發送消息
   */
  @PostMapping("/user")
  public Map<String, Object> sendMsgToUserByUserId(
      @RequestParam("userId") String userId,
      @RequestParam("receiver") String receiver,
      @RequestParam("msg") String msg) {
    sendMessageService.sendMsgToUserByUserId(userId, receiver,msg);
    Map<String, Object> response = new HashMap<>();
    response.put("code", HttpServletResponse.SC_OK);
    response.put("msg", "給" + userId + "的消息發送成功");
    return response;
  }
  
  /**
   * 群發消息:給所有線上的用戶端發送消息
   */
  @PostMapping("/group")
  public Map<String, Object> sendMsgToGroup(@RequestParam("msg") String msg) {
    sendMessageService.sendMsgToGroup(msg);
    Map<String, Object> response = new HashMap<>();
    response.put("code", HttpServletResponse.SC_OK);
    response.put("msg", "群發消息成功");
    return response;
  }
  
}

           

控制器類-使用者資料-UserController

java複制代碼package com.study.websocket.demo1.controller;

import com.study.websocket.demo1.config.NettyConfig;
import com.study.websocket.demo1.entity.User;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author 啟年
 * @date 2023-05-13 12:45
 */
@RestController
@RequestMapping("/user")
public class UserController {
  
  /**
   * 傳回線上的UserId
   */
  @CrossOrigin(originPatterns = {"http://localhost:8081","http://sso.server.com:9999","http://10.40.129.179:8081"})
  @GetMapping("/online/list")
  public Map<String, Object> onlineList() {
    Map<String, Object> response = new HashMap<>();
    
    List<User> list = new ArrayList<>();
    NettyConfig.getOnlineUserChannelMap().forEach((key, value) -> {
      User user = new User(key, key);
      list.add(user);
    });
    response.put("code", 200);
    response.put("msg", "success");
    response.put("data", list);
    return response;
  }
  
}

           

結束

  • 此時可以運作前端項目,進行對話
  • 也可以使用API接口對某個使用者進行消息推送
連結:https://juejin.cn/post/7232905822278729783