天天看點

Spring Cloud 微服務架構下的 WebSocket 解決方案

WebSocket在現代浏覽器中的應用已經算是比較普遍了,在某些業務場景下,要求必須能夠在伺服器端推送消息至用戶端。在沒有WebSocket的年代,我們使用過dwr,在那個時候dwr真實一個非常棒的方案。但是在WebSocket興起之後,我們更願意使用标準實作來解決問題、

首先交代一下,本篇文章不講解WebSocket的配置,主要講的是針對在微服務架構叢集模式下解決方案的選擇。

微服務架構大家應該都不陌生了,在微服務架構下,服務是分布式的,而且為了保證業務的可用性,每個服務都是以叢集的形式存在。在叢集模式下,要保證叢集的每一個節點的通路得到相同的結果就需要做到資料一緻性,如緩存、session等。

微服務叢集緩存通常使用分布式緩存redis解決,session一緻性也通常會通過redis解決,但是現在更流行的是無狀态的Http,即無session化,最常見的解決方案就是OAuth。

WebSocket有所不同,它是與服務端建立一個長連接配接,在叢集模式下,顯然不可能把前端與服務叢集中的每一個節點建立連接配接,一個可行的思路是像解決http session的共享一樣,通過redis來實作websocket的session共享,但是websocket session的數量是遠多于http session的數量的(因為每打開一個頁面都會建立一個websocket連接配接),是以随着使用者量的增長,共享的資料量太大,很容易造成瓶頸。

另一個思路是,websocket總歸會與叢集中某個節點建立連接配接,那麼,隻要找到連接配接所在的節點,就可以向服務端推送消息了,那麼要解決的問題就是如何找到一個websocket連接配接所在的節點。要找到連接配接在哪個節點上,我們需要一個唯一的辨別符用于尋找連接配接,然而在基于stomp的釋出-訂閱模式下,一個消息的推送可能是面向若幹個連接配接的,可能分布在叢集中的每一個節點上,這樣去尋找連接配接的代價也很高。既然這樣,我們不妨換種思路,每一個websocket消息,我們在叢集的每個節點上都進行推送,訂閱了該消息的連接配接,不管有一個還是一萬個,最終肯定都能收到這個消息。基于這個思路,我們做了一些技術選型:

  • RabbitMQ
  • Spring Cloud Stream

首先說RabbitMQ,進階消息隊列,可以實作消息廣播(當然kafka一樣可以做到,這裡隻介紹一種),另一項技術是Spring Cloud Stream,stream是一個用于建構高度可擴充事件驅動型微服務的架構,并且它可以跟RabbitMQ、Kafka以及其他多種消息服務內建,使用了stream,要把rabbitmq換成kafka隻不過是改改配置的事情。接下來重點介紹使用方法:

引入依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
           

配置Binder

binder是stream中的重要概念,是用于配置用于stream釋出和訂閱事件的消息中間件。先看一段配置:

spring:
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                username: username
                password: password
                virtual-host: /
           

配置中的 defaultRabbit 是binder的名稱,一會會在其他配置中引用,type指定了消息中間件的類型,environment是對消息中間件的配置,這裡的配置結構和spring.rabbitmq命名空間下的配置項一模一樣的,可以參照着進行配置(這樣配置的作用是可以把stream的rabbitmq配置和項目中其他地方使用的rabbitmq區分開,如果這裡不配置environment,binder會沿用spring.rabbitmq命名空間下的配置),比如你的項目中的rabbitmq的配置是這樣的:

spring:
  rabbitmq:
    host: localhost
    username: username
    password: password
    virtual-host: /
           

那上門的binder的environment配置完全可以去掉。

消息流與binder的綁定

微服務要接收揮着釋出事件消息,根據spring cloud stream的名字,顧名思義,需要使用流,是以需要在配置中聲明兩個事件流,一個輸入流,一個輸出流:

spring:
  cloud:
    stream:
      bindings:
        websocketMessageIn:
          destination: websocketMessage
          binder: defaultRabbit
        websocketMessageOut:
          destination: websocketMessage
          binder: defaultRabbit
           

這裡我們看到,事件流引用了binder,表示這兩個流使用rabbitmq這個中間件(看到這裡想必大家已經明白了,在一個項目中完全可以同時使用rabbit和kafka作為事件流的消息中間件)。

websocketMessageIn,websocketMessageOut是事件流的名字(可以自己随便起),destination指定了兩個事件流的destination是同一個,這決定了寫入和讀取是指向同一個地方(不一定是同一個消息隊列)。

事件流聲明

事件流使用接口進行定義:

/**
 * websocket消息事件流接口
 * Created by 吳昊 on 18-11-8.
 *
 * @author 吳昊
 * @since 1.4.3
 */
interface WebSocketMessageStream {
  companion object {
    const val INPUT: String = "webSocketMessageIn"
    const val OUTPUT: String = "webSocketMessageOut"
  }

  /**
   * 輸入
   */
  @Input(INPUT)
  fun input(): SubscribableChannel

  /**
   * 輸出
   */
  @Output(OUTPUT)
  fun output(): MessageChannel
}
           

聲明事件流接口,這裡面定義了兩個常量,分别對應配置中的兩個流名稱,通過調用input()方法擷取輸入流,通過調用output()擷取輸出流。

該接口的實作由spring cloud stream完成,不需要自己實作。

使用事件流

聲明一個bean:

@Component
@EnableBinding(WebSocketMessageStream::class)
class WebSocketMessageService {
……
           

這裡的@EnableBinding 注解指明了事件流接口類,隻有添加了這個注解(要能被Spring識别到,可以加在入口類上,也可以加在@Configuration注解的類上),該接口才會被實作,并且加入到Spring的容器中(可以注入)。

上面WebSocketMessageService的内容如下:

@Autowired
  private lateinit var stream: WebSocketMessageStream
  @Autowired
  private lateinit var template: SimpMessagingTemplate

  @StreamListener(WebSocketMessageStream.INPUT)
  fun messageReceived(message: WebSocketMessage) {
    template.convertAndSend(message.destination, message.body)
  }

  fun send(destination: String, body: Any) {
    stream.output().send(
        MutableMessage(WebSocketMessage(destination, body))
    )
  }
           

接收消息

@StreamListener 注解指明了要監聽的事件流,方法接收的參數即事件的消息内容(使用jackson反序列化),這裡的messageReceived方法直接将接收到的消息直接用websocket發送給前端

發送消息

同樣,發送也很簡單,将消息直接發送到輸入流中,上面的send方法即是将原本應該用SimpMessagingTemplate發送給websocket的消息發送到spring cloud stream的事件流中。這樣做以後,項目中所有需要向前端推送webSocket消息的操作都應該調用send方法來進行。

講到這裡大家可能還有點糊塗,也有一些疑問,為什麼這樣每個微服務節點就能收到事件消息了?或者單個節點接收事件消息和多個節點接收的配置是怎麼控制的。各位不要着急,待我慢慢道來,接下來就要結合rabbit的知識來講解

了:

首先看一下rabbit的消息隊列:

Spring Cloud 微服務架構下的 WebSocket 解決方案

從圖中看到,存在多個以webSocketMessage開頭的隊列,這是每一個微服務節點建立了一個消息隊列,再來看exchange:

Spring Cloud 微服務架構下的 WebSocket 解決方案

exchange綁定的消息隊列

Spring Cloud 微服務架構下的 WebSocket 解決方案

這裡的exchange名稱和上面消息隊列的名稱字首均是webSocketMessage, 這個都是**由前面的binding配置中的destination指定的,和destination名稱保持一緻**

當應用向輸入流中寫入事件時,使用destination作為key(即webSocketMessage),将消息寫入名為webSocketMessage的exchange,由于exchange綁定的消息隊列字首均為webSocketMessage且routing key都是#,是以exchange會将消息路由到每一個webSocketMessage開頭的消息隊列上(這裡涉及到rabbitmq的知識點,如過不懂請自行查閱資料),這樣每一個微服務都能接收到相同的消息。

我們再來看前面提出的問題,這樣的配置可以把消息推送到每一個微服務節點,那麼如果需要一個消息隻被一個節點接收,該怎麼配置呢?很簡單,一個配置項就可以搞定:

spring:
  cloud:
    stream:
      bindings:
        websocketMessageIn:
          group: test
          destination: websocketMessage
          binder: defaultRabbit
           

可以看到,相比前面的配置,僅僅多了一個group的配置,這樣配置之後,rabbitmq會生成一個名為websocketMessage.test的消息隊列(前面講到的每個微服務建立的消息隊列是自動删除的,即微服務斷開連接配接後消息隊列就被删除,而這個消息隊列是持久化的,也就是即使所有的微服務節點全部斷開連接配接也不會被删除),所有的微服務節點監聽這一個隊列,當隊列中有消息時,隻會被一個節點消費。

要講的内容到此結束,spring cloud stream的配置遠不止這些,但是這些配置已足夠完成我所需要做的事情,其他的配置請參考spring cloud stream官方文檔:

http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/Fishtown.RC2/single/spring-cloud-stream.html

繼續閱讀