天天看點

Websocket叢集解決方案

作者:小碼code

最近在項目中在做一個消息推送的功能,比如客戶下單之後通知給給對應的客戶發送系統通知,這種消息推送需要使用到全雙工的websocket推送消息。

所謂的全雙工表示用戶端和服務端都能向對方發送消息。不使用同樣是全雙工的http是因為http隻能由用戶端主動發起請求,服務接收後傳回消息。websocket建立起連接配接之後,用戶端和服務端都能主動向對方發送消息。

上一篇文章Spring Boot 整合單機websocket介紹了websocket在單機模式下進行消息的發送和接收:

Websocket叢集解決方案

使用者A和使用者B和web伺服器建立連接配接之後,使用者A發送一條消息到伺服器,伺服器再推送給使用者B,在單機系統上所有的使用者都和同一個伺服器建立連接配接,所有的session都存儲在同一個伺服器中。

單個伺服器是無法支撐幾萬人同時連接配接同一個伺服器,需要使用到分布式或者叢集将請求連接配接負載均衡到到不同的服務下。消息的發送方和接收方在同一個伺服器,這就和單體伺服器類似,能成功接收到消息:

Websocket叢集解決方案

但負載均衡使用輪詢的算法,無法保證消息發送方和接收方處于同一個伺服器,當發送方和接收方不是在同一個伺服器時,接收方是無法接受到消息的:

Websocket叢集解決方案

websocket叢集問題解決思路

用戶端和服務端每次建立連接配接時候,會建立有狀态的會話session,伺服器的儲存維持連接配接的session。用戶端每次隻能和叢集伺服器其中的一個伺服器連接配接,後續也是和該伺服器進行資料傳輸。

要解決叢集的問題,應該考慮session共享的問題,用戶端成功連接配接伺服器之後,其他伺服器也知道用戶端連接配接成功。

方案一:session 共享(不可行)

和websocket類似的http是如何解決叢集問題的?解決方案之一就是共享session,用戶端登入服務端之後,将session資訊存儲在Redis資料庫中,連接配接其他伺服器時,從Redis擷取session,實際就是将session資訊存儲在Redis中,實作redis的共享。

session可以被共享的前提是可以被序列化,而websocket的session是無法被序列化的,http的session記錄的是請求的資料,而websocket的session對應的是連接配接,連接配接到不同的伺服器,session也不同,無法被序列化。

方案二:ip hash(不可行)

http不使用session共享,就可以使用Nginx負載均衡的ip hash算法,用戶端每次都是請求同一個伺服器,用戶端的session都儲存在伺服器上,而後續請求都是請求該伺服器,都能擷取到session,就不存在分布式session問題了。

websocket相對http來說,可以由服務端主動推動消息給用戶端,如果接收消息的服務端和發送消息消息的服務端不是同一個服務端,發送消息的服務端無法找到接收消息對應的session,即兩個session不處于同一個服務端,也就無法推送消息。如下圖所示:

Websocket叢集解決方案
解決問題的方法是将所有消息的發送方和接收方都處于同一個伺服器下,而消息發送方和接收方都是不确定的,顯然是無法實作的。

方案三:廣播模式

将消息的發送方和接收方都處于同一個伺服器下才能發送消息,那麼可以轉換一下思路,可以将消息以消息廣播的方式通知給所有的伺服器,可以使用消息中間件釋出訂閱模式,消息脫離了伺服器的限制,通過發送到中間件,再發送給訂閱的伺服器,類似廣播一樣,隻要訂閱了消息,都能接收到消息的通知:

Websocket叢集解決方案

釋出者釋出消息到消息中間件,消息中間件再将發送給所有訂閱者:

Websocket叢集解決方案

廣播模式的實作

搭建單機 websocket

參考以前寫的websocket單機搭建 文章,先搭建單機websocket實作消息的推送。

1. 添加依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

2. 建立 ServerEndpointExporter 的 bean 執行個體

ServerEndpointExporter 的 bean 執行個體自動注冊 @ServerEndpoint 注解聲明的 websocket endpoint,使用springboot自帶tomcat啟動需要該配置,使用獨立 tomcat 則不需要該配置。

@Configuration
public class WebSocketConfig {
    //tomcat啟動無需該配置
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
           

3. 建立服務端點 ServerEndpoint 和 用戶端端

  • 服務端點
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {

 private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

 private Session session;

 @OnOpen
 public void onOpen(Session session) throws SocketException {
  this.session = session;
  webSocketSet.put(this.session.getId(),this);

  log.info("【websocket】有新的連接配接,總數:{}",webSocketSet.size());
 }

 @OnClose
 public void onClose(){
  String id = this.session.getId();
  if (id != null){
   webSocketSet.remove(id);
   log.info("【websocket】連接配接斷開:總數:{}",webSocketSet.size());
  }
 }

 @OnMessage
 public void onMessage(String message){
  if (!message.equals("ping")){
   log.info("【wesocket】收到用戶端發送的消息,message={}",message);
   sendMessage(message);
  }
 }

 /**
  * 發送消息
  * @param message
  * @return
  */
 public void sendMessage(String message){
  for (WebSocket webSocket : webSocketSet.values()) {
   webSocket.session.getAsyncRemote().sendText(message);
  }
  log.info("【wesocket】發送消息,message={}", message);

 }

}
           
  • 用戶端點
<div>
    <input type="text" name="message" id="message">
    <button id="sendBtn">發送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };

    ws.onmessage = function(evt) {
        console.log( "Received Message: " + evt.data);
        var p = $("<p>"+evt.data+"</p>")
        $("#content").prepend(p);
        $("#message").val("");
    };

    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };

    $("#sendBtn").click(function(){
        var aa = $("#message").val();
        ws.send(aa);
    })

</script>
           

服務端和用戶端中的OnOpen、onclose、onmessage都是一一對應的。

  • 服務啟動後,用戶端ws.onopen調用服務端的@OnOpen注解的方法,儲存用戶端的session資訊,握手建立連接配接。
  • 用戶端調用ws.send發送消息,對應服務端的@OnMessage注解下面的方法接收消息。
  • 服務端調用session.getAsyncRemote().sendText發送消息,對應的用戶端ws.onmessage接收消息。

添加 controller

@GetMapping({"","index.html"})
public ModelAndView index() {
 ModelAndView view = new ModelAndView("index");
 return view;
}
           

效果展示

打開兩個用戶端,其中的一個用戶端發送消息,另一個用戶端也能接收到消息。

Websocket叢集解決方案

添加 RabbitMQ 中間件

這裡使用比較常用的RabbitMQ作為消息中間件,而RabbitMQ支援釋出訂閱模式:

Websocket叢集解決方案

添加消息訂閱

交換機使用扇形交換機,消息分發給每一條綁定該交換機的隊列。以伺服器所在的IP + 端口作為唯一辨別作為隊列的命名,啟動一個服務,使用隊列綁定交換機,實作消息的訂閱:

@Configuration
public class RabbitConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
    }

    @Bean
    public Queue psQueue() throws SocketException {
        // ip + 端口 為隊列名 
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }

    @Bean
    public Binding routingFirstBinding() throws SocketException {
        return BindingBuilder.bind(psQueue()).to(fanoutExchange());
    }
}

           
擷取伺服器IP和端口可以具體檢視Github源碼,這裡就不做較長的描述了。

修改服務端點 ServerEndpoint

在WebSocket添加消息的接收方法,@RabbitListener 接收消息,隊列名稱使用常量命名,動态隊列名稱使用 #{name},其中的name是Queue的bean 名稱:

@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
  System.out.println(message);
  sendMessage(message);
}
           

然後再調用sendMessage方法發送給所在連接配接的用戶端。

修改消息發送

在WebSocket類的onMessage方法将消息發送改成RabbitMQ方式發送:

@OnMessage
public void onMessage(String message){
  if (!message.equals("ping")){
    log.info("【wesocket】收到用戶端發送的消息,message={}",message);
    //sendMessage(message);
    if (rabbitTemplate == null) {
      rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
    }
    rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);
  }
}
           

消息通知流程如下所示:

Websocket叢集解決方案

啟動兩個執行個體,模拟叢集環境

打開idea的Edit Configurations:

Websocket叢集解決方案

點選左上角的COPY,然後添加端口server.port=8081:

Websocket叢集解決方案

啟動兩個服務,端口分别是8080和8081。在啟動8081端口的服務,将前端連接配接端口改成8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");
           

效果展示

Websocket叢集解決方案

源碼

github源碼

參考

  • Spring Websocket in a tomcat cluster
  • WebSocket 叢集方案

繼續閱讀