背景
單個執行個體不多說,正常使用websocket肯定是沒有問題的,對于多個執行個體的情況下,我們都知道Session是不共享的,若是存在使用者A連接配接服務1,而使用者B推送消息到服務2,那麼,使用者A如何接受使用者B的消息呢?
解決方案
- websocket配置
@Configuration
public class WebSocketConfig {
//tomcat啟動無需該配置
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
- websocketService
@ServerEndpoint("/websocket")
@Component
public class WebSocketServer {
@OnOpen
public void onOpen(Session session) {
// 連接配接建立時觸發
WebSocketSessionManager.add(session);
}
@OnMessage
public void onMessage(String message, Session session) {
// 用戶端發送消息時觸發
// 将消息釋出到RabbitMQ中
WebSocketMessageQueue.sendMessage(message);
}
@OnClose
public void onClose(Session session) {
// 連接配接關閉時觸發
WebSocketSessionManager.remove(session);
}
@OnError
public void onError(Throwable e) {
// 發生錯誤時觸發
e.printStackTrace();
}
}
- RabbitMQ配置
@Configuration
public class RabbitConfig {
@Bean
public Queue websocketQueue() throws SocketException {
// ip + 端口 為隊列名 【這裡可以自行設定,隻要是該服務的唯一辨別即可,因為采用的廣播模式】
String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
return new Queue("ps_" + ip);
}
}
- RabbitMQ消費端
@Component
public class WebSocketMessageConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "#{websocketQueue.name}", durable = "true"),
exchange = @Exchange(name = EXCHANGE, type = "fanout")))
@RabbitHandler
public void onMessage(String message) {
// 接收到消息時觸發
WebSocketSessionManager.broadcast(message);
}
}
在WebSocket添加消息的接收方法,@RabbitListener 接收消息,隊列名稱使用常量命名,動态隊列名稱使用 #{name},其中的name是Queue的bean 名稱:
- websocket會話管理
@Component
public class WebSocketSessionManager {
private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionManager.class);
private static Map<String, Session> sessionMap = new ConcurrentHashMap<>();
public static void add(Session session) {
sessionMap.put(session.getId(), session);
logger.info("WebSocket Session {} added", session.getId());
}
public static void remove(Session session) {
sessionMap.remove(session.getId());
logger.info("WebSocket Session {} removed", session.getId());
}
public static void broadcast(String message) {
for (Map.Entry<String, Session> entry : sessionMap.entrySet()) {
Session session = entry.getValue();
if (session.isOpen()) {
// 将消息發送到所有WebSocket連接配接
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
} else {
// 移除已關閉的WebSocket連接配接
sessionMap.remove(entry.getKey());
logger.info("WebSocket Session {} removed", entry.getKey());
}
}
}
}
- websocket消息發送
@Component
public class WebSocketMessageQueue {
private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageQueue.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
try {
// 發送消息到RabbitMQ隊列中
rabbitTemplate.convertAndSend("websocket-messages", "", message);
} catch (AmqpException e) {
logger.error("Error occurred while sending message to RabbitMQ: {}", e.getMessage());
}
}
}
注意:
使用rabbitmq,根據自己的業務的要求情況下,幂等、确認機制、重試機制等的考慮。