天天看点

spring boot websocket stomp

本文描述在spring boot中如何使用基于stomp实现即时通信的功能。

  • spring boot 版本:2.1.2.RELEASE
  • jdk8

Maven pom.xml

<parent>
	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.2.RELEASE</version>
</parent>
<dependencies>
	<dependency>
	    <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>
           

基本配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * broker定义成常量,其他地方也会用到
     */
    public static final String BROKER_SYSTEM_MSG = "/system/msg";

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 注1
        config.enableSimpleBroker(BROKER_SYSTEM_MSG);
        // 注2
        config.setApplicationDestinationPrefixes("/app");
        // 注3
        config.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注4
        registry.addEndpoint("/stomp")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}
           
  • 注1

    定义订阅者节点,也就是客户端可以订阅哪些地址,如果不在这里定义的话,服务端是不能发消息给客户端的。

  • 注2

    客户端发消息给服务端的地址前缀,比如你在服务端定义了一个请求地址为

    /hello

    ,那么客户端发消息给你的时候,用的地址就是

    /app/hello

  • 注3

    服务端发消息给指定客户端的前缀,比如你定义了一个broker叫

    /system/msg

    ,如果你在服务端发消息用

    /system/msg

    地址时,消息是以广播方式发送的,也就是所有订阅了这个地址的客户端都能收到,而如果你用

    /user/system/msg

    这个地址来发送的话,就是发送给指定的客户端,我们目前的配置是无法发消息给指定客户端的,后面会说到怎么配置可以发消息给指定客户端。
  • 注4

    stomp

    的连接节点,在我们要订阅或发送消息之前,首先我们要和服务端连接上,这里就是定义一个连接的节点。
new SockJS("http://host:port/stomp")
           

在开发的时候

setAllowedOrigins("*")

是非常方便的,生产环境请自行设置限制。

客户端如何订阅和发送消息,以及服务端如何广播消息

@Controller
public class StompController {
	@MessageMapping("/hello")
	@SendTo("/system/msg")
	String hello(@Payload String name) {
	    System.out.println("hello " + name);
		return "fu*k away";
	}
}
           
  • @MessageMapping

    RequestMapping

    类似,定义一个接口,客户端可以发送消息到这个地址上。
// 客户端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.publish({destination: '/app/hello', body: 'world'})
           

还记得我们在

config.setApplicationDestinationPrefixes("/app")

定义了前缀,所以客户端发送消息的地址要加上这个前缀。

  • @SendTo

    服务端广播消息给客户端,这里的地址必须是在

    config.enableSimpleBroker()

    定义过的,否则消息无法到达客户端。下面是客户端订阅消息,这样就可以收到服务端的广播了。
// 客户端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.subscribe('/system/msg', message => {
	console.log(message)
})
           
  • @Payload String name

    客户端发过来的参数,也就是

    client.publish

    函数里的

    body

    参数。

以上,最基本的

stomp

就算完成了,可以实现客户端订阅消息,服务端广播消息。在我们定义的

Controller

里,客户端发送消息过来后,服务端就会立即广播一个消息给客户端,但是有一些时候我们希望在任何地方,任何时刻都可以广播消息给客户端。

如何你不想在@MessageMapping的方法里广播消息,将该方法的@SendTo和返回定义去掉就好了。

“手动”广播消息给客户端

比如我们有某个定时任务,完成任务后,需要广播一个消息给客户端。

@Service
public class MyTask {
	@Autowire
	private SimpMessagingTemplate messagingTemplate;

	public void doTask() {
		// 你的业务代码
		Map<String, Object> msg = new HashMap<>();
		msg.put("name", "zhangsan");
		messagingTemplate.convertAndSend("/system/msg", msg);
	}
}
           

SimpMessagingTemplate

这个类就是可以让我们自由自在广播消息的罪魁祸首,这个Bean直接注入就好,这是spring已经在内部定义好的。

convertAndSend()

方法的第一个参数就是我们定义的

broker

地址,再说一遍,不事先在

config.enableSimpleBroker()

定义好这个地址是没有未来的。

如何发送消息给指定的客户端

spring发送指定消息给客户端是依赖

org.springframework.http.server.ServerHttpRequest#getPrincipal()

来确定用户的身份,一般的,如果我们的用户是已经登录的,可以用用户的业务ID作为

Principal

,如果是未登录用户,可以随机生成一个唯一标示作为

Principal

也可以,这个需要根据你的业务场景来定制,这里我们只需要知道如何将自己的数据作为

Principal

交给spring即可。

public class CustomChannelInterceptor implements ChannelInterceptor {

    private static final Logger log = LogManager.getLogger(CustomChannelInterceptor.class);

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
        	List<String> tmp = accessor.getNativeHeader("X-User-ID");
        	String userId = tmp == null || tmp.size() == 0 ? null : tmp.get(0);
            accessor.setUser(() -> userId);
            log.debug("client user id: {}", userId);
        }
        return message;
    }
}
           

X-User-ID

是客户端发送的自定义头信息,这个ID你随便写都没问题,只要保证唯一即可,前面也说过,这里得依赖你的业务场景,也和你应用的安全框架实现有关,但不管如何,只要将你的唯一凭证交给

accessor.setUser()

即可,这样spring就认识当前这个客户端了。

然后回到我们的

WebSocketConfig

配置类中。

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
	registration.interceptors(new CustomChannelInterceptor());
}
           

再看如何发送消息给指定客户端。

  • 通过

    SimpMessagingTemplate

    来发送
@Autowire
private SimpMessagingTemplate messagingTemplate;
...
String userId = ...; // 还记得前面的X-User-ID吗,就是那个东西
String destination = "/system/msg"; // broker那里定义的
Map<String, Object> payload = ...; // 发给客户端的参数
messagingTemplate.convertAndSendToUser(userId, destination, payload);
           
  • 通过

    @SendToUser

    来发送
@MessageMapping("/hello")
@SendToUser(value = "/system/msg", broadcast = false)
public Object hello() {
  return "fu*k away";
}
           

下面看客户端的代码,这里给出客户端的完整代码。

// 客户端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
import SockJS from 'sockjs-client'
import { Client } from '@stomp/stompjs'

let client = new Client({
	connectHeaders: {
	  'X-User-ID': 'xingshang' // 测试用写死了一个用户ID
	},
	debug: function (str) {
	  console.log(str)
	},
	reconnectDelay: 5000,
	heartbeatIncoming: 4000,
	heartbeatOutgoing: 4000
});

client.webSocketFactory = function () {
	return new SockJS("http://localhost:9780/stomp")
};
client.onConnect = () => {
  console.log('on connect')
  client.subscribe("/user/system/msg", message => {
    // user前缀,服务端只会发给我自己
  })
  client.subscribe('/system/msg', message => {
    // 没有user前缀,广播消息,服务端发的消息,任何人订阅这个地址都会收到消息
  })
}
client.activate() // 开始与服务端连接

someButton.click = () => {
  client.publish({
    destination: '/app/hello', // 发给服务端的消息用app开头
    body: 'world' // body只能是字符串,如果要发送结构,用JSON.stringify()转换
  })
}
           

至此,发送给指定客户端消息的代码完结。但还有件小事,前面用

messagingTemplate.convertAndSendToUser(userId, destination, payload)

发消息时,这里的

userId

如何获得,我们知道这个

userId

是从我的客户端的

X-User-ID

发过去的,whatever,当我们在

ChannelInterceptor#preSend()

方法获得这个id时,如何存储好让我们在后续的其他地方拿到。

如果你是在

@MessageMapping

方法里想获取这个信息,那么很方便。

@MessageMapping("/hello")
public void hello(Principal principal) {
  String userId = principal.getName();
}
           

如果你要在其他业务类获取,自己在

ChannelInterceptor#preSend()

里想办法把这个

userId

存储起来,和

sessionId

关联也好,和什么关联也好。

如何在服务端关闭会话连接

使用

stomp

时你会发现在客户端关闭连接是非常容易的,但是在服务端关闭就比较麻烦,首先我们要获得

WebSocketSession

这个对象保存起来,继续回到我们的

WebSocketConfig

配置类。

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
    registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
        @Override
        public WebSocketHandler decorate(final WebSocketHandler handler) {
            return new WebSocketHandlerDecorator(handler) {
                @Override
                public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                    super.afterConnectionEstablished(session);
                    // 在这里将session存起来,根据session.id存在一个map也好,后面我们可以根据sessionId来获得这个session
                }
            };
        }
    });
}
           

有了

WebSocketSession

,就可以根据sessionId获取对应的session,并调用

close()

方法来关闭连接。

WebSocket事件

假如我们想知道连接断开的消息。

@Component
public class SessionDisconnectedEventListener implements ApplicationListener<SessionDisconnectEvent> {

    private static Logger log = LogManager.getLogger(SessionDisconnectedEventListener.class);

    @Override
    public void onApplicationEvent(SessionDisconnectEvent e) {
        log.debug("client disconnect: {}", e.getSessionId());
    }
}
           

其他事件看下面这个连接:

https://docs.spring.io/spring-framework/docs/5.0.0.M1/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events

素巴拉西,以后再也不怕忘记怎么写weboscket了。