天天看點

spring boot websocket stomp

本文描述在spring boot中如何使用基于stomp實作即時通信的功能。

  • spring boot 版本:2.1.2.RELEASE
  • jdk8

Maven pom.xml

<parent>
	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.2.RELEASE</version>
</parent>
<dependencies>
	<dependency>
	    <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-json</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>
           

基本配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * broker定義成常量,其他地方也會用到
     */
    public static final String BROKER_SYSTEM_MSG = "/system/msg";

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 注1
        config.enableSimpleBroker(BROKER_SYSTEM_MSG);
        // 注2
        config.setApplicationDestinationPrefixes("/app");
        // 注3
        config.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注4
        registry.addEndpoint("/stomp")
                .setAllowedOrigins("*")
                .withSockJS();
    }
}
           
  • 注1

    定義訂閱者節點,也就是用戶端可以訂閱哪些位址,如果不在這裡定義的話,服務端是不能發消息給用戶端的。

  • 注2

    用戶端發消息給服務端的位址字首,比如你在服務端定義了一個請求位址為

    /hello

    ,那麼用戶端發消息給你的時候,用的位址就是

    /app/hello

  • 注3

    服務端發消息給指定用戶端的字首,比如你定義了一個broker叫

    /system/msg

    ,如果你在服務端發消息用

    /system/msg

    位址時,消息是以廣播方式發送的,也就是所有訂閱了這個位址的用戶端都能收到,而如果你用

    /user/system/msg

    這個位址來發送的話,就是發送給指定的用戶端,我們目前的配置是無法發消息給指定用戶端的,後面會說到怎麼配置可以發消息給指定用戶端。
  • 注4

    stomp

    的連接配接節點,在我們要訂閱或發送消息之前,首先我們要和服務端連接配接上,這裡就是定義一個連接配接的節點。
new SockJS("http://host:port/stomp")
           

在開發的時候

setAllowedOrigins("*")

是非常友善的,生産環境請自行設定限制。

用戶端如何訂閱和發送消息,以及服務端如何廣播消息

@Controller
public class StompController {
	@MessageMapping("/hello")
	@SendTo("/system/msg")
	String hello(@Payload String name) {
	    System.out.println("hello " + name);
		return "fu*k away";
	}
}
           
  • @MessageMapping

    RequestMapping

    類似,定義一個接口,用戶端可以發送消息到這個位址上。
// 用戶端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.publish({destination: '/app/hello', body: 'world'})
           

還記得我們在

config.setApplicationDestinationPrefixes("/app")

定義了字首,是以用戶端發送消息的位址要加上這個字首。

  • @SendTo

    服務端廣播消息給用戶端,這裡的位址必須是在

    config.enableSimpleBroker()

    定義過的,否則消息無法到達用戶端。下面是用戶端訂閱消息,這樣就可以收到服務端的廣播了。
// 用戶端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
client.subscribe('/system/msg', message => {
	console.log(message)
})
           
  • @Payload String name

    用戶端發過來的參數,也就是

    client.publish

    函數裡的

    body

    參數。

以上,最基本的

stomp

就算完成了,可以實作用戶端訂閱消息,服務端廣播消息。在我們定義的

Controller

裡,用戶端發送消息過來後,服務端就會立即廣播一個消息給用戶端,但是有一些時候我們希望在任何地方,任何時刻都可以廣播消息給用戶端。

如何你不想在@MessageMapping的方法裡廣播消息,将該方法的@SendTo和傳回定義去掉就好了。

“手動”廣播消息給用戶端

比如我們有某個定時任務,完成任務後,需要廣播一個消息給用戶端。

@Service
public class MyTask {
	@Autowire
	private SimpMessagingTemplate messagingTemplate;

	public void doTask() {
		// 你的業務代碼
		Map<String, Object> msg = new HashMap<>();
		msg.put("name", "zhangsan");
		messagingTemplate.convertAndSend("/system/msg", msg);
	}
}
           

SimpMessagingTemplate

這個類就是可以讓我們自由自在廣播消息的罪魁禍首,這個Bean直接注入就好,這是spring已經在内部定義好的。

convertAndSend()

方法的第一個參數就是我們定義的

broker

位址,再說一遍,不事先在

config.enableSimpleBroker()

定義好這個位址是沒有未來的。

如何發送消息給指定的用戶端

spring發送指定消息給用戶端是依賴

org.springframework.http.server.ServerHttpRequest#getPrincipal()

來确定使用者的身份,一般的,如果我們的使用者是已經登入的,可以用使用者的業務ID作為

Principal

,如果是未登入使用者,可以随機生成一個唯一标示作為

Principal

也可以,這個需要根據你的業務場景來定制,這裡我們隻需要知道如何将自己的資料作為

Principal

交給spring即可。

public class CustomChannelInterceptor implements ChannelInterceptor {

    private static final Logger log = LogManager.getLogger(CustomChannelInterceptor.class);

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
        	List<String> tmp = accessor.getNativeHeader("X-User-ID");
        	String userId = tmp == null || tmp.size() == 0 ? null : tmp.get(0);
            accessor.setUser(() -> userId);
            log.debug("client user id: {}", userId);
        }
        return message;
    }
}
           

X-User-ID

是用戶端發送的自定義頭資訊,這個ID你随便寫都沒問題,隻要保證唯一即可,前面也說過,這裡得依賴你的業務場景,也和你應用的安全架構實作有關,但不管如何,隻要将你的唯一憑證交給

accessor.setUser()

即可,這樣spring就認識目前這個用戶端了。

然後回到我們的

WebSocketConfig

配置類中。

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
	registration.interceptors(new CustomChannelInterceptor());
}
           

再看如何發送消息給指定用戶端。

  • 通過

    SimpMessagingTemplate

    來發送
@Autowire
private SimpMessagingTemplate messagingTemplate;
...
String userId = ...; // 還記得前面的X-User-ID嗎,就是那個東西
String destination = "/system/msg"; // broker那裡定義的
Map<String, Object> payload = ...; // 發給用戶端的參數
messagingTemplate.convertAndSendToUser(userId, destination, payload);
           
  • 通過

    @SendToUser

    來發送
@MessageMapping("/hello")
@SendToUser(value = "/system/msg", broadcast = false)
public Object hello() {
  return "fu*k away";
}
           

下面看用戶端的代碼,這裡給出用戶端的完整代碼。

// 用戶端使用的是stomp-js v5
// https://stomp-js.github.io/guide/stompjs/2018/06/28/using-stompjs-v5.html
import SockJS from 'sockjs-client'
import { Client } from '@stomp/stompjs'

let client = new Client({
	connectHeaders: {
	  'X-User-ID': 'xingshang' // 測試用寫死了一個使用者ID
	},
	debug: function (str) {
	  console.log(str)
	},
	reconnectDelay: 5000,
	heartbeatIncoming: 4000,
	heartbeatOutgoing: 4000
});

client.webSocketFactory = function () {
	return new SockJS("http://localhost:9780/stomp")
};
client.onConnect = () => {
  console.log('on connect')
  client.subscribe("/user/system/msg", message => {
    // user字首,服務端隻會發給我自己
  })
  client.subscribe('/system/msg', message => {
    // 沒有user字首,廣播消息,服務端發的消息,任何人訂閱這個位址都會收到消息
  })
}
client.activate() // 開始與服務端連接配接

someButton.click = () => {
  client.publish({
    destination: '/app/hello', // 發給服務端的消息用app開頭
    body: 'world' // body隻能是字元串,如果要發送結構,用JSON.stringify()轉換
  })
}
           

至此,發送給指定用戶端消息的代碼完結。但還有件小事,前面用

messagingTemplate.convertAndSendToUser(userId, destination, payload)

發消息時,這裡的

userId

如何獲得,我們知道這個

userId

是從我的用戶端的

X-User-ID

發過去的,whatever,當我們在

ChannelInterceptor#preSend()

方法獲得這個id時,如何存儲好讓我們在後續的其他地方拿到。

如果你是在

@MessageMapping

方法裡想擷取這個資訊,那麼很友善。

@MessageMapping("/hello")
public void hello(Principal principal) {
  String userId = principal.getName();
}
           

如果你要在其他業務類擷取,自己在

ChannelInterceptor#preSend()

裡想辦法把這個

userId

存儲起來,和

sessionId

關聯也好,和什麼關聯也好。

如何在服務端關閉會話連接配接

使用

stomp

時你會發現在用戶端關閉連接配接是非常容易的,但是在服務端關閉就比較麻煩,首先我們要獲得

WebSocketSession

這個對象儲存起來,繼續回到我們的

WebSocketConfig

配置類。

@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
    registry.addDecoratorFactory(new WebSocketHandlerDecoratorFactory() {
        @Override
        public WebSocketHandler decorate(final WebSocketHandler handler) {
            return new WebSocketHandlerDecorator(handler) {
                @Override
                public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                    super.afterConnectionEstablished(session);
                    // 在這裡将session存起來,根據session.id存在一個map也好,後面我們可以根據sessionId來獲得這個session
                }
            };
        }
    });
}
           

有了

WebSocketSession

,就可以根據sessionId擷取對應的session,并調用

close()

方法來關閉連接配接。

WebSocket事件

假如我們想知道連接配接斷開的消息。

@Component
public class SessionDisconnectedEventListener implements ApplicationListener<SessionDisconnectEvent> {

    private static Logger log = LogManager.getLogger(SessionDisconnectedEventListener.class);

    @Override
    public void onApplicationEvent(SessionDisconnectEvent e) {
        log.debug("client disconnect: {}", e.getSessionId());
    }
}
           

其他事件看下面這個連接配接:

https://docs.spring.io/spring-framework/docs/5.0.0.M1/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events

素巴拉西,以後再也不怕忘記怎麼寫weboscket了。