本文描述在spring boot中如何使用基于stomp實作即時通信的功能。
- spring boot 版本:2.1.2.RELEASE
- jdk8
Maven pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
基本配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
/**
* broker定義成常量,其他地方也會用到
*/
public static final String BROKER_SYSTEM_MSG = "/system/msg";
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 注1
config.enableSimpleBroker(BROKER_SYSTEM_MSG);
// 注2
config.setApplicationDestinationPrefixes("/app");
// 注3
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注4
registry.addEndpoint("/stomp")
.setAllowedOrigins("*")
.withSockJS();
}
}
-
注1
定義訂閱者節點,也就是用戶端可以訂閱哪些位址,如果不在這裡定義的話,服務端是不能發消息給用戶端的。
-
注2
用戶端發消息給服務端的位址字首,比如你在服務端定義了一個請求位址為
,那麼用戶端發消息給你的時候,用的位址就是/hello
。/app/hello
-
注3
服務端發消息給指定用戶端的字首,比如你定義了一個broker叫
,如果你在服務端發消息用/system/msg
位址時,消息是以廣播方式發送的,也就是所有訂閱了這個位址的用戶端都能收到,而如果你用/system/msg
這個位址來發送的話,就是發送給指定的用戶端,我們目前的配置是無法發消息給指定用戶端的,後面會說到怎麼配置可以發消息給指定用戶端。/user/system/msg
- 注4
的連接配接節點,在我們要訂閱或發送消息之前,首先我們要和服務端連接配接上,這裡就是定義一個連接配接的節點。stomp
new SockJS("http://host:port/stomp")
在開發的時候
setAllowedOrigins("*")
是非常友善的,生産環境請自行設定限制。
用戶端如何訂閱和發送消息,以及服務端如何廣播消息
@Controller
public class StompController {
@MessageMapping("/hello")
@SendTo("/system/msg")
String hello(@Payload String name) {
System.out.println("hello " + name);
return "fu*k away";
}
}
-
@MessageMapping
和
類似,定義一個接口,用戶端可以發送消息到這個位址上。RequestMapping
// 用戶端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.publish({destination: '/app/hello', body: 'world'})
還記得我們在
config.setApplicationDestinationPrefixes("/app")
定義了字首,是以用戶端發送消息的位址要加上這個字首。
-
@SendTo
服務端廣播消息給用戶端,這裡的位址必須是在
定義過的,否則消息無法到達用戶端。下面是用戶端訂閱消息,這樣就可以收到服務端的廣播了。config.enableSimpleBroker()
// 用戶端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.subscribe('/system/msg', message => {
console.log(message)
})
-
@Payload String name
用戶端發過來的參數,也就是
函數裡的client.publish
參數。body
以上,最基本的
stomp
就算完成了,可以實作用戶端訂閱消息,服務端廣播消息。在我們定義的
Controller
裡,用戶端發送消息過來後,服務端就會立即廣播一個消息給用戶端,但是有一些時候我們希望在任何地方,任何時刻都可以廣播消息給用戶端。
如何你不想在@MessageMapping的方法裡廣播消息,将該方法的@SendTo和傳回定義去掉就好了。
“手動”廣播消息給用戶端
比如我們有某個定時任務,完成任務後,需要廣播一個消息給用戶端。
@Service
public class MyTask {
@Autowire
private SimpMessagingTemplate messagingTemplate;
public void doTask() {
// 你的業務代碼
Map<String, Object> msg = new HashMap<>();
msg.put("name", "zhangsan");
messagingTemplate.convertAndSend("/system/msg", msg);
}
}
SimpMessagingTemplate
這個類就是可以讓我們自由自在廣播消息的罪魁禍首,這個Bean直接注入就好,這是spring已經在内部定義好的。
convertAndSend()
方法的第一個參數就是我們定義的
broker
位址,再說一遍,不事先在
config.enableSimpleBroker()
定義好這個位址是沒有未來的。
如何發送消息給指定的用戶端
spring發送指定消息給用戶端是依賴
org.springframework.http.server.ServerHttpRequest#getPrincipal()
來确定使用者的身份,一般的,如果我們的使用者是已經登入的,可以用使用者的業務ID作為
Principal
,如果是未登入使用者,可以随機生成一個唯一标示作為
Principal
也可以,這個需要根據你的業務場景來定制,這裡我們隻需要知道如何将自己的資料作為
Principal
交給spring即可。
public class CustomChannelInterceptor implements ChannelInterceptor {
private static final Logger log = LogManager.getLogger(CustomChannelInterceptor.class);
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
List<String> tmp = accessor.getNativeHeader("X-User-ID");
String userId = tmp == null || tmp.size() == 0 ? null : tmp.get(0);
accessor.setUser(() -> userId);
log.debug("client user id: {}", userId);
}
return message;
}
}
X-User-ID
是用戶端發送的自定義頭資訊,這個ID你随便寫都沒問題,隻要保證唯一即可,前面也說過,這裡得依賴你的業務場景,也和你應用的安全架構實作有關,但不管如何,隻要将你的唯一憑證交給
accessor.setUser()
即可,這樣spring就認識目前這個用戶端了。
然後回到我們的
WebSocketConfig
配置類中。
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new CustomChannelInterceptor());
}
再看如何發送消息給指定用戶端。
- 通過
來發送SimpMessagingTemplate
@Autowire
private SimpMessagingTemplate messagingTemplate;
...
String userId = ...; // 還記得前面的X-User-ID嗎,就是那個東西
String destination = "/system/msg"; // broker那裡定義的
Map<String, Object> payload = ...; // 發給用戶端的參數
messagingTemplate.convertAndSendToUser(userId, destination, payload);
- 通過
來發送@SendToUser
@MessageMapping("/hello")
@SendToUser(value = "/system/msg", broadcast = false)
public Object hello() {
return "fu*k away";
}
下面看用戶端的代碼,這裡給出用戶端的完整代碼。
// 用戶端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
import SockJS from 'sockjs-client'
import { Client } from '@stomp/stompjs'
let client = new Client({
connectHeaders: {
'X-User-ID': 'xingshang' // 測試用寫死了一個使用者ID
},
debug: function (str) {
console.log(str)
},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000
});
client.webSocketFactory = function () {
return new SockJS("http://localhost:9780/stomp")
};
client.onConnect = () => {
console.log('on connect')
client.subscribe("/user/system/msg", message => {
// user字首,服務端隻會發給我自己
})
client.subscribe('/system/msg', message => {
// 沒有user字首,廣播消息,服務端發的消息,任何人訂閱這個位址都會收到消息
})
}
client.activate() // 開始與服務端連接配接
someButton.click = () => {
client.publish({
destination: '/app/hello', // 發給服務端的消息用app開頭
body: 'world' // body隻能是字元串,如果要發送結構,用JSON.stringify()轉換
})
}
至此,發送給指定用戶端消息的代碼完結。但還有件小事,前面用
messagingTemplate.convertAndSendToUser(userId, destination, payload)
發消息時,這裡的
userId
如何獲得,我們知道這個
userId
是從我的用戶端的
X-User-ID
發過去的,whatever,當我們在
ChannelInterceptor#preSend()
方法獲得這個id時,如何存儲好讓我們在後續的其他地方拿到。
如果你是在
@MessageMapping
方法裡想擷取這個資訊,那麼很友善。
@MessageMapping("/hello")
public void hello(Principal principal) {
String userId = principal.getName();
}
如果你要在其他業務類擷取,自己在
ChannelInterceptor#preSend()
裡想辦法把這個
userId
存儲起來,和
sessionId
關聯也好,和什麼關聯也好。
如何在服務端關閉會話連接配接
使用
stomp
時你會發現在用戶端關閉連接配接是非常容易的,但是在服務端關閉就比較麻煩,首先我們要獲得
WebSocketSession
這個對象儲存起來,繼續回到我們的
WebSocketConfig
配置類。
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(final WebSocketHandler handler) {
return new WebSocketHandlerDecorator(handler) {
@Override
public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
// 在這裡将session存起來,根據session.id存在一個map也好,後面我們可以根據sessionId來獲得這個session
}
};
}
});
}
有了
WebSocketSession
,就可以根據sessionId擷取對應的session,并調用
close()
方法來關閉連接配接。
WebSocket事件
假如我們想知道連接配接斷開的消息。
@Component
public class SessionDisconnectedEventListener implements ApplicationListener<SessionDisconnectEvent> {
private static Logger log = LogManager.getLogger(SessionDisconnectedEventListener.class);
@Override
public void onApplicationEvent(SessionDisconnectEvent e) {
log.debug("client disconnect: {}", e.getSessionId());
}
}
其他事件看下面這個連接配接:
https://docs.spring.io/spring-framework/docs/5.0.0.M1/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events
素巴拉西,以後再也不怕忘記怎麼寫weboscket了。