天天看點

springboot+rabbitmq+websocket叢集推送解決方案

作者:奇妙的呆二木

背景

單個執行個體不多說,正常使用websocket肯定是沒有問題的,對于多個執行個體的情況下,我們都知道Session是不共享的,若是存在使用者A連接配接服務1,而使用者B推送消息到服務2,那麼,使用者A如何接受使用者B的消息呢?

解決方案

  • websocket配置
@Configuration
public class WebSocketConfig {
    //tomcat啟動無需該配置
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}           
  • websocketService
@ServerEndpoint("/websocket")
@Component
public class WebSocketServer {

    @OnOpen
    public void onOpen(Session session) {
        // 連接配接建立時觸發
        WebSocketSessionManager.add(session);
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        // 用戶端發送消息時觸發
        // 将消息釋出到RabbitMQ中
        WebSocketMessageQueue.sendMessage(message);
    }

    @OnClose
    public void onClose(Session session) {
        // 連接配接關閉時觸發
        WebSocketSessionManager.remove(session);
    }

    @OnError
    public void onError(Throwable e) {
        // 發生錯誤時觸發
        e.printStackTrace();
    }
}           
  • RabbitMQ配置
@Configuration
public class RabbitConfig {

    @Bean
    public Queue websocketQueue() throws SocketException {
        // ip + 端口 為隊列名  【這裡可以自行設定,隻要是該服務的唯一辨別即可,因為采用的廣播模式】
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }
}

           
  • RabbitMQ消費端
@Component
public class WebSocketMessageConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "#{websocketQueue.name}", durable = "true"),
            exchange = @Exchange(name = EXCHANGE, type = "fanout")))
    @RabbitHandler
    public void onMessage(String message) {
        // 接收到消息時觸發
        WebSocketSessionManager.broadcast(message);
    }
}           
在WebSocket添加消息的接收方法,@RabbitListener 接收消息,隊列名稱使用常量命名,動态隊列名稱使用 #{name},其中的name是Queue的bean 名稱:
  • websocket會話管理
@Component
public class WebSocketSessionManager {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionManager.class);

    private static Map<String, Session> sessionMap = new ConcurrentHashMap<>();

    public static void add(Session session) {
        sessionMap.put(session.getId(), session);
        logger.info("WebSocket Session {} added", session.getId());
    }

    public static void remove(Session session) {
        sessionMap.remove(session.getId());
        logger.info("WebSocket Session {} removed", session.getId());
    }

    public static void broadcast(String message) {
        for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
            Session session = entry.getValue();
            if (session.isOpen()) {
                // 将消息發送到所有WebSocket連接配接
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else {
                // 移除已關閉的WebSocket連接配接
                sessionMap.remove(entry.getKey());
                logger.info("WebSocket Session {} removed", entry.getKey());
            }
        }
    }
}           
  • websocket消息發送
@Component
public class WebSocketMessageQueue {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageQueue.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        try {
            // 發送消息到RabbitMQ隊列中
            rabbitTemplate.convertAndSend("websocket-messages", "", message);
        } catch (AmqpException e) {
            logger.error("Error occurred while sending message to RabbitMQ: {}", e.getMessage());
        }
    }
}           

注意:

使用rabbitmq,根據自己的業務的要求情況下,幂等、确認機制、重試機制等的考慮。

繼續閱讀