天天看點

WebSocket 分布式叢集怎麼搞?

問題起因

最近做項目時遇到了需要多使用者之間通信的問題,涉及到了WebSocket握手請求,以及叢集中WebSocket Session共享的問題。

期間我經過了幾天的研究,總結出了幾個實作分布式WebSocket叢集的辦法,從zuul到spring cloud gateway的不同嘗試,總結出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。

以下是我的場景描述

資源:4台伺服器。其中隻有一台伺服器具備ssl認證域名,一台redis+mysql伺服器,兩台應用伺服器(叢集)

應用釋出限制條件:由于場景需要,應用場所需要ssl認證的域名才能釋出。是以ssl認證的域名伺服器用來當api網關,負責https請求與wss(安全認證的ws)連接配接。俗稱https解除安裝,使用者請求https域名伺服器,但真實通路到的是http+ip位址的形式。隻要網關配置高,能handle多個應用

需求:使用者登入應用,需要與伺服器建立wss連接配接,不同角色之間可以單發消息,也可以群發消息

叢集中的應用服務類型:每個叢集執行個體都負責http無狀态請求服務與ws長連接配接服務

系統架構圖

WebSocket 分布式叢集怎麼搞?

在我的實作裡,每個應用伺服器都負責http and ws請求,其實也可以将ws請求建立的聊天模型單獨成立為一個子產品。從分布式的角度來看,這兩種實作類型差不多,但從實作友善性來說,一個應用服務http+ws請求的方式更為友善。下文會有解釋

本文涉及的技術棧

Eureka 服務發現與注冊

Redis Session共享

Redis 消息訂閱

Spring Boot

Zuul 網關

Spring Cloud Gateway 網關

Spring WebSocket 處理長連接配接

Ribbon 負載均衡

Netty 多協定NIO網絡通信架構

Consistent Hash 一緻性雜湊演算法

相信能走到這一步的人都了解過我上面列舉的技術棧了,如果還沒有,可以先去網上找找入門教程了解一下。下面的内容都與上述技術相關,題主預設大家都了解過了...

技術可行性分析

下面我将描述session特性,以及根據這些特性列舉出n個解決分布式架構中處理ws請求的叢集方案

WebSocketSession與HttpSession

在Spring所內建的WebSocket裡面,每個ws連接配接都有一個對應的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接配接之後可以通過類似這樣的方式進行與用戶端的通信:

protected void handleTextMessage(WebSocketSession session, TextMessage message) {
   System.out.println("伺服器接收到的消息: "+ message );
   //send message to client
   session.sendMessage(new TextMessage("message"));
}      

那麼問題來了:ws的session無法序列化到redis,是以在叢集中,我們無法将所有WebSocketSession都緩存到redis進行session共享。每台伺服器都有各自的session。于此相反的是HttpSession,redis可以支援httpsession共享,但是目前沒有websocket session共享的方案,是以走redis websocket session共享這條路是行不通的。

有的人可能會想:我可不可以将sessin關鍵資訊緩存到redis,叢集中的伺服器從redis拿取session關鍵資訊然後重新建構websocket session...我隻想說這種方法如果有人能試出來,請告訴我一聲...

以上便是websocket session與http session共享的差別,總的來說就是http session共享已經有解決方案了,而且很簡單,隻要引入相關依賴:spring-session-data-redis和spring-boot-starter-redis,大家可以從網上找個demo玩一下就知道怎麼做了。而websocket session共享的方案由于websocket底層實作的方式,我們無法做到真正的websocket session共享。

解決方案的演變

Netty與Spring WebSocket

剛開始的時候,我嘗試着用netty實作了websocket服務端的搭建。在netty裡面,并沒有websocket session這樣的概念,與其類似的是channel,每一個用戶端連接配接都代表一個channel。前端的ws請求通過netty監聽的端口,走websocket協定進行ws握手連接配接之後,通過一些列的handler(責鍊模式)進行消息處理。與websocket session類似地,服務端在連接配接建立後有一個channel,我們可以通過channel進行與用戶端的通信

/**
* TODO 根據伺服器傳進來的id,配置設定到不同的group
*/
private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
   //retain增加引用計數,防止接下來的調用引用失效
   System.out.println("伺服器接收到來自 " + ctx.channel().id() + " 的消息: " + msg.text());
   //将消息發送給group裡面的所有channel,也就是發送消息給用戶端
   GROUP.writeAndFlush(msg.retain());
}      

那麼,服務端用netty還是用spring websocket?以下我将從幾個方面列舉這兩種實作方式的優缺點

使用netty實作websocket

玩過netty的人都知道netty是的線程模型是nio模型,并發量非常高,spring5之前的網絡線程模型是servlet實作的,而servlet不是nio模型,是以在spring5之後,spring的底層網絡實作采用了netty。如果我們單獨使用netty來開發websocket服務端,速度快是絕對的,但是可能會遇到下列問題:

與系統的其他應用內建不友善,在rpc調用的時候,無法享受springcloud裡feign服務調用的便利性

業務邏輯可能要重複實作

使用netty可能需要重複造輪子

怎麼連接配接上服務注冊中心,也是一件麻煩的事情

restful服務與ws服務需要分開實作,如果在netty上實作restful服務,有多麻煩可想而知,用spring一站式restful開發相信很多人都習慣了。

使用spring websocket實作ws服務

spring websocket已經被springboot很好地內建了,是以在springboot上開發ws服務非常友善,做法非常簡單。

Spring Boot 基礎就不介紹了,推薦下這個實戰教程:

https://github.com/javastacks/spring-boot-best-practice

第一步:添加依賴

WebSocket 分布式叢集怎麼搞?
@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {
   private List<WebSocketSession> clients = new ArrayList<>();

   @Override
   public void afterConnectionEstablished(WebSocketSession session) {
       clients.add(session);
       System.out.println("uri :" + session.getUri());
       System.out.println("連接配接建立: " + session.getId());
       System.out.println("current seesion: " + clients.size());
   }

   @Override
   public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
       clients.remove(session);
       System.out.println("斷開連接配接: " + session.getId());
   }

   @Override
   protected void handleTextMessage(WebSocketSession session, TextMessage message) {
       String payload = message.getPayload();
       Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
       System.out.println("接受到的資料" + map);
       clients.forEach(s -> {
           try {
               System.out.println("發送消息給: " + session.getId());
               s.sendMessage(new TextMessage("伺服器傳回收到的資訊," + payload));
           } catch (Exception e) {
               e.printStackTrace();
           }
       });
   }
}      

從這個demo中,使用spring websocket實作ws服務的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實作ws服務。

是以我的應用服務架構是這樣子的:一個應用既負責restful服務,也負責ws服務。沒有将ws服務子產品拆分是因為拆分出去要使用feign來進行服務調用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務間的io調用,是以就沒有這麼做了。

從zuul技術轉型到spring cloud gateway

要實作websocket叢集,我們必不可免地得從zuul轉型到spring cloud gateway。原因如下:

zuul1.0版本不支援websocket轉發,zuul 2.0開始支援websocket,zuul2.0幾個月前開源了,但是2.0版本沒有被spring boot內建,而且文檔不健全。是以轉型是必須的,同時轉型也很容易實作。

在gateway中,為了實作ssl認證和動态路由負載均衡,yml檔案中以下的某些配置是必須的,在這裡提前避免大家采坑。

server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**      

如果要愉快地玩https解除安裝,我們還需要配置一個filter,否則請求網關時會出現錯誤not an SSL/TLS record

@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      URI originalUri = exchange.getRequest().getURI();
      ServerHttpRequest request = exchange.getRequest();
      ServerHttpRequest.Builder mutate = request.mutate();
      String forwardedUri = request.getURI().toString();
      if (forwardedUri != null && forwardedUri.startsWith("https")) {
          try {
              URI mutatedUri = new URI("http",
                      originalUri.getUserInfo(),
                      originalUri.getHost(),
                      originalUri.getPort(),
                      originalUri.getPath(),
                      originalUri.getQuery(),
                      originalUri.getFragment());
              mutate.uri(mutatedUri);
          } catch (Exception e) {
              throw new IllegalStateException(e.getMessage(), e);
          }
      }
      ServerHttpRequest build = mutate.build();
      ServerWebExchange webExchange = exchange.mutate().request(build).build();
      return chain.filter(webExchange);
  }

  @Override
  public int getOrder() {
      return HTTPS_TO_HTTP_FILTER_ORDER;
  }
}      

這樣子我們就可以使用gateway來解除安裝https請求了,到目前為止,我們的基本架構已經搭建完畢,網關既可以轉發https請求,也可以轉發wss請求。接下來就是使用者多對多之間session互通的通訊解決方案了。接下來,我将根據方案的優雅性,從最不優雅的方案開始講起。

session廣播

這是最簡單的websocket叢集通訊解決方案。場景如下:

教師A想要群發消息給他的學生們

教師的消息請求發給網關,内容包含{我是教師A,我想把xxx消息發送我的學生們}

網關接收到消息,擷取叢集所有ip位址,逐個調用教師的請求

叢集中的每台伺服器擷取請求,根據教師A的資訊查找本地有沒有與學生關聯的session,有則調用sendMessage方法,沒有則忽略請求

WebSocket 分布式叢集怎麼搞?

session廣播實作很簡單,但是有一個緻命缺陷:計算力浪費現象,當伺服器沒有消息接收者session的時候,相當于浪費了一次循環周遊的計算力,該方案在并發需求不高的情況下可以優先考慮,實作很容易。

spring cloud中擷取服務叢集中每台伺服器資訊的方法如下

@Resource

private EurekaClient eurekaClient;

Application app = eurekaClient.getApplication("service-name");

//instanceInfo包括了一台伺服器ip,port等消息

InstanceInfo instanceInfo = app.getInstances().get(0);

System.out.println("ip address: " + instanceInfo.getIPAddr());

伺服器需要維護關系映射表,将使用者的id與session做映射,session建立時在映射表中添加映射關系,session斷開後要删除映射表内關聯關系

一緻性雜湊演算法實作(本文的要點)

這種方法是本人認為最優雅的實作方案,了解這種方案需要一定的時間,如果你耐心看下去,相信你一定會有所收獲。再強調一次,不了解一緻性雜湊演算法的同學請先看這裡,現先假設哈希環是順時針查找的。

首先,想要将一緻性雜湊演算法的思想應用到我們的websocket叢集,我們需要解決以下新問題:

叢集節點DOWN,會影響到哈希環映射到狀态是DOWN的節點。

叢集節點UP,會影響到舊key映射不到對應的節點。

哈希環讀寫共享。

在叢集中,總會出現服務UP/DOWN的問題。

針對節點DOWN的問題分析如下:

一個伺服器DOWN的時候,其擁有的websocket session會自動關閉連接配接,并且前端會收到通知。此時會影響到哈希環的映射錯誤。我們隻需要當監聽到伺服器DOWN的時候,删除哈希環上面對應的實際結點和虛結點,避免讓網關轉發到狀态是DOWN的伺服器上。

實作方法:在eureka治理中心監聽叢集服務DOWN事件,并及時更新哈希環。

針對節點UP的問題分析如下:

現假設叢集中有服務 CacheB上線了,該伺服器的ip位址剛好被映射到key1和 cacheA之間。那麼key1對應的使用者每次要發消息時都跑去 CacheB發送消息,結果明顯是發送不了消息,因為 CacheB沒有key1對應的session。

WebSocket 分布式叢集怎麼搞?

此時我們有兩種解決方案。

方案A簡單,動作大:

eureka監聽到節點UP事件之後,根據現有叢集資訊,更新哈希環。并且斷開所有session連接配接,讓用戶端重新連接配接,此時用戶端會連接配接到更新後的哈希環節點,以此避免消息無法送達的情況。

方案B複雜,動作小:

我們先看看沒有虛拟節點的情況,假設 CacheC和 CacheA之間上線了伺服器 CacheB。所有映射在 CacheC到 CacheB的使用者發消息時都會去 CacheB裡面找session發消息。也就是說 CacheB一但上線,便會影響到 CacheC到 CacheB之間的使用者發送消息。是以我們隻需要将 CacheA斷開 CacheC到 CacheB的使用者所對應的session,讓用戶端重連。

WebSocket 分布式叢集怎麼搞?

接下來是有虛拟節點的情況,假設淺色的節點是虛拟節點。我們用長括号來代表某段區域映射的結果屬于某個 Cache。首先是C節點未上線的情況。圖大家應該都懂吧,所有B的虛拟節點都會指向真實的B節點,是以所有B節點逆時針那一部分都會映射到B(因為我們規定哈希環順時針查找)。

WebSocket 分布式叢集怎麼搞?

由以上情況我們可以知道:節點上線,會有許多對應虛拟節點也同時上線,是以我們需要将多段範圍key對應的session斷開連接配接(上圖紅色的部分)。具體算法有點複雜,實作的方式因人而異,大家可以嘗試一下自己實作算法。

哈希環應該放在哪裡?

gateway本地建立并維護哈希環。當ws請求進來的時候,本地擷取哈希環并擷取映射伺服器資訊,轉發ws請求。這種方法看上去不錯,但實際上是不太可取的,回想一下上面伺服器DOWN的時候隻能通過eureka監聽,那麼eureka監聽到DOWN事件之後,需要通過io來通知gateway删除對應節點嗎?顯然太麻煩了,将eureka的職責分散到gateway,不建議這麼做。

eureka建立,并放到redis共享讀寫。這個方案可行,當eureka監聽到服務DOWN的時候,修改哈希環并推送到redis上。為了請求響應時間盡量地短,我們不可以讓gateway每次轉發ws請求的時候都去redis取一次哈希環。哈希環修改的機率的确很低,gateway隻需要應用redis的消息訂閱模式,訂閱哈希環修改事件便可以解決此問題。

至此我們的spring websocket叢集已經搭建的差不多了,最重要的地方還是一緻性雜湊演算法。現在有最後一個技術瓶頸,網關如何根據ws請求轉發到指定的叢集伺服器上?

答案在負載均衡。spring cloud gateway或zuul都預設內建了ribbon作為負載均衡,我們隻需要根據建立ws請求時用戶端發來的user id,重寫ribbon負載均衡算法,根據user id進行hash,并在哈希環上尋找ip,并将ws請求轉發到該ip便完事了。流程如下圖所示:

WebSocket 分布式叢集怎麼搞?

接下來使用者溝通的時候,隻需要根據id進行hash,在哈希環上擷取對應ip,便可以知道與該使用者建立ws連接配接時的session存在哪台伺服器上了!

spring cloud Finchley.RELEASE 版本中ribbon未完善的地方

題主在實際操作的時候發現了ribbon兩個不完善的地方......

根據網上找的方法,繼承AbstractLoadBalancerRule重寫負載均衡政策之後,多個不同應用的請求變得混亂。假如eureka上有兩個service A和B,重寫負載均衡政策之後,請求A或B的服務,最終隻會映射到其中一個服務上。非常奇怪!可能spring cloud gateway官網需要給出一個正确的重寫負載均衡政策的demo。

一緻性雜湊演算法需要一個key,類似user id,根據key進行hash之後在哈希環上搜尋并傳回ip。但是ribbon沒有完善choose函數的key參數,直接寫死了default!

WebSocket 分布式叢集怎麼搞?

難道這樣子我們就沒有辦法了嗎?其實還有一個可行并且暫時可替代的辦法!

如下圖所示,用戶端發送一個普通的http請求(包含id參數)給網關,網關根據id進行hash,在哈希環中尋找ip位址,将ip位址傳回給用戶端,用戶端再根據該ip位址進行ws請求。

WebSocket 分布式叢集怎麼搞?

由于ribbon未完善key的處理,我們暫時無法在ribbon上實作一緻性雜湊演算法。隻能間接地通過用戶端發起兩次請求(一次http,一次ws)的方式來實作一緻性哈希。希望不久之後ribbon能更新這個缺陷!讓我們的websocket叢集實作得更優雅一點。

後記

以上便是我這幾天探索的結果。期間遇到了許多問題,并逐一解決難題,列出兩個websocket叢集解決方案。第一個是session廣播,第二個是一緻性哈希。

這兩種方案針對不同場景各有優缺點,本文并未用到ActiveMQ,Karfa等消息隊列實作消息推送,隻是想通過自己的想法,不依靠消息隊列來簡單地實作多使用者之間的長連接配接通訊。希望能為大家提供一條不同于尋常的思路。