閱讀對象:本文适合SpringBoot 初學者及對SpringBoot感興趣的童鞋閱讀。
背景介紹:在企業級 WEB 應用開發中,為了更好的使用者體驗&提升響應速度,往往會将一些耗時費力的請求 (Excel導入or導出,複雜計算, etc.) 進行***異步化***處理。 由此帶來的一個重要的問題是***如何通知使用者任務狀态***,常見的方法大緻分為2類4種:
client pull
HTTP Polling
client pull
HTTP Long-Polling
server push
Server-Sent Events (SSE)
server push
WebSocket
1. Polling 短輪詢
是一種非常簡單的實作方式。就是client通過***定時任務***不斷得重複請求伺服器,進而擷取新消息,而server按時間順序提供自上次請求以後發生的單個或多個消息。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI3pVdC5GTsJleNVzaq5UbKdlTrhmaNlmVqlFaxMkTyElaNVTRq5Ee4k3YsR2VZRHbyg1aGJjYzJEWkZHOXFWdVhUY6VzVZBHctxkeWJjWoFzVhRXUXlld4d0YxkTeMZTTINGMShUYvwlbj5yZtlmbkN3YuQnclZnbvN2Ztl2Lc9CX6MHc0RHaiojIsJye.jpg)
短輪詢的優點非常明顯,就是實作簡單。當兩個方向上的資料都非常少,并且請求間隔不是非常密集時,這種方法就會非常有效。例如,新聞評論資訊可以每半分鐘更新一次,這對使用者來說是可以的。
它得缺點也是非常明顯,一旦我們對資料實時性要求非常高時,為了保證消息的及時送達,請求間隔必須縮短,在這種情況下,會加劇伺服器資源的浪費,降低服務的可用性。另一個缺點就是在消息的數量較少時,将會有大量的
request
做無用功,進而也導緻伺服器資源的浪費。
2. Long-Polling 長輪詢
長輪詢的官方定義是:
The server attempts to “hold open” (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.
如果與
Polling
的方式相比,會發現
Long-Polling
的優點是通過hold open HTTP request 進而減少了無用的請求。
大緻步驟為:
- client向server請求并等待響應。
- 服務端将請求阻塞,并不斷檢查是否有新消息。如果在這個期間有新消息産生時就立即傳回。否則一直等待至
。請求逾時
- 當client
或擷取到新消息
,進行消息處理并發起下一次請求。請求逾時
Long-Polling
的缺點之一也是伺服器資源的浪費,因為它和
Polling
的一樣都屬于***被動擷取***,都需要不斷的向伺服器請求。在并發很高的情況下,對伺服器性能是個嚴峻的考驗。
Note:因為以上2兩種方式的實作都比較簡單,是以我們這裡就不做代碼示範了。接下來我們重點介紹一下及
Server-Sent Events
。
WebSocket
3. Demo概要
下面我們将通過一個***下載下傳檔案***的案例進行示範
SSE
和
WebSocket
的消息推送,在這之前,我們先簡單說一下我們項目的結構,整個項目基于SpringBoot 建構。
首先我們定義一個供前端通路的API
DownloadController
@RestController
public class DownloadController {
private static final Logger log = getLogger(DownloadController.class);
@Autowired
private MockDownloadComponent downloadComponent;
@GetMapping("/api/download/{type}")
public String download(@PathVariable String type, HttpServletRequest request) { // (A)
HttpSession session = request.getSession();
String sessionid = session.getId();
log.info("sessionid=[{}]", sessionid);
downloadComponent.mockDownload(type, sessionid); // (B)
return "success"; // (C)
}
}
- (A)
參數用于區分使用哪種推送方式,這裡為type
,sse
,ws
這三種類型。stomp
- (B)
用于異步模拟下載下傳檔案的過程。MockDownloadComponent
- © 因為下載下傳過程為異步化,是以該方法不會被阻塞并立即向用戶端傳回
,用于表明下載下傳開始。success
在
DownloadController
中我們調用
MockDownloadComponent
的
mockDownload()
的方法進行模拟真正的下載下傳邏輯。
@Component
public class MockDownloadComponent {
private static final Logger log = LoggerFactory.getLogger(DownloadController.class);
@Async // (A)
public void mockDownload(String type, String sessionid) {
for (int i = 0; i < 100; i ) {
try {
TimeUnit.MILLISECONDS.sleep(100); // (B)
int percent = i 1;
String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
log.info("username={}'s file has been finished [{}]% ", sessionid, percent);
switch (type) { // (D)
case "sse":
SseNotificationController.usesSsePush(sessionid, content);
break;
case "ws":
WebSocketNotificationHandler.usesWSPush(sessionid, content);
break;
case "stomp":
this.usesStompPush(sessionid, content);
break;
default:
throw new UnsupportedOperationException("");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- (A) 我們使用
讓使其@Async
。異步化
- (B) 模拟下載下傳耗時。
- © 消息的格式為
。{"username":"abc","percent":1}
- (D) 根據不同的
選擇消息推送方式。type
4. Server-Sent Events
SSE
是W3C定義的一組API規範,這使伺服器能夠通過HTTP将資料推送到Web頁面,它具有如下特點:
- 單向半雙工:隻能由server向client推送消息
- 基于http:資料被編碼為“text/event-stream”内容并使用HTTP流機制進行傳輸
- 資料格式無限制:消息隻是遵循規範定義的一組
格式&key-value
編碼的文本資料流,我們可以在消息UTF-8
中可以使用payload
或者JSON
或自定義資料格式。XML
- http 長連接配接: 消息的實際傳遞是通過一個長期存在的HTTP連接配接完成的,消耗資源更少
- 簡單易用的API
浏覽器支援情況:
Note:IE 浏覽器可通過第三方JS庫進行支援SSE
4.1 SpringBoot 中使用SSE
從Spring 4.2開始支援SSE規範,我們隻需要在
Controller
中傳回
SseEmitter
對象即可。
Note:Spring 5 中提供了Spring Webflux 可以更加友善的使用SSE,但是為更貼近我們的實際項目,是以文本僅示範使用Spring MVC SSE。
我們在伺服器端定義一個
SseNotificationController
用于和用戶端處理和儲存
SSE
連接配接. 其
endpoint
為
/api/sse-notification
。
@RestController
public class SseNotificationController {
public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)
@GetMapping("/api/sse-notification")
public SseEmitter files(HttpServletRequest request) {
long millis = TimeUnit.SECONDS.toMillis(60);
SseEmitter sseEmitter = new SseEmitter(millis); // (B)
HttpSession session = request.getSession();
String sessionid = session.getId();
SSE_HOLDER.put(sessionid, sseEmitter);
return sseEmitter;
}
/**
* 通過sessionId擷取對應的用戶端進行推送消息
*/
public static void usesSsePush(String sessionid, String content) { // (C)
SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
if (Objects.nonNull(emitter)) {
try {
emitter.send(content);
} catch (IOException | IllegalStateException e) {
log.warn("sse send error", e);
SseNotificationController.SSE_HOLDER.remove(sessionid);
}
}
}
}
- (A)
儲存了所有用戶端的SSE_HOLDER
,用于後續通知對應用戶端。SseEmitter
- (B) 根據指定逾時時間建立一個
對象, 它是SpringMVC提供用于操作SSE的類。SseEmitter
- ©
提供根據sessionId向對應用戶端發送消息。發送隻需要調用usesSsePush()
的SseEmitter
方法即可。send()
至此服務端已經完成,我們使用Vue編寫用戶端
Download.html
進行測試。核心代碼如下:
usesSSENotification: function () {
var tt = this;
var url = "/api/sse-notification";
var sseClient = new EventSource(url); // (A)
sseClient.onopen = function () {...}; // (B)
sseClient.onmessage = function (msg) { // (C)
var jsonStr = msg.data;
console.log('message', jsonStr);
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.sseMsg = 'SSE 通知您:已下載下傳完成' percent "%\r\n";
if (percent === 100) {
sseClient.close(); // (D)
}
};
sseClient.onerror = function () {
console.log("EventSource failed.");
};
}
- (A) 開啟一個新的 SSE connection 并通路
。/api/sse-notification
- (B) 當連接配接成功時的callback。
- © 當有新消息時的callback。
- (D) 當下載下傳進度為100%時,關閉連接配接。
效果示範:
4. WebSocket
WebSocket
類似于标準的TCP連接配接,它是IETF(RFC 6455)定義的通過TCP進行實時全雙工通信一種通信方式,這意味這它的功能更強大,常用于如股票報價器,聊天應用。
相比于SSE,它不僅可以雙向通信,而且甚至還能處理音頻/視訊等二進制内容。
Note:使用,在高并發情況下,伺服器将擁有許多長連接配接。這對網絡代理層元件及
WebSocket
伺服器都是一個不小的性能挑戰,我們需要考慮其負載均衡方案。同時連接配接安全等問題也不容忽視。
WebSocket
4.1 Spring WebSocket (低級API)
Spring 4提供了一個新的
Spring-WebSocket
子產品,用于适應各種WebSocket引擎,它與Java WebSocket API标準(JSR-356)相容,并且提供了額外的增強功能。
Note: 對于應用程式來說,直接使用WebSocket API會大大增加開發難度,是以Spring為我們提供了 STOMP over WebSocket 更進階别的API使用WebSocket。在本文中将會分别示範通過low level API及higher level API進行示範。
如果想在SpringBoot中使用WebSocket,首先需要引入
spring-boot-starter-websocket
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
然後就可以配置相關資訊,我們先通過low level API進行示範。
首先需要自定義一個
WebSocketNotificationHandler
用于處理WebSocket 的連接配接及消息處理。我們隻需要實作
WebSocketHandler
或子類
TextWebSocketHandler ``````BinaryWebSocketHandler
。
public class WebSocketNotificationHandler extends TextWebSocketHandler {
private static final Logger log = getLogger(WebSocketNotificationHandler.class);
public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>(); // (A)
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { // (B)
String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
WS_HOLDER.put(httpSessionId, session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.info("handleTextMessage={}", message.getPayload());
}
public static void usesWSPush(String sessionid, String content) { // (C)
WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
if (Objects.nonNull(wssession)) {
TextMessage textMessage = new TextMessage(content);
try {
wssession.sendMessage(textMessage);
} catch (IOException | IllegalStateException e) {
WebSocketNotificationHandler.SESSIONS.remove(sessionid);
}
}
}
}
- (A)
用于儲存用戶端的WS_HOLDER
WebSocket Session
- (B) 重寫
方法,當連接配接建立之後,按afterConnectionEstablished()
将sessionId
儲存至WebSocket Session
,用于後續向client推送消息。WS_HOLDER
- © 根據
擷取對應sessionId
,并調用WebSocket Session
的WebSocket Session
方法向client發送消息。sendMessage(textMessage)
使用
@EnableWebSocket
開啟WebSocket,并實作
WebSocketConfigurer
進行配置。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler();
registry.addHandler(notificationHandler, "/ws-notification") // (A)
.addInterceptors(new HttpSessionHandshakeInterceptor()) // (B)
.withSockJS(); // (C)
}
}
- (A) 将我們自定義的
注冊至WebSocketNotificationHandler
.WebSocketHandlerRegistry
- (B)
是一個内置的攔截器,用于傳遞HTTP會話屬性到WebSocket會話。當然你也可以通過HttpSessionHandshakeInterceptor
接口實作自己的攔截器。HandshakeInterceptor
- © 開啟SockJS的支援,SockJS的目标是讓應用程式使用WebSocket API時,當發現浏覽器不支援時,無需要更改任何代碼,即可使用非WebSocket替代方案,盡可能的模拟WebSocket。關于SockJS的更多資料,可參考https://github.com/sockjs/sockjs-client
server端至此就基本大功告成,接下來我們來完善一下client端
Download.html
,其核心方法如下:
usesWSNotification: function () {
var tt = this;
var url = "http://localhost:8080/ws-notification";
var sock = new SockJS(url); // (A)
sock.onopen = function () {
console.log('open');
sock.send('test');
};
sock.onmessage = function (msg) { // (B)
var jsonStr = msg.data;
console.log('message', jsonStr);
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.wsMsg = 'WS 通知您:已下載下傳完成' percent "%\r\n";
if (percent === 100) {
sock.close();
}
};
sock.onclose = function () {
console.log('ws close');
};
}
- (A) 首先需要在項目中引入SockJS Client , 并根據指定URL建立一個SockJS對象。
- (B) 當有新消息時的
,我們可以在該方法中處理我們的消息。callback
效果示範:
4.2 STOMP over WebSocket (進階API)
WebSocket雖然定義了兩種類型的消息,文本和二進制,但是針對消息的内容沒有定義,為了更友善的處理消息,我們希望Client和Server都需要就某種協定達成一緻,以幫助處理消息。那麼,有沒有已經造好的輪子呢?答案肯定是有的。這就是STOMP。
STOMP是一種簡單的面向文本的消息傳遞協定,它其實是消息隊列的一種協定, 和AMQP,JMS是平級的。 隻不過由于它的簡單性恰巧可以用于定義WS的消息體格式。雖然STOMP是面向文本的協定,但消息的内容也可以是二進制資料。同時STOMP 可已使用任何可靠的雙向流網絡協定,如TCP和WebSocket,目前很多服務端消息隊列都已經支援了STOMP, 比如RabbitMQ, ActiveMQ等。
它結構是一種基于幀的協定,一幀由一個指令,一組可選的Header和一個可選的Body組成。
COMMAND
header1:value1
header2:value2
Body^@
用戶端可以使用
SEND
或
SUBSCRIBE
指令發送或訂閱消息。 通過
destination
标記述消息應由誰來接收處理,形成了類似于MQ的釋出訂閱機制。
STOMP的優勢也非常明顯,即:
- 不需要建立自定義消息格式
- 我們可以使用現有的stomp.js用戶端
- 可以實作消息路由及廣播
- 可以使用第三方成熟的消息代理中間件,如RabbitMQ, ActiveMQ等
最重要的是,
Spring STOMP
為我們提供了能夠像Spring MVC一樣的程式設計模型,減少了我們的學習成本。
下面将我們的DEMO稍作調整,使用
Spring STOMP
來實作消息推送,在本例中我們使用
SimpleBroker
模式,我們的應用将會内置一個
STOMP Broker
,将所有資訊儲存至記憶體中。
具體代碼如下:
@Configuration
@EnableWebSocketMessageBroker // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp-notification")
.addInterceptors(httpSessionHandshakeInterceptor()) // (B)
.setHandshakeHandler(httpSessionHandshakeHandler()) // (C)
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app") // (D)
.enableSimpleBroker("/topic", "/queue"); // (E)
}
@Bean
public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
return new HttpSessionHandshakeInterceptor();
}
@Bean
public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
return new HttpSessionHandshakeHandler();
}
}
- (A) 使用
注解開啟支援STOMP@EnableWebSocketMessageBroker
- (B) 建立一個攔截器,用于傳遞HTTP會話屬性到WebSocket會話。
- © 配置一個自定義的
,其主要作用是按sessionId标記識别連接配接。HttpSessionHandshakeHandler
- (D) 設定消息處理器路由字首,當消息的
已destination
開頭時,将會把該消息路由到server端的對應的消息處理方法中。***(在本例中無實際意義)***/app
- (E) 設定用戶端訂閱消息的路徑字首
HttpSessionHandshakeHandler
代碼如下:
public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
return new HttpSessionPrincipal(sessionId);
}
}
當我們需要向client發送消息時,隻需要注入
SimpMessagingTemplate
對象即可,是不是感覺非常熟悉?! 沒錯,這種
Template
模式和我們日常使用的
RestTemplate``````JDBCTemplate
是一樣的。
我們隻需要調用
SimpMessagingTemplate
的
convertAndSendToUser()
方法即可向對應使用者發送消息了。
private void usesStompPush(String sessionid, String content) {
String destination = "/queue/download-notification";
messagingTemplate.convertAndSendToUser(sessionid, destination, content);
}
在浏覽器端,client可以使用stomp.js和sockjs-client進行如下連接配接:
usesStompNotification: function () {
var tt = this;
var url = "http://localhost:8080/ws-stomp-notification";
// 公共topic
// var notificationTopic = "/topic/download-notification";
// 點對點廣播
var notificationTopic = "/user/queue/download-notification"; // (A)
var socket = new SockJS(url);
var stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
console.log("STOMP connection successful");
stompClient.subscribe(notificationTopic, function (msg) { // (B)
var jsonStr = msg.body;
var obj = JSON.parse(jsonStr);
var percent = obj.percent;
tt.stompMsg = 'STOMP 通知您:已下載下傳完成' percent "%\r\n";
if (percent === 100) {
stompClient.disconnect()
}
});
}, function (error) {
console.log("STOMP protocol error " error)
})
}
- (A) 如果想針對特定使用者接收消息,我們需要以
為字首,Spring STOMP會把以/user/
為字首的消息交給/user/
進行處理并發給特定的使用者,當然這個UserDestinationMessageHandler
是可以通過/user/
進行個性化配置的,為了簡單起見,我們這裡就使用預設配置,是以我們的topic url就是WebSocketBrokerConfig
。/user/queue/download-notification
- (B) 設定
消息處理callback進行消息處理。stompClient
效果示範:
5 總結
在文中為大家簡單講解了幾種常用的消息推送方案,并通過一個下載下傳案例重點示範了
SSE
及
WebSocket
這兩種server push模式的消息推送。當然還有很多細節并沒有在文中說明,建議大家下載下傳源碼對照參考。
相比較這幾種模式,小編認為如果我們的需求僅僅是向用戶端推送消息,那麼使用
SSE
的成本效益更高一些,
Long-Polling
次之。使用
WebSocket
有一種殺雞用牛刀的感覺,并且給我們系統也帶來了更多的複雜性,得不償失,是以不太推薦。而
Polling
雖然實作方式最簡單且相容性最強,但是其效率過低,是以不建議使用。當然如果您有其他見解,歡迎留言讨論交流。
文中示例源碼:https://github.com/leven-space/SpringBootNotification.git
如果您覺得這篇文章有用,請留下您的小??,我是一枚Java國小生,歡迎大家吐槽留言。