天天看點

【websocket】spring boot 內建 websocket 的四種方式

【websocket】spring boot 內建 websocket 的四種方式

內建 websocket 的四種方案

1. 原生注解

pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
           

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-18 15:45 buhao
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }
}
           
說明:

這個配置類很簡單,通過這個配置 spring boot 才能去掃描後面的關于 websocket 的注解

WsServerEndpoint

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.ws;

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author buhao
 * @version WsServerEndpoint.java, v 0.1 2019-10-18 16:06 buhao
 */
@ServerEndpoint("/myWs")
@Component
public class WsServerEndpoint {

    /**
     * 連接配接成功
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("連接配接成功");
    }

    /**
     * 連接配接關閉
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        System.out.println("連接配接關閉");
    }

    /**
     * 接收到消息
     *
     * @param text
     */
    @OnMessage
    public String onMsg(String text) throws IOException {
        return "servet 發送:" + text;
    }
}
           
說明

這裡有幾個注解需要注意一下,首先是他們的包都在 **javax.websocket **下。并不是 spring 提供的,而 jdk 自帶的,下面是他們的具體作用。

  1. @ServerEndpoint
  2. 通過這個 spring boot 就可以知道你暴露出去的 ws 應用的路徑,有點類似我們經常用的@RequestMapping。比如你的啟動端口是8080,而這個注解的值是ws,那我們就可以通過 ws://127.0.0.1:8080/ws 來連接配接你的應用
  3. @OnOpen
  4. 當 websocket 建立連接配接成功後會觸發這個注解修飾的方法,注意它有一個 Session 參數
  5. @OnClose
  6. 當 websocket 建立的連接配接斷開後會觸發這個注解修飾的方法,注意它有一個 Session 參數
  7. @OnMessage
  8. 當用戶端發送消息到服務端時,會觸發這個注解修改的方法,它有一個 String 入參表明用戶端傳入的值
  9. @OnError
  10. 當 websocket 建立連接配接時出現異常會觸發這個注解修飾的方法,注意它有一個 Session 參數

另外一點就是服務端如何發送消息給用戶端,服務端發送消息必須通過上面說的 Session 類,通常是在@OnOpen 方法中,當連接配接成功後把 session 存入 Map 的 value,key 是與 session 對應的使用者辨別,當要發送的時候通過 key 獲得 session 再發送,這裡可以通過 session.getBasicRemote_().sendText(_) 來對用戶端發送消息。

2. Spring封裝

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
           

HttpAuthHandler

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.handler;

import cn.coder4j.study.example.websocket.config.WsSessionManager;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.time.LocalDateTime;

/**
 * @author buhao
 * @version MyWSHandler.java, v 0.1 2019-10-17 17:10 buhao
 */
@Component
public class HttpAuthHandler extends TextWebSocketHandler {

    /**
     * socket 建立成功事件
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 使用者連接配接成功,放入線上使用者緩存
            WsSessionManager.add(token.toString(), session);
        } else {
            throw new RuntimeException("使用者登入已經失效!");
        }
    }

    /**
     * 接收消息事件
     *
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 獲得用戶端傳來的消息
        String payload = message.getPayload();
        Object token = session.getAttributes().get("token");
        System.out.println("server 接收到 " + token + " 發送的 " + payload);
        session.sendMessage(new TextMessage("server 發送給 " + token + " 消息 " + payload + " " + LocalDateTime.now().toString()));
    }

    /**
     * socket 斷開連接配接時
     *
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 使用者退出,移除緩存
            WsSessionManager.remove(token.toString());
        }
    }


}
           

通過繼承 TextWebSocketHandler 類并覆寫相應方法,可以對 websocket 的事件進行處理,這裡可以同原生注解的那幾個注解連起來看

  1. afterConnectionEstablished 方法是在 socket 連接配接成功後被觸發,同原生注解裡的 @OnOpen 功能
  2. **afterConnectionClosed  **方法是在 socket 連接配接關閉後被觸發,同原生注解裡的 @OnClose 功能
  3. **handleTextMessage **方法是在用戶端發送資訊時觸發,同原生注解裡的 @OnMessage 功能

WsSessionManager

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author buhao
 * @version WsSessionManager.java, v 0.1 2019-10-22 10:24 buhao
 */
@Slf4j
public class WsSessionManager {
    /**
     * 儲存連接配接 session 的地方
     */
    private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

    /**
     * 添加 session
     *
     * @param key
     */
    public static void add(String key, WebSocketSession session) {
        // 添加 session
        SESSION_POOL.put(key, session);
    }

    /**
     * 删除 session,會傳回删除的 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession remove(String key) {
        // 删除 session
        return SESSION_POOL.remove(key);
    }

    /**
     * 删除并同步關閉連接配接
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 關閉連接配接
                session.close();
            } catch (IOException e) {
                // todo: 關閉出現異常處理
                e.printStackTrace();
            }
        }
    }

    /**
     * 獲得 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession get(String key) {
        // 獲得 session
        return SESSION_POOL.get(key);
    }
}
           

這裡簡單通過 **ConcurrentHashMap **來實作了一個 session 池,用來儲存已經登入的 web socket 的  session。前文提過,服務端發送消息給用戶端必須要通過這個 session。

MyInterceptor

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.interceptor;

import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.HashMap;
import java.util.Map;

/**
 * @author buhao
 * @version MyInterceptor.java, v 0.1 2019-10-17 19:21 buhao
 */
@Component
public class MyInterceptor implements HandshakeInterceptor {

    /**
     * 握手前
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param attributes
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        System.out.println("握手開始");
        // 獲得請求參數
        HashMap<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), "utf-8");
        String uid = paramMap.get("token");
        if (StrUtil.isNotBlank(uid)) {
            // 放入屬性域
            attributes.put("token", uid);
            System.out.println("使用者 token " + uid + " 握手成功!");
            return true;
        }
        System.out.println("使用者登入已失效");
        return false;
    }

    /**
     * 握手後
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        System.out.println("握手完成");
    }

}
           

通過實作 HandshakeInterceptor 接口來定義握手攔截器,注意這裡與上面 Handler 的事件是不同的,這裡是建立握手時的事件,分為握手前與握手後,而  Handler 的事件是在握手成功後的基礎上建立 socket 的連接配接。是以在如果把認證放在這個步驟相對來說最節省伺服器資源。它主要有兩個方法 beforeHandshake 與 **afterHandshake **,顧名思義一個在握手前觸發,一個在握手後觸發。

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import cn.coder4j.study.example.websocket.handler.HttpAuthHandler;
import cn.coder4j.study.example.websocket.interceptor.MyInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-17 15:43 buhao
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private HttpAuthHandler httpAuthHandler;
    @Autowired
    private MyInterceptor myInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(httpAuthHandler, "myWS")
                .addInterceptors(myInterceptor)
                .setAllowedOrigins("*");
    }
}
           

通過實作 WebSocketConfigurer 類并覆寫相應的方法進行 websocket 的配置。我們主要覆寫 registerWebSocketHandlers 這個方法。通過向 WebSocketHandlerRegistry 設定不同參數來進行配置。其中 **addHandler 方法添加我們上面的寫的 ws 的  handler 處理類,第二個參數是你暴露出的 ws 路徑。addInterceptors 添加我們寫的握手過濾器。setAllowedOrigins("*") **這個是關閉跨域校驗,友善本地調試,線上推薦打開。

3. TIO

<dependency>
     <groupId>org.t-io</groupId>
     <artifactId>tio-websocket-spring-boot-starter</artifactId>
     <version>3.5.5.v20191010-RELEASE</version>
</dependency>
           

application.xml

tio:
  websocket:
    server:
      port: 8989
           

這裡隻配置了 ws 的啟動端口,還有很多配置,可以通過結尾給的連結去尋找

MyHandler

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.handler;

import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * @author buhao
 * @version MyHandler.java, v 0.1 2019-10-21 14:39 buhao
 */
@Component
public class MyHandler implements IWsMsgHandler {
    /**
     * 握手
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        return httpResponse;
    }

    /**
     * 握手成功
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @throws Exception
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        System.out.println("握手成功");
    }

    /**
     * 接收二進制檔案
     *
     * @param wsRequest
     * @param bytes
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        return null;
    }

    /**
     * 斷開連接配接
     *
     * @param wsRequest
     * @param bytes
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        System.out.println("關閉連接配接");
        return null;
    }

    /**
     * 接收消息
     *
     * @param wsRequest
     * @param s
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
        System.out.println("接收文本消息:" + s);
        return "success";
    }
}
           

這個同上個例子中的 handler 很像,也是通過實作接口覆寫方法來進行事件處理,實作的接口是IWsMsgHandler,它的方法功能如下

  1. handshake
  2. 在握手的時候觸發
  3. onAfterHandshaked
  4. 在握手成功後觸發
  5. onBytes
  6. 用戶端發送二進制消息觸發
  7. onClose
  8. 用戶端關閉連接配接時觸發
  9. onText
  10. 用戶端發送文本消息觸發

StudyWebsocketExampleApplication

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */

package cn.coder4j.study.example.websocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;

@SpringBootApplication
@EnableTioWebSocketServer
public class StudyWebsocketExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(StudyWebsocketExampleApplication.class, args);
    }
}

           

這個類的名稱不重要,它其實是你的 spring boot 啟動類,隻要記得加上@EnableTioWebSocketServer注解就可以了

STOMP

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
           

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-21 16:32 buhao
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 配置用戶端嘗試連接配接位址
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 設定廣播節點
        registry.enableSimpleBroker("/topic", "/user");
        // 用戶端向服務端發送消息需有/app 字首
        registry.setApplicationDestinationPrefixes("/app");
        // 指定使用者發送(一對一)的字首 /user/
        registry.setUserDestinationPrefix("/user/");
    }
}
           
  1. 通過實作 WebSocketMessageBrokerConfigurer 接口和加上@EnableWebSocketMessageBroker來進行 stomp 的配置與注解掃描。
  2. 其中覆寫 registerStompEndpoints 方法來設定暴露的 stomp 的路徑,其它一些跨域、用戶端之類的設定。
  3. 覆寫 **configureMessageBroker **方法來進行節點的配置。
  4. 其中 **enableSimpleBroker **配置的廣播節點,也就是服務端發送消息,用戶端訂閱就能接收消息的節點。
  5. 覆寫**setApplicationDestinationPrefixes **方法,設定用戶端向服務端發送消息的節點。
  6. 覆寫 setUserDestinationPrefix 方法,設定一對一通信的節點。

WSController

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.controller;

import cn.coder4j.study.example.websocket.model.RequestMessage;
import cn.coder4j.study.example.websocket.model.ResponseMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @author buhao
 * @version WSController.java, v 0.1 2019-10-21 17:22 buhao
 */
@Controller
public class WSController {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/hello")
    @SendTo("/topic/hello")
    public ResponseMessage hello(RequestMessage requestMessage) {
        System.out.println("接收消息:" + requestMessage);
        return new ResponseMessage("服務端接收到你發的:" + requestMessage);
    }

    @GetMapping("/sendMsgByUser")
    public @ResponseBody
    Object sendMsgByUser(String token, String msg) {
        simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg);
        return "success";
    }

    @GetMapping("/sendMsgByAll")
    public @ResponseBody
    Object sendMsgByAll(String msg) {
        simpMessagingTemplate.convertAndSend("/topic", msg);
        return "success";
    }

    @GetMapping("/test")
    public String test() {
        return "test-stomp.html";
    }
}
           
  1. 通過 @MessageMapping 來暴露節點路徑,有點類似 @RequestMapping。注意這裡雖然寫的是 hello ,但是我們用戶端調用的真正位址是** /app/hello。 因為我們在上面的 config 裡配置了registry.setApplicationDestinationPrefixes("/app")**。
  2. @SendTo這個注解會把傳回值的内容發送給訂閱了 /topic/hello 的用戶端,與之類似的還有一個@SendToUser 隻不過他是發送給使用者端一對一通信的。這兩個注解一般是應答時響應的,如果服務端主動發送消息可以通過 simpMessagingTemplate類的convertAndSend方法。注意 simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg) ,聯系到我們上文配置的 registry.setUserDestinationPrefix("/user/"),這裡用戶端訂閱的是/user/{token}/msg,千萬不要搞錯。

Session 共享的問題

上面反複提到一個問題就是,服務端如果要主動發送消息給用戶端一定要用到 session。而大家都知道的是 session 這個東西是不跨 jvm 的。如果有多台伺服器,在 http 請求的情況下,我們可以通過把 session 放入緩存中間件中來共享解決這個問題,通過 spring session 幾條配置就解決了。但是 web socket  不可以。他的 session 是不能序列化的,當然這樣設計的目的不是為了為難你,而是出于對 http 與 web socket 請求的差異導緻的。

目前網上找到的最簡單方案就是通過 redis 訂閱廣播的形式,主要代碼跟第二種方式差不多,你要在本地放個 map 儲存請求的 session。也就是說每台伺服器都會儲存與他連接配接的 session 于本地。然後發消息的地方要修改,并不是現在這樣直接發送,而通過 redis 的訂閱機制。伺服器要發消息的時候,你通過 redis 廣播這條消息,所有訂閱的服務端都會收到這個消息,然後本地嘗試發送。最後肯定隻有有這個對應使用者 session 的那台才能發送出去。

如何選擇

  1. 如果你在使用 tio,那推薦使用 tio 的內建。因為它已經實作了很多功能,包括上面說的通過 redis 的 session 共享,隻要加幾個配置就可以了。但是 tio 是半開源,文檔是需要收費的。如果沒有使用,那就忘了他。
  2. 如果你的業務要求比較靈活多變,推薦使用前兩種,更推薦第二種 Spring 封裝的形式。
  3. 如果隻是簡單的伺服器雙向通信,推薦 stomp 的形式,因為他更容易規範使用。

其它

  1. websocket 線上驗證

寫完服務端代碼後想調試,但是不會前端代碼怎麼辦,點這裡,這是一個線上的 websocket 用戶端,功能完全夠我們調試了。

  1. stomp 驗證

這個沒找到線上版的,但是網上有很多 demo 可以下載下傳到本地進行調試,也可以通過後文的連接配接找到。

  1. 另外由于篇幅有限,并不能放上所有代碼,但是測試代碼全都上傳 gitlab,保證可以正常運作,可以在 這裡 找到

參考連結

  1. SpringBoot 系統 - 內建 WebSocket 實時通信
  2. WebSocket 的故事(二)—— Spring 中如何利用 STOMP 快速建構 WebSocket 廣播式消息模式
  3. SpringBoot內建WebSocket【基于純H5】進行點對點[一對一]和廣播[一對多]實時推送
  4. Spring Framework 參考文檔(WebSocket STOMP)
  5. Spring Boot中使用WebSocket總結(一):幾種實作方式詳解
  6. Spring Boot 系列 - WebSocket 簡單使用
  7. tio-websocket-spring-boot-starter