天天看点

spring boot通过stomp协议实现通信功能

一、stomp协议简介

STOMP是简单(或流式)文本定向消息传递协议。

STOMP提供可互操作的有线格式,以便STOMP客户端可以与任何STOMP消息代理进行通信,从而在多种语言,平台和代理之间提供简单而广泛的消息传递互操作性。

二、在spring boot开发基于stomp的通信功能

1、在pom.xml中添加依赖

<dependencies>
        <!-- Spring Boot starter for WebSocket support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- Spring messaging module.
             NOTE(review): presumably already pulled in transitively by the starter above; confirm before removing. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
        </dependency>
</dependencies>
           

如果你的 Spring Boot 版本是 2.1 及以上,需要在配置文件中添加以下设置,否则启动时会报错:从 2.1 开始默认禁止 bean 定义覆盖,而框架自身已经定义了名为 subProtocolWebSocketHandler 的 bean,与我们后面要定义的同名 bean 冲突。

spring.main.allow-bean-definition-overriding=true
           

2、stomp配置

/**
 * STOMP-over-WebSocket configuration.
 *
 * <p>Registers the {@code /stomp} SockJS endpoint, configures the simple message broker and the
 * destination prefixes, and replaces the default sub-protocol handler with
 * {@link CustomSubProtocolWebSocketHandler} so that connection open/close events can be observed.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig  extends WebSocketMessageBrokerConfigurationSupport implements WebSocketMessageBrokerConfigurer {

    // The following overrides only delegate to the superclass.
    // NOTE(review): presumably they exist to expose the base-class hooks with the public
    // visibility that the configurer interface requires; confirm before removing any of them.

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        super.configureWebSocketTransport(registry);
    }

    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        return super.configureMessageConverters(messageConverters);
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        super.configureClientInboundChannel(registration);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        super.configureClientOutboundChannel(registration);
    }

    @Override
    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
        super.addArgumentResolvers(argumentResolvers);
    }

    @Override
    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
        super.addReturnValueHandlers(returnValueHandlers);
    }

    /**
     * Registers the {@code /stomp} endpoint with SockJS fallback.
     *
     * <p>The handshake handler derives the user from the {@code name} request parameter so that
     * user-targeted messages can be routed to this connection.
     *
     * @param registry the endpoint registry to add the endpoint to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp")
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
                        // Identify the connecting user from the "name" parameter of the handshake request.
                        if (request instanceof ServletServerHttpRequest) {
                            String name = ((ServletServerHttpRequest) request).getServletRequest().getParameter("name");
                            if (name != null && !name.isEmpty()) {
                                // Principal is an interface, so the concrete FastPrincipal must be instantiated.
                                return new FastPrincipal(name);
                            }
                        }
                        // No usable name was supplied: defer to the default user resolution.
                        return super.determineUser(request, wsHandler, attributes);
                    }
                })
                .setAllowedOrigins("*") // allow cross-origin access
                // handshake interceptor, intended for reading the client identifier from the request
                .addInterceptors(new MyHandShakeInterceptor()).withSockJS();
    }

    /**
     * Configures destination prefixes and the simple broker.
     *
     * @param registry the broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Prefix for messages sent by clients to application handlers
        registry.setApplicationDestinationPrefixes("/app");
        // Prefixes clients subscribe to; "topic" is typically used for broadcast, "queue" for point-to-point
        registry.enableSimpleBroker("/topic", "/queue");
        // Prefix for user-targeted destinations; optional, per the original author it defaults to "user"
        registry.setUserDestinationPrefix("/user");
        /*  To use an external message broker instead, remove the configuration above and use:
         *   registry.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost("rabbit.someotherserver")
            .setRelayPort(62623)
            .setClientLogin("marcopolo")
            .setClientPasscode("letmein01");
            registry.setApplicationDestinationPrefixes("/app", "/foo");
         * */
    }

    /**
     * Supplies the custom sub-protocol handler that tracks connection establishment and closure.
     *
     * @return the handler wired to the client inbound and outbound channels
     */
    @Bean
    public WebSocketHandler subProtocolWebSocketHandler() {
        return new CustomSubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
    }

    /**
     * Minimal {@link Principal} that carries only a user name.
     *
     * <p>Declared static because it does not use any state of the enclosing configuration.
     */
    static class FastPrincipal implements Principal {

        private final String name;

        public FastPrincipal(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }
    }
}
           

3、编写拦截器

import com.company.industrial_park.Util.JwtToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;

import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.Map;
/**
 * Handshake interceptor for the WebSocket endpoint.
 *
 * <p>Currently a pass-through: every handshake is allowed and nothing happens afterwards.
 */
@Component
public class MyHandShakeInterceptor implements HandshakeInterceptor {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Runs before the handshake is processed.
     *
     * @return always {@code true}, so the handshake proceeds
     */
    @Override
    public boolean beforeHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler wsHandler,
            Map<String, Object> attributes) throws Exception {
        final boolean proceed = true;
        return proceed;
    }

    /**
     * Runs after the handshake has completed; intentionally does nothing.
     */
    @Override
    public void afterHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler wsHandler,
            Exception ex) {
        // no post-handshake work
    }
}
           

4、自定义websocket处理器

自定义这个主要是用来监听连接的建立与关闭,可以用来判断用户的在线状态等等。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;

/**
 * Sub-protocol handler that reports connection open/close events to a {@link SessionHandler}
 * in addition to the standard processing of the superclass.
 */
public class CustomSubProtocolWebSocketHandler extends SubProtocolWebSocketHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomSubProtocolWebSocketHandler.class);

    @Autowired
    private SessionHandler sessionHandler;

    public CustomSubProtocolWebSocketHandler(MessageChannel clientInboundChannel, SubscribableChannel clientOutboundChannel) {
        super(clientInboundChannel, clientOutboundChannel);
    }

    /**
     * Registers the new session with the session handler, then continues with the default handling.
     *
     * @param session the session that was just established
     * @throws Exception if registration or the superclass handling fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        LOGGER.info("New websocket connection was established");
        sessionHandler.register(session);
        super.afterConnectionEstablished(session);
    }

    /**
     * Unregisters the session from the session handler, then continues with the default handling.
     *
     * @param session the session that was closed
     * @param closeStatus the reason the connection was closed
     * @throws Exception if unregistration or the superclass handling fails
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        try {
            // The session may carry no principal; avoid dereferencing null just to build a log line.
            String user = session.getPrincipal() != null ? session.getPrincipal().getName() : null;
            LOGGER.info("New websocket connection:{} was closed", user);
            sessionHandler.remove(session);
        } finally {
            // Always run the superclass handling, even if the bookkeeping above failed.
            super.afterConnectionClosed(session, closeStatus);
        }
    }
}
           

5、定义session处理器

在这里我们可以对用户的session进行管理

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Tracks the WebSocket session of each connected user, keyed by the name of the session's principal.
 */
@Service
public class SessionHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionHandler.class);

    private final Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();

    /**
     * Records the session under its user's name, replacing any earlier session of the same user.
     * Sessions without a user name are skipped because the map cannot hold a null key.
     *
     * @param session the newly established session
     */
    public void register(WebSocketSession session) {
        String name = userNameOf(session);
        if (name == null) {
            LOGGER.warn("Websocket session has no user name and will not be tracked");
            return;
        }
        sessionMap.put(name, session);
    }

    /**
     * Forgets the session, but only if it is still the one recorded for its user.
     * This keeps a late close of an old session from evicting the user's newer session.
     *
     * @param session the session that was closed
     */
    public void remove(WebSocketSession session) {
        String name = userNameOf(session);
        if (name != null) {
            sessionMap.remove(name, session);
        }
    }

    /**
     * Reports whether a session is currently recorded for the given user.
     *
     * @param openid the user name to look up; {@code null} is treated as offline
     * @return {@code true} if a session is recorded under that name
     */
    public boolean isOnline(String openid){
        return openid != null && sessionMap.containsKey(openid);
    }

    /** Returns the principal name of the session, or {@code null} when none is available. */
    private static String userNameOf(WebSocketSession session) {
        if (session == null || session.getPrincipal() == null) {
            return null;
        }
        return session.getPrincipal().getName();
    }

}
           

6、编写Controller

在这里我实现了点对点的通信:接收到消息后,会把其中的 content 内容转发给消息里 name 字段指定的用户。

package com.company.industrial_park.controller.chat;

import java.io.UnsupportedEncodingException;
import java.security.Principal;
import java.util.logging.Logger;

import com.mysql.cj.log.LogFactory;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

/**
 * STOMP message controller implementing point-to-point chat.
 */
@Controller
public class StompController {

    private static final Logger LOGGER = Logger.getLogger(StompController.class.getName());

    @Autowired
    SimpMessagingTemplate SMT;

    @Autowired
    SessionHandler sessionHandler;

    /**
     * Handles messages sent to the {@code /message} mapping and forwards them to one user.
     *
     * <p>The payload must be a JSON object with a {@code name} field (the recipient) and a
     * {@code content} field (the text). The text is delivered to the recipient's
     * {@code /topic/chat} user destination, tagged with the sender's name.
     *
     * @param str the raw JSON payload
     * @param principal the sender of the message
     * @throws MessagingException if the sender is unknown
     * @throws UnsupportedEncodingException declared for compatibility with existing callers
     */
    @MessageMapping("/message")
    public void subscription(String str, Principal principal) throws MessagingException, UnsupportedEncodingException {
        if (principal == null) {
            // Fail with a descriptive error instead of a bare NullPointerException further down.
            throw new MessagingException("Cannot forward a message without an authenticated sender");
        }
        JSONObject obj = new JSONObject(str);
        String openid = obj.getString("name");
        String content = obj.getString("content");
        // Send the message to the specified user
        SMT.convertAndSendToUser(openid, "/topic/chat",
            "消息:" + content + " from " + principal.getName());
        boolean online = sessionHandler.isOnline(openid);
        LOGGER.info(openid + " " + online);
    }

}
           

7、stomp客户端,html文件

把下面的html文件放到resources文件夹里的public里

<html xmlns="http://www.w3.org/1999/xhtml"
      xmlns:th="http://www.thymeleaf.org">

<head>
    <meta charset="utf-8" />

    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>stomp</title>
    <!-- A stylesheet link needs exactly one rel attribute, set to "stylesheet"; target does not apply to <link>. -->
    <link href="https://cdn.bootcss.com/bootstrap/4.1.1/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>

</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being
    enabled. Please enable
    Javascript and reload this page!</h2></noscript>
<div id="main-content" class="container">
    <div class="row">
        <!-- Connection controls: endpoint address, user name, connect/disconnect -->
        <div class="col-md-6">
            <form class="form-inline">
                <input placeholder="url" id='addr' value="/stomp" />
                <div class="form-group">
                    <!-- The label targets the username input; pointing it at "connect" made a click on it trigger the Connect button. -->
                    <label for="username">register an user,input name:</label>
                    <input type="text" id="username" class="form-control" value="tony" placeholder="Your name here...">
                    <button id="confirm" class="btn btn-default" type="submit">confirm</button>
                </div>
                <div class="form-group">
                    <label for="connect">WebSocket connection:</label>
                    <button id="connect" class="btn btn-default" type="submit">Connect</button>
                    <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
                    </button>
                </div>
            </form>
        </div>
        <!-- Message form: the content of the "name" input is sent as the message payload -->
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label for="name">What is your name?</label>
                    <input type="text" id="name" class="form-control" placeholder="Your name here...">
                </div>
                <button id="send" class="btn btn-default" type="submit">Send</button>
            </form>
        </div>
        <textarea id="token"></textarea>
    </div>
    <!-- Received messages are appended as rows to the table body below -->
    <div class="row">
        <div class="col-md-12">
            <table id="conversation" class="table table-striped">
                <thead>
                <tr>
                    <th>Greetings</th>
                </tr>
                </thead>
                <tbody id="greetings">
                </tbody>
            </table>
        </div>
    </div>
</div>
<script type="text/javascript">
    /**
     * STOMP client for the current connection; null until connect() creates one.
     */
    var stompClient = null;


    /**
     * Updates the page to reflect the connection state: toggles the two buttons,
     * shows or hides the conversation table, and clears previously shown messages.
     *
     * @param {boolean} connected true when a connection is established
     */
    function setConnected(connected) {
        var isDown = !connected;
        $("#connect").prop("disabled", connected);
        $("#disconnect").prop("disabled", isDown);

        var conversation = $("#conversation");
        if (isDown) {
            conversation.hide();
        } else {
            conversation.show();
        }

        $("#greetings").html("");
    }
    // NOTE(review): hostaddr and url are not read anywhere in the visible script; confirm before removing.
    var hostaddr = window.location.host ;
    var url = 'ws://' + hostaddr;

    /**
     * Opens a SockJS connection to the /stomp endpoint, passing the entered user name
     * as the "name" query parameter, then subscribes to the broadcast and the
     * user-specific chat destinations.
     */
    function connect() {
        // The "#" is required to select the input by id; without it the name sent was "undefined".
        var username = $("#username").val();

        // Encode the name so that special characters cannot break the query string.
        var socket = new SockJS("/stomp?name=" + encodeURIComponent(username));
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function (frame) {
            setConnected(true);
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/sub', function (greeting) {
                showGreeting(greeting.body);
            });
            stompClient.subscribe("/user/topic/chat", function (greeting) {
                showGreeting(greeting.body);
            });
        });
    }

    /**
     * Closes the STOMP connection if a client exists and resets the page to the disconnected state.
     */
    function disconnect() {
        var client = stompClient;
        if (client !== null) {
            client.disconnect();
        }

        setConnected(false);
        console.log("Disconnected");
    }

    /**
     * Sends the content of the "name" input to the /app/message destination.
     * Does nothing (apart from a console warning) when no connection is established.
     */
    function sendName() {
        // Guard against clicking "Send" before connecting, which used to throw a TypeError.
        if (stompClient === null || !stompClient.connected) {
            console.warn("Not connected; message was not sent");
            return;
        }
        stompClient.send("/app/message", {},$("#name").val());
    }

    /**
     * Appends a received message as a new row of the greetings table.
     *
     * @param {string} message the message text to display
     */
    function showGreeting(message) {
        // Messages come from other users: insert them as text, never as HTML, to prevent script injection.
        var cell = $("<td>").text(message);
        $("#greetings").append($("<tr>").append(cell));
    }

    // Wire up the page once the DOM is ready.
    $(function () {
        // The buttons are submit buttons; suppress the default form submission so the page is not reloaded.
        $("form").on('submit', function (e) {
            e.preventDefault();
        });
        $( "#connect" ).click(function() { connect(); });
        $( "#disconnect" ).click(function() { disconnect(); });
        $( "#send" ).click(function() { sendName(); });
        // connect() reads the user name from the input itself, so no global variable is assigned here.
        $("#confirm").click(function(){ connect(); });
    });
</script>
</body>
</html>
           

测试

因为我的代码默认把收到的字符串当作 JSON 解析,所以发送的数据必须是 JSON 格式,例如 {"name":"123","content":"hello"}。首先建立连接,然后发送消息:

spring boot通过stomp协议实现通信功能
spring boot通过stomp协议实现通信功能

可以看到用户名为123的用户成功接收到了消息。

继续阅读