本文描述在spring boot中如何使用基于stomp实现即时通信的功能。
- spring boot 版本:2.1.2.RELEASE
- jdk8
Maven pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
基本配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
/**
* broker定义成常量,其他地方也会用到
*/
public static final String BROKER_SYSTEM_MSG = "/system/msg";
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 注1
config.enableSimpleBroker(BROKER_SYSTEM_MSG);
// 注2
config.setApplicationDestinationPrefixes("/app");
// 注3
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注4
registry.addEndpoint("/stomp")
.setAllowedOrigins("*")
.withSockJS();
}
}
-
注1
定义订阅者节点,也就是客户端可以订阅哪些地址,如果不在这里定义的话,服务端是不能发消息给客户端的。
-
注2
客户端发消息给服务端的地址前缀,比如你在服务端定义了一个请求地址为
,那么客户端发消息给你的时候,用的地址就是/hello
。/app/hello
-
注3
服务端发消息给指定客户端的前缀,比如你定义了一个broker叫
,如果你在服务端发消息用/system/msg
地址时,消息是以广播方式发送的,也就是所有订阅了这个地址的客户端都能收到,而如果你用/system/msg
这个地址来发送的话,就是发送给指定的客户端,我们目前的配置是无法发消息给指定客户端的,后面会说到怎么配置可以发消息给指定客户端。/user/system/msg
- 注4
的连接节点,在我们要订阅或发送消息之前,首先我们要和服务端连接上,这里就是定义一个连接的节点。stomp
new SockJS("http://host:port/stomp")
在开发的时候
setAllowedOrigins("*")
是非常方便的,生产环境请自行设置限制。
客户端如何订阅和发送消息,以及服务端如何广播消息
@Controller
public class StompController {
@MessageMapping("/hello")
@SendTo("/system/msg")
String hello(@Payload String name) {
System.out.println("hello " + name);
return "fu*k away";
}
}
-
@MessageMapping
和
类似,定义一个接口,客户端可以发送消息到这个地址上。RequestMapping
// 客户端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.publish({destination: '/app/hello', body: 'world'})
还记得我们在
config.setApplicationDestinationPrefixes("/app")
定义了前缀,所以客户端发送消息的地址要加上这个前缀。
-
@SendTo
服务端广播消息给客户端,这里的地址必须是在
定义过的,否则消息无法到达客户端。下面是客户端订阅消息,这样就可以收到服务端的广播了。config.enableSimpleBroker()
// 客户端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.subscribe('/system/msg', message => {
console.log(message)
})
-
@Payload String name
客户端发过来的参数,也就是
函数里的client.publish
参数。body
以上,最基本的
stomp
就算完成了,可以实现客户端订阅消息,服务端广播消息。在我们定义的
Controller
里,客户端发送消息过来后,服务端就会立即广播一个消息给客户端,但是有一些时候我们希望在任何地方,任何时刻都可以广播消息给客户端。
如何你不想在@MessageMapping的方法里广播消息,将该方法的@SendTo和返回定义去掉就好了。
“手动”广播消息给客户端
比如我们有某个定时任务,完成任务后,需要广播一个消息给客户端。
@Service
public class MyTask {
@Autowire
private SimpMessagingTemplate messagingTemplate;
public void doTask() {
// 你的业务代码
Map<String, Object> msg = new HashMap<>();
msg.put("name", "zhangsan");
messagingTemplate.convertAndSend("/system/msg", msg);
}
}
SimpMessagingTemplate
这个类就是可以让我们自由自在广播消息的罪魁祸首,这个Bean直接注入就好,这是spring已经在内部定义好的。
convertAndSend()
方法的第一个参数就是我们定义的
broker
地址,再说一遍,不事先在
config.enableSimpleBroker()
定义好这个地址是没有未来的。
如何发送消息给指定的客户端
spring发送指定消息给客户端是依赖
org.springframework.http.server.ServerHttpRequest#getPrincipal()
来确定用户的身份,一般的,如果我们的用户是已经登录的,可以用用户的业务ID作为
Principal
,如果是未登录用户,可以随机生成一个唯一标示作为
Principal
也可以,这个需要根据你的业务场景来定制,这里我们只需要知道如何将自己的数据作为
Principal
交给spring即可。
public class CustomChannelInterceptor implements ChannelInterceptor {
private static final Logger log = LogManager.getLogger(CustomChannelInterceptor.class);
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
List<String> tmp = accessor.getNativeHeader("X-User-ID");
String userId = tmp == null || tmp.size() == 0 ? null : tmp.get(0);
accessor.setUser(() -> userId);
log.debug("client user id: {}", userId);
}
return message;
}
}
X-User-ID
是客户端发送的自定义头信息,这个ID你随便写都没问题,只要保证唯一即可,前面也说过,这里得依赖你的业务场景,也和你应用的安全框架实现有关,但不管如何,只要将你的唯一凭证交给
accessor.setUser()
即可,这样spring就认识当前这个客户端了。
然后回到我们的
WebSocketConfig
配置类中。
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new CustomChannelInterceptor());
}
再看如何发送消息给指定客户端。
- 通过
来发送SimpMessagingTemplate
@Autowire
private SimpMessagingTemplate messagingTemplate;
...
String userId = ...; // 还记得前面的X-User-ID吗,就是那个东西
String destination = "/system/msg"; // broker那里定义的
Map<String, Object> payload = ...; // 发给客户端的参数
messagingTemplate.convertAndSendToUser(userId, destination, payload);
- 通过
来发送@SendToUser
@MessageMapping("/hello")
@SendToUser(value = "/system/msg", broadcast = false)
public Object hello() {
return "fu*k away";
}
下面看客户端的代码,这里给出客户端的完整代码。
// 客户端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
import SockJS from 'sockjs-client'
import { Client } from '@stomp/stompjs'
let client = new Client({
connectHeaders: {
'X-User-ID': 'xingshang' // 测试用写死了一个用户ID
},
debug: function (str) {
console.log(str)
},
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000
});
client.webSocketFactory = function () {
return new SockJS("http://localhost:9780/stomp")
};
client.onConnect = () => {
console.log('on connect')
client.subscribe("/user/system/msg", message => {
// user前缀,服务端只会发给我自己
})
client.subscribe('/system/msg', message => {
// 没有user前缀,广播消息,服务端发的消息,任何人订阅这个地址都会收到消息
})
}
client.activate() // 开始与服务端连接
someButton.click = () => {
client.publish({
destination: '/app/hello', // 发给服务端的消息用app开头
body: 'world' // body只能是字符串,如果要发送结构,用JSON.stringify()转换
})
}
至此,发送给指定客户端消息的代码完结。但还有件小事,前面用
messagingTemplate.convertAndSendToUser(userId, destination, payload)
发消息时,这里的
userId
如何获得,我们知道这个
userId
是从我的客户端的
X-User-ID
发过去的,whatever,当我们在
ChannelInterceptor#preSend()
方法获得这个id时,如何存储好让我们在后续的其他地方拿到。
如果你是在
@MessageMapping
方法里想获取这个信息,那么很方便。
@MessageMapping("/hello")
public void hello(Principal principal) {
String userId = principal.getName();
}
如果你要在其他业务类获取,自己在
ChannelInterceptor#preSend()
里想办法把这个
userId
存储起来,和
sessionId
关联也好,和什么关联也好。
如何在服务端关闭会话连接
使用
stomp
时你会发现在客户端关闭连接是非常容易的,但是在服务端关闭就比较麻烦,首先我们要获得
WebSocketSession
这个对象保存起来,继续回到我们的
WebSocketConfig
配置类。
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
@Override
public WebSocketHandler decorate(final WebSocketHandler handler) {
return new WebSocketHandlerDecorator(handler) {
@Override
public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
// 在这里将session存起来,根据session.id存在一个map也好,后面我们可以根据sessionId来获得这个session
}
};
}
});
}
有了
WebSocketSession
,就可以根据sessionId获取对应的session,并调用
close()
方法来关闭连接。
WebSocket事件
假如我们想知道连接断开的消息。
@Component
public class SessionDisconnectedEventListener implements ApplicationListener<SessionDisconnectEvent> {
private static Logger log = LogManager.getLogger(SessionDisconnectedEventListener.class);
@Override
public void onApplicationEvent(SessionDisconnectEvent e) {
log.debug("client disconnect: {}", e.getSessionId());
}
}
其他事件看下面这个连接:
https://docs.spring.io/spring-framework/docs/5.0.0.M1/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events
素巴拉西,以后再也不怕忘记怎么写weboscket了。