天天看点

WebSocket原理(响应式)及实例

作者:Doker多克

一. WebSocket 简介

WebSocket 协议 RFC 6455 提供了一个标准化的 在客户端和服务器之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与HTTP不同的TCP协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重用现有防火墙规则。

WebSocket交互以HTTP请求开始,该请求使用HTTPUpgrade标头来升级,或者在本例中,切换到WebSocket协议。以下示例显示了这样的交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080

(1)Upgrade标头。
(2)Upgrade使用连接。           

支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp

 (1)协议切换           

握手成功后,HTTP 升级请求的 TCP 套接字将保留 为客户端和服务器打开以继续发送和接收消息。

有关 WebSocket 工作原理的完整介绍超出了本文档的范围。

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 与 WebSocket 支持相关的云提供商的说明。

1.1. HTTP 与 WebSocket

即使 WebSocket 被设计为与 HTTP 兼容并且以 HTTP 请求开始, 重要的是要了解这两种协议导致非常不同的 体系结构和应用程序编程模型。

在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应样式。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。

相比之下,在 WebSocket 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一种完全不同的异步、事件驱动的消息传递体系结构。

WebSocket也是一种低级传输协议,与HTTP不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 消息,除非客户端和服务器在消息语义上达成一致。

WebSocket客户端和服务器可以通过HTTP握手请求上的Sec WebSocket protocol标头协商使用更高级别的消息传递协议(例如STOMP)。如果没有这一点,他们需要制定自己的惯例。

1.2. 何时使用 WebSocket

WebSockets可以使网页具有动态性和交互性。但是,在许多情况下, AJAX 和 HTTP 流或长轮询的组合可以提供简单和 有效的解决方案。

例如,新闻、邮件和社交源需要动态更新,但可能是 完全可以每隔几分钟这样做一次。协作、游戏和金融应用, 另一方面,需要更接近实时。

延迟本身并不是决定性因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频和高容量的组合使最佳 使用 WebSocket 的案例。

还请记住,在Internet上,超出您控制范围的限制性代理可能会阻止WebSocket交互,因为它们未配置为传递Upgrade标头,或者因为它们关闭了看起来空闲的长期连接。这意味着,与面向公共的应用程序相比,在防火墙内部应用程序中使用WebSocket是一个更直接的决定。

二、WebSocket API

Spring 框架提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序

2.1. Server

要创建WebSocket服务器,首先可以创建WebSocketHandler。以下示例显示了如何执行此操作:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}

           

然后,您可以将其映射到 URL:

@Configuration
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());
        int order = -1; // before annotated controllers

        return new SimpleUrlHandlerMapping(map, order);
    }
}

           

如果使用WebFlux配置,则无需进一步操作,否则如果不使用WebFlush配置,则需要声明WebSocketHandlerAdapter,如下所示:

@Configuration
class WebConfig {

    // ...

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

           

2.2.WebSocketHandler

WebSocketHandler的handle方法接受WebSocketSession并返回Mono<Void>以指示会话的应用程序处理何时完成。会话通过两个流处理,一个用于入站消息,另一个用于出站消息。下表描述了处理流的两种方法:

WebSocketSession方法 描述
Flux<WebSocketMessage> receive() 提供对入站消息流的访问,并在连接关闭时完成。
Mono<Void> send(Publisher<WebSocketMessage>) 获取传出消息的源,编写消息,并返回一个Mono<Void>,该消息在源完成并完成编写时完成。

WebSocketHandler必须将入站流和出站流组合成一个统一的流,并返回反映该流完成情况的Mono<Void>。根据应用程序要求,统一流程在以下情况下完成:

  • 入站或出站消息流完成。
  • 入站流完成(即连接关闭),而出站流是无限的。
  • 在选定的点上,通过WebSocketSession的close方法。

当入站和出站消息流组合在一起时,无需检查连接是否打开,因为Reactive streams表示结束活动。入站流接收完成或错误信号,出站流接收取消信号。

处理程序的最基本实现是处理入站流的实现。以下示例显示了这样的实现:

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()            (1)
                .doOnNext(message -> {
                    // ...                  (2)
                })
                .concatMap(message -> {
                    // ...                  (3)
                })
                .then();                    (4)
    }
}           

(1)访问入站消息流。

(2)对每条消息做些什么。

(3)执行使用消息内容的嵌套异步操作。

(4)返回接收完成时完成的Mono<Void>。

以下实现组合了入站和出站流:

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()               (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));    (2)

        return session.send(output);                                    (3)
    }
}           

(1)处理入站消息流。

(2)创建出站消息,生成组合流。

(3)返回在我们继续接收时未完成的Mono<Void>。

入站和出站流可以是独立的,并且仅在完成时才加入, 如以下示例所示:

class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                (1)
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage)); (2)

        return Mono.zip(input, output).then();                              (3)
    }
}           

(1)处理入站消息流。

(2)发送传出消息。

(3)加入这些流,并返回一个Mono<Void>,在任何一个流结束时完成。

2.3.DataBuffer

DataBuffer是WebFlux中字节缓冲区的表示。参考资料的Spring Core部分在数据缓冲区和编解码器部分有更多内容。要理解的关键点是,在某些服务器(如Netty)上,字节缓冲区被合并并进行引用计数,并且在使用时必须释放,以避免内存泄漏。

在Netty上运行时,应用程序必须使用DataBufferUtils.retain(dataBuffer),如果它们希望保留输入数据缓冲区,以确保它们不会被释放,然后在缓冲区被消耗时使用DataBufferUtils.release(dataBuffer)。

2.4. Handshake

WebSocketHandlerAdapter委托给WebSocketService。默认情况下,这是HandshakeWebSocketService的一个实例,它对WebSocket请求执行基本检查,然后对正在使用的服务器使用RequestUpgradeStrategy。目前,有对Reactor Netty、Tomcat、Jetty和Undertow的内置支持。

HandshakeWebSocketService公开了一个sessionAttributePredicate属性,该属性允许设置Predicate<String>以从WebSession中提取属性并将其插入WebSocketSession的属性中。

2.5.服务器配置

每个服务器的RequestUpgradeStrategy公开了特定于底层WebSocket服务器引擎的配置。使用WebFlux Java配置时,您可以自定义WebFluxConfig相应部分中所示的财产,否则,如果不使用WebFlex配置,请使用以下选项:

@Configuration
class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}

           

检查服务器的升级策略以查看可用的选项。现在 只有Tomcat和Jetty暴露了这样的选择。

2.6. CORS

配置CORS并限制对WebSocket端点的访问的最简单方法是让WebSocketHandler实现CorsConfigurationSource,并返回一个具有允许的源、头和其他详细信息的CorsCoonfiguration。如果无法做到这一点,还可以在SimpleUrlHandler上设置corsConfigurations属性,以按URL模式指定CORS设置。如果同时指定了两者,则使用CorsConfiguration上的组合方法组合它们。

2.7. 客户端

Spring WebFlux提供了一个WebSocketClient抽象,其中包含Reactor Netty、Tomcat、Jetty、Undertow和标准Java(即JSR-356)的实现。

要启动WebSocket会话,可以创建客户端的实例并使用其执行方法:

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());

           

有些客户端(如Jetty)实施生命周期,需要停止并启动才能使用它们。所有客户端都具有与底层WebSocket客户端的配置相关的构造函数选项。

三、开发实例

1、引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.1-1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>           

2、消息进行建模

package com.example.messagingstompwebsocket;

public class HelloMessage {

  private String name;

  public HelloMessage() {
  }

  public HelloMessage(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }
}           

3、创建消息处理控制器

在 Spring 处理 STOMP 消息传递的方法中,STOMP 消息可以路由到@Controller类。例如,将 (from ) 映射为处理到目标的消息,如以下清单所示:

package com.example.messagingstompwebsocket;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.util.HtmlUtils;

@Controller
public class GreetingController {


  @MessageMapping("/hello")
  @SendTo("/topic/greetings")
  public Greeting greeting(HelloMessage message) throws Exception {
    Thread.sleep(1000); // simulated delay
    return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");
  }

}           

这个控制器简洁而简单,但仍在继续。我们一步一步地将其分解。

@MessageMapping注释确保,如果消息发送到/hello目标,则调用greeting()方法。

消息的有效负载被绑定到HelloMessage对象,该对象被传递到greeting()中。

在内部,该方法的实现通过使线程休眠一秒钟来模拟处理延迟。这是为了证明,在客户机发送消息后,服务器可以花多长时间来异步处理消息。客户机可以继续进行它需要做的任何工作,而无需等待响应。

延迟一秒后,greeting()方法创建一个greeting对象并将其返回。返回值将广播给/ttopic/greetings的所有订阅者,如@SendTo注释中所指定的。请注意,输入消息中的名称是经过净化的,因为在这种情况下,它将被回显并在客户端的浏览器DOM中重新呈现。

4、为STOMP消息传递配置Spring

现在已经创建了服务的基本组件,您可以配置Spring以启用WebSocket和STOMP消息传递。

package com.example.messagingstompwebsocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableSimpleBroker("/topic");
    config.setApplicationDestinationPrefixes("/app");
  }

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/gs-guide-websocket").withSockJS();
  }

}           

WebSocketConfig用@Configuration注释,表示它是一个Spring配置类。它还带有@EnableWebSocketMessageBroker注释。顾名思义,@EnableWebSocketMessageBroker启用WebSocket消息处理,并由消息代理支持。

configureMessageBroker()方法实现WebSocketMessageBrokerConfigurer中的默认方法来配置消息代理。它首先调用enableSimpleBroker(),以启用一个基于内存的简单消息代理,在以/topic为前缀的目的地将问候消息传送回客户端。它还为绑定到用@MessageMapping注释的方法的消息指定/app前缀。此前缀将用于定义所有消息映射。例如,/app/hello是GreetingController.greeting()方法映射到的端点。

registerTompEndpoints()方法注册/gs-guide-websocket端点,启用SockJS回退选项,以便在websocket不可用时使用备用传输。SockJS客户端将尝试连接到/gs-guide-websocket并使用最佳可用传输(websocket、xhr流、xhr轮询等)。

大家好,我是Doker品牌的Sinbad,欢迎点赞和评论,您的鼓励是我们持续更新的动力!更多资料请前往官网:Doker 多克

继续阅读