天天看點

Spring Boot 二三事:WEB 應用消息推送的那點事

閱讀對象:本文适合SpringBoot 初學者及對SpringBoot感興趣的童鞋閱讀。
背景介紹:在企業級 WEB 應用開發中,為了更好的使用者體驗&提升響應速度,往往會将一些耗時費力的請求 (Excel導入or導出,複雜計算, etc.) 進行***異步化***處理。 由此帶來的一個重要的問題是***如何通知使用者任務狀态***,常見的方法大緻分為2類4種:
  • HTTP Polling

    client pull
  • HTTP Long-Polling

    client pull
  • Server-Sent Events (SSE)

    server push
  • WebSocket

    server push

1. Polling 短輪詢

是一種非常簡單的實作方式。就是client通過***定時任務***不斷得重複請求伺服器,進而擷取新消息,而server按時間順序提供自上次請求以後發生的單個或多個消息。

Spring Boot 二三事:WEB 應用消息推送的那點事

短輪詢的優點非常明顯,就是實作簡單。當兩個方向上的資料都非常少,并且請求間隔不是非常密集時,這種方法就會非常有效。例如,新聞評論資訊可以每半分鐘更新一次,這對使用者來說是可以的。

它得缺點也是非常明顯,一旦我們對資料實時性要求非常高時,為了保證消息的及時送達,請求間隔必須縮短,在這種情況下,會加劇伺服器資源的浪費,降低服務的可用性。另一個缺點就是在消息的數量較少時,将會有大量的

request

做無用功,進而也導緻伺服器資源的浪費。

2. Long-Polling 長輪詢

長輪詢的官方定義是:

The server attempts to “hold open” (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.

如果與

Polling

的方式相比,會發現

Long-Polling

的優點是通過hold open HTTP request 進而減少了無用的請求。

大緻步驟為:

  1. client向server請求并等待響應。
  2. 服務端将請求阻塞,并不斷檢查是否有新消息。如果在這個期間有新消息産生時就立即傳回。否則一直等待至

    請求逾時

  3. 當client

    擷取到新消息

    請求逾時

    ,進行消息處理并發起下一次請求。
Spring Boot 二三事:WEB 應用消息推送的那點事

Long-Polling

的缺點之一也是伺服器資源的浪費,因為它和

Polling

的一樣都屬于***被動擷取***,都需要不斷的向伺服器請求。在并發很高的情況下,對伺服器性能是個嚴峻的考驗。

Note:因為以上2兩種方式的實作都比較簡單,是以我們這裡就不做代碼示範了。接下來我們重點介紹一下

Server-Sent Events

WebSocket

3. Demo概要

下面我們将通過一個***下載下傳檔案***的案例進行示範

SSE

WebSocket

的消息推送,在這之前,我們先簡單說一下我們項目的結構,整個項目基于SpringBoot 建構。

首先我們定義一個供前端通路的API

DownloadController

@RestController
public class DownloadController {
    private static final Logger log = getLogger(DownloadController.class);
    @Autowired
    private MockDownloadComponent downloadComponent;  

    @GetMapping("/api/download/{type}")
    public String download(@PathVariable String type, HttpServletRequest request) {  // (A)
        HttpSession session = request.getSession();
        String sessionid = session.getId();
        log.info("sessionid=[{}]", sessionid);
        downloadComponent.mockDownload(type, sessionid);  // (B)
        return "success"; // (C)
    }
}
           
  • (A)

    type

    參數用于區分使用哪種推送方式,這裡為

    sse

    ,

    ws

    ,

    stomp

    這三種類型。
  • (B)

    MockDownloadComponent

    用于異步模拟下載下傳檔案的過程。
  • © 因為下載下傳過程為異步化,是以該方法不會被阻塞并立即向用戶端傳回

    success

    ,用于表明下載下傳開始。

DownloadController

中我們調用

MockDownloadComponent

mockDownload()

的方法進行模拟真正的下載下傳邏輯。

@Component
public class MockDownloadComponent {
    private static final Logger log = LoggerFactory.getLogger(DownloadController.class);

    @Async // (A)
    public void mockDownload(String type, String sessionid) {
        for (int i = 0; i < 100; i  ) {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // (B)

                int percent = i   1;
                String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
                log.info("username={}'s file has been finished [{}]% ", sessionid, percent);

                switch (type) { // (D)
                    case "sse":
                        SseNotificationController.usesSsePush(sessionid, content);
                        break;
                    case "ws":
                        WebSocketNotificationHandler.usesWSPush(sessionid, content);
                        break;
                    case "stomp":
                        this.usesStompPush(sessionid, content);
                        break;
                    default:
                        throw new UnsupportedOperationException("");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
           
  • (A) 我們使用

    @Async

    讓使其

    異步化

  • (B) 模拟下載下傳耗時。
  • © 消息的格式為

    {"username":"abc","percent":1}

  • (D) 根據不同的

    type

    選擇消息推送方式。

4. Server-Sent Events

SSE

是W3C定義的一組API規範,這使伺服器能夠通過HTTP将資料推送到Web頁面,它具有如下特點:

  • 單向半雙工:隻能由server向client推送消息
  • 基于http:資料被編碼為“text/event-stream”内容并使用HTTP流機制進行傳輸
  • 資料格式無限制:消息隻是遵循規範定義的一組

    key-value

    格式&

    UTF-8

    編碼的文本資料流,我們可以在消息

    payload

    中可以使用

    JSON

    或者

    XML

    或自定義資料格式。
  • http 長連接配接: 消息的實際傳遞是通過一個長期存在的HTTP連接配接完成的,消耗資源更少
  • 簡單易用的API
Spring Boot 二三事:WEB 應用消息推送的那點事

浏覽器支援情況:

Spring Boot 二三事:WEB 應用消息推送的那點事
Note:IE 浏覽器可通過第三方JS庫進行支援SSE
4.1 SpringBoot 中使用SSE

從Spring 4.2開始支援SSE規範,我們隻需要在

Controller

中傳回

SseEmitter

對象即可。

Note:Spring 5 中提供了Spring Webflux 可以更加友善的使用SSE,但是為更貼近我們的實際項目,是以文本僅示範使用Spring MVC SSE。

我們在伺服器端定義一個

SseNotificationController

用于和用戶端處理和儲存

SSE

連接配接. 其

endpoint

/api/sse-notification

@RestController
public class SseNotificationController {

    public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)

    @GetMapping("/api/sse-notification")
    public SseEmitter files(HttpServletRequest request) {
        long millis = TimeUnit.SECONDS.toMillis(60);
        SseEmitter sseEmitter = new SseEmitter(millis); // (B)

        HttpSession session = request.getSession();
        String sessionid = session.getId();

        SSE_HOLDER.put(sessionid, sseEmitter); 
        return sseEmitter;
    }

    /**
     * 通過sessionId擷取對應的用戶端進行推送消息
     */
    public static void usesSsePush(String sessionid, String content) {  // (C)
        SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
        if (Objects.nonNull(emitter)) {
            try {
                emitter.send(content);
            } catch (IOException | IllegalStateException e) {
                log.warn("sse send error", e);
                SseNotificationController.SSE_HOLDER.remove(sessionid);
            }
        }
    }

}
           
  • (A)

    SSE_HOLDER

    儲存了所有用戶端的

    SseEmitter

    ,用于後續通知對應用戶端。
  • (B) 根據指定逾時時間建立一個

    SseEmitter

    對象, 它是SpringMVC提供用于操作SSE的類。
  • ©

    usesSsePush()

    提供根據sessionId向對應用戶端發送消息。發送隻需要調用

    SseEmitter

    send()

    方法即可。

至此服務端已經完成,我們使用Vue編寫用戶端

Download.html

進行測試。核心代碼如下:

usesSSENotification: function () {
                var tt = this;
                var url = "/api/sse-notification";
                var sseClient = new EventSource(url);  // (A)
                sseClient.onopen = function () {...}; // (B)

                sseClient.onmessage = function (msg) {   // (C)
                    var jsonStr = msg.data;
                    console.log('message', jsonStr);
                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.sseMsg  = 'SSE 通知您:已下載下傳完成'   percent   "%\r\n";
                    if (percent === 100) {
                        sseClient.close();  // (D)
                    }
                };
                sseClient.onerror = function () {
                    console.log("EventSource failed.");
                };
            }
           
  • (A) 開啟一個新的 SSE connection 并通路

    /api/sse-notification

  • (B) 當連接配接成功時的callback。
  • © 當有新消息時的callback。
  • (D) 當下載下傳進度為100%時,關閉連接配接。

效果示範:

Spring Boot 二三事:WEB 應用消息推送的那點事

4. WebSocket

WebSocket

類似于标準的TCP連接配接,它是IETF(RFC 6455)定義的通過TCP進行實時全雙工通信一種通信方式,這意味這它的功能更強大,常用于如股票報價器,聊天應用。

相比于SSE,它不僅可以雙向通信,而且甚至還能處理音頻/視訊等二進制内容。

Note:使用

WebSocket

,在高并發情況下,伺服器将擁有許多長連接配接。這對網絡代理層元件及

WebSocket

伺服器都是一個不小的性能挑戰,我們需要考慮其負載均衡方案。同時連接配接安全等問題也不容忽視。
4.1 Spring WebSocket (低級API)

Spring 4提供了一個新的

Spring-WebSocket

子產品,用于适應各種WebSocket引擎,它與Java WebSocket API标準(JSR-356)相容,并且提供了額外的增強功能。

Note: 對于應用程式來說,直接使用WebSocket API會大大增加開發難度,是以Spring為我們提供了 STOMP over WebSocket 更進階别的API使用WebSocket。在本文中将會分别示範通過low level API及higher level API進行示範。

如果想在SpringBoot中使用WebSocket,首先需要引入

spring-boot-starter-websocket

依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
           

然後就可以配置相關資訊,我們先通過low level API進行示範。

首先需要自定義一個

WebSocketNotificationHandler

用于處理WebSocket 的連接配接及消息處理。我們隻需要實作

WebSocketHandler

或子類

TextWebSocketHandler ``````BinaryWebSocketHandler

public class WebSocketNotificationHandler extends TextWebSocketHandler {

    private static final Logger log = getLogger(WebSocketNotificationHandler.class);

    public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>();  // (A)


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {   // (B)
        String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        WS_HOLDER.put(httpSessionId, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("handleTextMessage={}", message.getPayload()); 
    }

    public static void usesWSPush(String sessionid, String content) {    // (C)
        WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
        if (Objects.nonNull(wssession)) {
            TextMessage textMessage = new TextMessage(content);
            try {
                wssession.sendMessage(textMessage);
            } catch (IOException | IllegalStateException e) {
                WebSocketNotificationHandler.SESSIONS.remove(sessionid);
            }
        }
    }
}
           
  • (A)

    WS_HOLDER

    用于儲存用戶端的

    WebSocket Session

  • (B) 重寫

    afterConnectionEstablished()

    方法,當連接配接建立之後,按

    sessionId

    WebSocket Session

    儲存至

    WS_HOLDER

    ,用于後續向client推送消息。
  • © 根據

    sessionId

    擷取對應

    WebSocket Session

    ,并調用

    WebSocket Session

    sendMessage(textMessage)

    方法向client發送消息。

使用

@EnableWebSocket

開啟WebSocket,并實作

WebSocketConfigurer

進行配置。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

        WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler(); 
        
        registry.addHandler(notificationHandler, "/ws-notification") // (A)
                .addInterceptors(new HttpSessionHandshakeInterceptor())  // (B)
                .withSockJS();  // (C)
    }
}
           
  • (A) 将我們自定義的

    WebSocketNotificationHandler

    注冊至

    WebSocketHandlerRegistry

    .
  • (B)

    HttpSessionHandshakeInterceptor

    是一個内置的攔截器,用于傳遞HTTP會話屬性到WebSocket會話。當然你也可以通過

    HandshakeInterceptor

    接口實作自己的攔截器。
  • © 開啟SockJS的支援,SockJS的目标是讓應用程式使用WebSocket API時,當發現浏覽器不支援時,無需要更改任何代碼,即可使用非WebSocket替代方案,盡可能的模拟WebSocket。關于SockJS的更多資料,可參考https://github.com/sockjs/sockjs-client

server端至此就基本大功告成,接下來我們來完善一下client端

Download.html

,其核心方法如下:

usesWSNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-notification";
                var sock = new SockJS(url);   // (A)
                sock.onopen = function () {
                    console.log('open');
                    sock.send('test');
                };

                sock.onmessage = function (msg) {   // (B)
                    var jsonStr = msg.data;

                    console.log('message', jsonStr);

                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.wsMsg  = 'WS 通知您:已下載下傳完成'   percent   "%\r\n";
                    if (percent === 100) {
                        sock.close();
                    }
                };

                sock.onclose = function () { 
                    console.log('ws  close');
                };
            }
           
  • (A) 首先需要在項目中引入SockJS Client , 并根據指定URL建立一個SockJS對象。
  • (B) 當有新消息時的

    callback

    ,我們可以在該方法中處理我們的消息。

效果示範:

Spring Boot 二三事:WEB 應用消息推送的那點事
4.2 STOMP over WebSocket (進階API)

WebSocket雖然定義了兩種類型的消息,文本和二進制,但是針對消息的内容沒有定義,為了更友善的處理消息,我們希望Client和Server都需要就某種協定達成一緻,以幫助處理消息。那麼,有沒有已經造好的輪子呢?答案肯定是有的。這就是STOMP。

STOMP是一種簡單的面向文本的消息傳遞協定,它其實是消息隊列的一種協定, 和AMQP,JMS是平級的。 隻不過由于它的簡單性恰巧可以用于定義WS的消息體格式。雖然STOMP是面向文本的協定,但消息的内容也可以是二進制資料。同時STOMP 可已使用任何可靠的雙向流網絡協定,如TCP和WebSocket,目前很多服務端消息隊列都已經支援了STOMP, 比如RabbitMQ, ActiveMQ等。

它結構是一種基于幀的協定,一幀由一個指令,一組可選的Header和一個可選的Body組成。

COMMAND
header1:value1
header2:value2

Body^@
           

用戶端可以使用

SEND

SUBSCRIBE

指令發送或訂閱消息。 通過

destination

标記述消息應由誰來接收處理,形成了類似于MQ的釋出訂閱機制。

Spring Boot 二三事:WEB 應用消息推送的那點事

STOMP的優勢也非常明顯,即:

  1. 不需要建立自定義消息格式
  2. 我們可以使用現有的stomp.js用戶端
  3. 可以實作消息路由及廣播
  4. 可以使用第三方成熟的消息代理中間件,如RabbitMQ, ActiveMQ等

最重要的是,

Spring STOMP

為我們提供了能夠像Spring MVC一樣的程式設計模型,減少了我們的學習成本。

下面将我們的DEMO稍作調整,使用

Spring STOMP

來實作消息推送,在本例中我們使用

SimpleBroker

模式,我們的應用将會内置一個

STOMP Broker

,将所有資訊儲存至記憶體中。

Spring Boot 二三事:WEB 應用消息推送的那點事

具體代碼如下:

@Configuration
@EnableWebSocketMessageBroker  // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/ws-stomp-notification")
                .addInterceptors(httpSessionHandshakeInterceptor())   // (B)
                .setHandshakeHandler(httpSessionHandshakeHandler())  // (C)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")  // (D)
                .enableSimpleBroker("/topic", "/queue");  // (E)
    }

    @Bean
    public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
        return new HttpSessionHandshakeInterceptor();
    }

    @Bean
    public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
        return new HttpSessionHandshakeHandler();
    }

}

           
  • (A) 使用

    @EnableWebSocketMessageBroker

    注解開啟支援STOMP
  • (B) 建立一個攔截器,用于傳遞HTTP會話屬性到WebSocket會話。
  • © 配置一個自定義的

    HttpSessionHandshakeHandler

    ,其主要作用是按sessionId标記識别連接配接。
  • (D) 設定消息處理器路由字首,當消息的

    destination

    /app

    開頭時,将會把該消息路由到server端的對應的消息處理方法中。***(在本例中無實際意義)***
  • (E) 設定用戶端訂閱消息的路徑字首

HttpSessionHandshakeHandler

代碼如下:

public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        return new HttpSessionPrincipal(sessionId);

    }
}
           

當我們需要向client發送消息時,隻需要注入

SimpMessagingTemplate

對象即可,是不是感覺非常熟悉?! 沒錯,這種

Template

模式和我們日常使用的

RestTemplate``````JDBCTemplate

是一樣的。

我們隻需要調用

SimpMessagingTemplate

convertAndSendToUser()

方法即可向對應使用者發送消息了。

private void usesStompPush(String sessionid, String content) {
        String destination = "/queue/download-notification";
        messagingTemplate.convertAndSendToUser(sessionid, destination, content);
    }
           

在浏覽器端,client可以使用stomp.js和sockjs-client進行如下連接配接:

usesStompNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-stomp-notification";
                // 公共topic
                // var notificationTopic = "/topic/download-notification";
                // 點對點廣播
                var notificationTopic = "/user/queue/download-notification"; // (A)

                var socket = new SockJS(url);
                var stompClient = Stomp.over(socket);

                stompClient.connect({}, function (frame) {
                    console.log("STOMP connection successful");

                    stompClient.subscribe(notificationTopic, function (msg) {   // (B)
                        var jsonStr = msg.body;

                        var obj = JSON.parse(jsonStr);
                        var percent = obj.percent;
                        tt.stompMsg  = 'STOMP 通知您:已下載下傳完成'   percent   "%\r\n";
                        if (percent === 100) {
                            stompClient.disconnect()
                        }

                    });

                }, function (error) {
                    console.log("STOMP protocol error "   error)
                })
            }
           
  • (A) 如果想針對特定使用者接收消息,我們需要以

    /user/

    為字首,Spring STOMP會把以

    /user/

    為字首的消息交給

    UserDestinationMessageHandler

    進行處理并發給特定的使用者,當然這個

    /user/

    是可以通過

    WebSocketBrokerConfig

    進行個性化配置的,為了簡單起見,我們這裡就使用預設配置,是以我們的topic url就是

    /user/queue/download-notification

  • (B) 設定

    stompClient

    消息處理callback進行消息處理。

效果示範:

Spring Boot 二三事:WEB 應用消息推送的那點事

5 總結

在文中為大家簡單講解了幾種常用的消息推送方案,并通過一個下載下傳案例重點示範了

SSE

WebSocket

這兩種server push模式的消息推送。當然還有很多細節并沒有在文中說明,建議大家下載下傳源碼對照參考。

Spring Boot 二三事:WEB 應用消息推送的那點事

相比較這幾種模式,小編認為如果我們的需求僅僅是向用戶端推送消息,那麼使用

SSE

的成本效益更高一些,

Long-Polling

次之。使用

WebSocket

有一種殺雞用牛刀的感覺,并且給我們系統也帶來了更多的複雜性,得不償失,是以不太推薦。而

Polling

雖然實作方式最簡單且相容性最強,但是其效率過低,是以不建議使用。當然如果您有其他見解,歡迎留言讨論交流。

文中示例源碼:https://github.com/leven-space/SpringBootNotification.git

如果您覺得這篇文章有用,請留下您的小??,我是一枚Java國小生,歡迎大家吐槽留言。

繼續閱讀