天天看點

WebSocket原理(響應式)及執行個體

作者:Doker多克

一. WebSocket 簡介

WebSocket 協定 RFC 6455 提供了一個标準化的 在用戶端和伺服器之間建立全雙工雙向通信通道的方法 通過單個 TCP 連接配接。它是與HTTP不同的TCP協定,但旨在 通過 HTTP 工作,使用端口 80 和 443,并允許重用現有防火牆規則。

WebSocket互動以HTTP請求開始,該請求使用HTTPUpgrade标頭來更新,或者在本例中,切換到WebSocket協定。以下示例顯示了這樣的互動:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

(1)Upgrade标頭。
(2)Upgrade使用連接配接。           

支援 WebSocket 的伺服器傳回輸出,而不是通常的 200 狀态代碼 類似于以下内容:

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

 (1)協定切換           

握手成功後,HTTP 更新請求的 TCP 套接字将保留 為用戶端和伺服器打開以繼續發送和接收消息。

有關 WebSocket 工作原理的完整介紹超出了本文檔的範圍。

請注意,如果 WebSocket 伺服器在 Web 伺服器(例如 nginx)後面運作,則 可能需要将其配置為将 WebSocket 更新請求傳遞到 WebSocket 伺服器。同樣,如果應用程式在雲環境中運作,請檢查 與 WebSocket 支援相關的雲提供商的說明。

1.1. HTTP 與 WebSocket

即使 WebSocket 被設計為與 HTTP 相容并且以 HTTP 請求開始, 重要的是要了解這兩種協定導緻非常不同的 體系結構和應用程式程式設計模型。

在 HTTP 和 REST 中,應用程式被模組化為多個 URL。要與應用程式互動, 用戶端通路這些 URL,請求-響應樣式。伺服器将請求路由到 基于 HTTP URL、方法和标頭的适當處理程式。

相比之下,在 WebSocket 中,初始連接配接通常隻有一個 URL。 随後,所有應用程式消息都在同一 TCP 連接配接上流動。這指向 一種完全不同的異步、事件驅動的消息傳遞體系結構。

WebSocket也是一種低級傳輸協定,與HTTP不同,它沒有規定 消息内容的任何語義。這意味着無法路由或處理 消息,除非用戶端和伺服器在消息語義上達成一緻。

WebSocket用戶端和伺服器可以通過HTTP握手請求上的Sec WebSocket protocol标頭協商使用更進階别的消息傳遞協定(例如STOMP)。如果沒有這一點,他們需要制定自己的慣例。

1.2. 何時使用 WebSocket

WebSockets可以使網頁具有動态性和互動性。但是,在許多情況下, AJAX 和 HTTP 流或長輪詢的組合可以提供簡單和 有效的解決方案。

例如,新聞、郵件和社交源需要動态更新,但可能是 完全可以每隔幾分鐘這樣做一次。協作、遊戲和金融應用, 另一方面,需要更接近實時。

延遲本身并不是決定性因素。如果消息量相對較低(例如, 監控網絡故障)HTTP 流或輪詢可以提供有效的解決方案。 低延遲、高頻和高容量的組合使最佳 使用 WebSocket 的案例。

還請記住,在Internet上,超出您控制範圍的限制性代理可能會阻止WebSocket互動,因為它們未配置為傳遞Upgrade标頭,或者因為它們關閉了看起來空閑的長期連接配接。這意味着,與面向公共的應用程式相比,在防火牆内部應用程式中使用WebSocket是一個更直接的決定。

二、WebSocket API

Spring 架構提供了一個 WebSocket API,您可以使用它來編寫用戶端和 處理 WebSocket 消息的伺服器端應用程式

2.1. Server

要建立WebSocket伺服器,首先可以建立WebSocketHandler。以下示例顯示了如何執行此操作:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}

           

然後,您可以将其映射到 URL:

@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());
        int order = -1; // before annotated controllers

        return new SimpleUrlHandlerMapping(map, order);
    }
}

           

如果使用WebFlux配置,則無需進一步操作,否則如果不使用WebFlush配置,則需要聲明WebSocketHandlerAdapter,如下所示:

@Configuration
class WebConfig {

    // ...

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

           

2.2.WebSocketHandler

WebSocketHandler的handle方法接受WebSocketSession并傳回Mono<Void>以訓示會話的應用程式處理何時完成。會話通過兩個流處理,一個用于入站消息,另一個用于出站消息。下表描述了處理流的兩種方法:

WebSocketSession方法 描述
Flux<WebSocketMessage> receive() 提供對入站消息流的通路,并在連接配接關閉時完成。
Mono<Void> send(Publisher<WebSocketMessage>) 擷取傳出消息的源,編寫消息,并傳回一個Mono<Void>,該消息在源完成并完成編寫時完成。

WebSocketHandler必須将入站流和出站流組合成一個統一的流,并傳回反映該流完成情況的Mono<Void>。根據應用程式要求,統一流程在以下情況下完成:

  • 入站或出站消息流完成。
  • 入站流完成(即連接配接關閉),而出站流是無限的。
  • 在標明的點上,通過WebSocketSession的close方法。

當入站和出站消息流組合在一起時,無需檢查連接配接是否打開,因為Reactive streams表示結束活動。入站流接收完成或錯誤信号,出站流接收取消信号。

處理程式的最基本實作是處理入站流的實作。以下示例顯示了這樣的實作:

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()            (1)
                .doOnNext(message -> {
                    // ...                  (2)
                })
                .concatMap(message -> {
                    // ...                  (3)
                })
                .then();                    (4)
    }
}           

(1)通路入站消息流。

(2)對每條消息做些什麼。

(3)執行使用消息内容的嵌套異步操作。

(4)傳回接收完成時完成的Mono<Void>。

以下實作組合了入站和出站流:

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()               (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));    (2)

        return session.send(output);                                    (3)
    }
}           

(1)處理入站消息流。

(2)建立出站消息,生成組合流。

(3)傳回在我們繼續接收時未完成的Mono<Void>。

入站和出站流可以是獨立的,并且僅在完成時才加入, 如以下示例所示:

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage)); (2)

        return Mono.zip(input, output).then();                              (3)
    }
}           

(1)處理入站消息流。

(2)發送傳出消息。

(3)加入這些流,并傳回一個Mono<Void>,在任何一個流結束時完成。

2.3.DataBuffer

DataBuffer是WebFlux中位元組緩沖區的表示。參考資料的Spring Core部分在資料緩沖區和編解碼器部分有更多内容。要了解的關鍵點是,在某些伺服器(如Netty)上,位元組緩沖區被合并并進行引用計數,并且在使用時必須釋放,以避免記憶體洩漏。

在Netty上運作時,應用程式必須使用DataBufferUtils.retain(dataBuffer),如果它們希望保留輸入資料緩沖區,以確定它們不會被釋放,然後在緩沖區被消耗時使用DataBufferUtils.release(dataBuffer)。

2.4. Handshake

WebSocketHandlerAdapter委托給WebSocketService。預設情況下,這是HandshakeWebSocketService的一個執行個體,它對WebSocket請求執行基本檢查,然後對正在使用的伺服器使用RequestUpgradeStrategy。目前,有對Reactor Netty、Tomcat、Jetty和Undertow的内置支援。

HandshakeWebSocketService公開了一個sessionAttributePredicate屬性,該屬性允許設定Predicate<String>以從WebSession中提取屬性并将其插入WebSocketSession的屬性中。

2.5.伺服器配置

每個伺服器的RequestUpgradeStrategy公開了特定于底層WebSocket伺服器引擎的配置。使用WebFlux Java配置時,您可以自定義WebFluxConfig相應部分中所示的财産,否則,如果不使用WebFlex配置,請使用以下選項:

@Configuration
class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}

           

檢查伺服器的更新政策以檢視可用的選項。現在 隻有Tomcat和Jetty暴露了這樣的選擇。

2.6. CORS

配置CORS并限制對WebSocket端點的通路的最簡單方法是讓WebSocketHandler實作CorsConfigurationSource,并傳回一個具有允許的源、頭和其他詳細資訊的CorsCoonfiguration。如果無法做到這一點,還可以在SimpleUrlHandler上設定corsConfigurations屬性,以按URL模式指定CORS設定。如果同時指定了兩者,則使用CorsConfiguration上的組合方法組合它們。

2.7. 用戶端

Spring WebFlux提供了一個WebSocketClient抽象,其中包含Reactor Netty、Tomcat、Jetty、Undertow和标準Java(即JSR-356)的實作。

要啟動WebSocket會話,可以建立用戶端的執行個體并使用其執行方法:

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());

           

有些用戶端(如Jetty)實施生命周期,需要停止并啟動才能使用它們。所有用戶端都具有與底層WebSocket用戶端的配置相關的構造函數選項。

三、開發執行個體

1、引入依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.1-1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>           

2、消息進行模組化

package com.example.messagingstompwebsocket;

public class HelloMessage {

  private String name;

  public HelloMessage() {
  }

  public HelloMessage(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }
}           

3、建立消息處理控制器

在 Spring 處理 STOMP 消息傳遞的方法中,STOMP 消息可以路由到@Controller類。例如,将 (from ) 映射為處理到目标的消息,如以下清單所示:

package com.example.messagingstompwebsocket;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.util.HtmlUtils;

@Controller
public class GreetingController {


  @MessageMapping("/hello")
  @SendTo("/topic/greetings")
  public Greeting greeting(HelloMessage message) throws Exception {
    Thread.sleep(1000); // simulated delay
    return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");
  }

}           

這個控制器簡潔而簡單,但仍在繼續。我們一步一步地将其分解。

@MessageMapping注釋確定,如果消息發送到/hello目标,則調用greeting()方法。

消息的有效負載被綁定到HelloMessage對象,該對象被傳遞到greeting()中。

在内部,該方法的實作通過使線程休眠一秒鐘來模拟處理延遲。這是為了證明,在客戶機發送消息後,伺服器可以花多長時間來異步處理消息。客戶機可以繼續進行它需要做的任何工作,而無需等待響應。

延遲一秒後,greeting()方法建立一個greeting對象并将其傳回。傳回值将廣播給/ttopic/greetings的所有訂閱者,如@SendTo注釋中所指定的。請注意,輸入消息中的名稱是經過淨化的,因為在這種情況下,它将被回顯并在用戶端的浏覽器DOM中重新呈現。

4、為STOMP消息傳遞配置Spring

現在已經建立了服務的基本元件,您可以配置Spring以啟用WebSocket和STOMP消息傳遞。

package com.example.messagingstompwebsocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableSimpleBroker("/topic");
    config.setApplicationDestinationPrefixes("/app");
  }

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/gs-guide-websocket").withSockJS();
  }

}           

WebSocketConfig用@Configuration注釋,表示它是一個Spring配置類。它還帶有@EnableWebSocketMessageBroker注釋。顧名思義,@EnableWebSocketMessageBroker啟用WebSocket消息處理,并由消息代理支援。

configureMessageBroker()方法實作WebSocketMessageBrokerConfigurer中的預設方法來配置消息代理。它首先調用enableSimpleBroker(),以啟用一個基于記憶體的簡單消息代理,在以/topic為字首的目的地将問候消息傳送回用戶端。它還為綁定到用@MessageMapping注釋的方法的消息指定/app字首。此字首将用于定義所有消息映射。例如,/app/hello是GreetingController.greeting()方法映射到的端點。

registerTompEndpoints()方法注冊/gs-guide-websocket端點,啟用SockJS回退選項,以便在websocket不可用時使用備用傳輸。SockJS用戶端将嘗試連接配接到/gs-guide-websocket并使用最佳可用傳輸(websocket、xhr流、xhr輪詢等)。

大家好,我是Doker品牌的Sinbad,歡迎點贊和評論,您的鼓勵是我們持續更新的動力!更多資料請前往官網:Doker 多克

繼續閱讀