一、stomp协议简介
STOMP是简单(或流式)文本定向消息传递协议。
STOMP提供可互操作的有线格式,以便STOMP客户端可以与任何STOMP消息代理进行通信,从而在多种语言,平台和代理之间提供简单而广泛的消息传递互操作性。
二、在spring boot开发基于stomp的通信功能
1、在pom.xml中添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
</dependencies>
如果你的spring boot版本高于1.5的话,要在配置文件中添加以下设置,否则在启动时会报错,因为高于1.5的它里面已经定义了subProtocolWebSocketHandler这个bean,与我们后面要定义的冲突了。
spring.main.allow-bean-definition-overriding=true
2、stomp配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends WebSocketMessageBrokerConfigurationSupport implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
super.configureWebSocketTransport(registry);
}
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
return super.configureMessageConverters(messageConverters);
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
super.configureClientInboundChannel(registration);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
super.configureClientOutboundChannel(registration);
}
@Override
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
super.addArgumentResolvers(argumentResolvers);
}
@Override
public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
super.addReturnValueHandlers(returnValueHandlers);
}
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp")
.setHandshakeHandler(new DefaultHandshakeHandler() {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
//握手成功后调用,可以在这里保存用户id
return new Principal(((ServletServerHttpRequest) request).getServletRequest().getParameter("name"));
}
})
.setAllowedOrigins("*") // 添加允许跨域访问
//添加socket拦截器,用于从请求中获取客户端标识参数
.addInterceptors(new MyHandShakeInterceptor()).withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//客户端发送消息的请求前缀
registry.setApplicationDestinationPrefixes("/app");
//客户端订阅消息的请求前缀,topic一般用于广播推送,queue用于点对点推送
registry.enableSimpleBroker("/topic", "/queue");
//服务端通知客户端的前缀,可以不设置,默认为user
registry.setUserDestinationPrefix("/user");
/* 如果是用自己的消息中间件,则按照下面的去配置,删除上面的配置
* registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("rabbit.someotherserver")
.setRelayPort(62623)
.setClientLogin("marcopolo")
.setClientPasscode("letmein01");
registry.setApplicationDestinationPrefixes("/app", "/foo");
* */
}
@Bean
public WebSocketHandler subProtocolWebSocketHandler() {
return new CustomSubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
}
//定义一个自己的权限验证类
class FastPrincipal implements Principal {
private final String name;
public FastPrincipal(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
}
3、编写拦截器
import com.company.industrial_park.Util.JwtToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* 拦截器
* */
@Component
public class MyHandShakeInterceptor implements HandshakeInterceptor {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
// System.out.println(this.getClass().getCanonicalName() + "握手前...");
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
// System.out.println(this.getClass().getCanonicalName() + "握手成功后...");
}
}
4、自定义websocket处理器
自定义这个主要是用来监听连接的建立与关闭,可以用来判断用户的在线状态等等。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
public class CustomSubProtocolWebSocketHandler extends SubProtocolWebSocketHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomSubProtocolWebSocketHandler.class);
@Autowired
private SessionHandler sessionHandler;
public CustomSubProtocolWebSocketHandler(MessageChannel clientInboundChannel, SubscribableChannel clientOutboundChannel) {
super(clientInboundChannel, clientOutboundChannel);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
LOGGER.info("New websocket connection was established");
sessionHandler.register(session);
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
LOGGER.info("New websocket connection:"+session.getPrincipal().getName()+" was closed");
sessionHandler.remove(session);
super.afterConnectionClosed(session,closeStatus);
}
}
5、定义session处理器
在这里我们可以对用户的session进行管理
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
public class SessionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SessionHandler.class);
// private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<String, WebSocketSession> sessionMap = new ConcurrentHashMap<>();
public SessionHandler() {
}
public void register(WebSocketSession session) {
sessionMap.put(session.getPrincipal().getName(), session);
}
public void remove(WebSocketSession session) {
sessionMap.remove(session.getPrincipal().getName());
}
public boolean isOnline(String openid){
return sessionMap.get(openid)!=null;
}
}
6、编写Controller
在这里我实现了点对点的通信,接受到消息后会
package com.company.industrial_park.controller.chat;
import java.io.UnsupportedEncodingException;
import java.security.Principal;
import java.util.logging.Logger;
import com.mysql.cj.log.LogFactory;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
@Controller
public class StompController {
@Autowired
SimpMessagingTemplate SMT;
@Autowired
SessionHandler sessionHandler;
@MessageMapping("/message")
public void subscription(String str, Principal principal) throws MessagingException, UnsupportedEncodingException {
JSONObject obj=new JSONObject(str);
String openid=obj.getString("name");
SMT.convertAndSendToUser(openid,"/topic/chat",
"消息:"+obj.getString("content")+" from "+principal.getName());//发送消息到指定用户
boolean online=sessionHandler.isOnline(openid);
System.out.println(openid+" "+online);
}
}
7、stomp客户端,html文件
把下面的html文件放到resources文件夹里的public里
<html xmlns="http://www.w3.org/1999/xhtml"
xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>stomp</title>
<link href="https://cdn.bootcss.com/bootstrap/4.1.1/css/bootstrap.min.css" target="_blank" rel="external nofollow" rel="stylesheet">
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being
enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div id="main-content" class="container">
<div class="row">
<div class="col-md-6">
<form class="form-inline">
<input placeholder="url" id='addr' value="/stomp" />
<div class="form-group">
<label for="connect">register an user,input name:</label>
<input type="text" id="username" class="form-control" value="tony" placeholder="Your name here...">
<button id="confirm" class="btn btn-default" type="submit">confirm</button>
</div>
<div class="form-group">
<label for="connect">WebSocket connection:</label>
<button id="connect" class="btn btn-default" type="submit">Connect</button>
<button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
</button>
</div>
</form>
</div>
<div class="col-md-6">
<form class="form-inline">
<div class="form-group">
<label for="name">What is your name?</label>
<input type="text" id="name" class="form-control" placeholder="Your name here...">
</div>
<button id="send" class="btn btn-default" type="submit">Send</button>
</form>
</div>
<textarea id="token"></textarea>
</div>
<div class="row">
<div class="col-md-12">
<table id="conversation" class="table table-striped">
<thead>
<tr>
<th>Greetings</th>
</tr>
</thead>
<tbody id="greetings">
</tbody>
</table>
</div>
</div>
</div>
<script type="text/javascript">
/**
*
*/
var stompClient = null;
function setConnected(connected) {
$("#connect").prop("disabled", connected);
$("#disconnect").prop("disabled", !connected);
if (connected) {
$("#conversation").show();
}
else {
$("#conversation").hide();
}
$("#greetings").html("");
}
var hostaddr = window.location.host ;
var url = 'ws://' + hostaddr;
function connect() {
var username=$("#username").val();
var socket = new SockJS("/stomp?name="+$("username").val());
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
setConnected(true);
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/sub', function (greeting) {
showGreeting(greeting.body);
});
stompClient.subscribe("/user/topic/chat", function (greeting) {
showGreeting(greeting.body);
});
});
}
function disconnect() {
if (stompClient !== null) {
stompClient.disconnect();
}
setConnected(false);
console.log("Disconnected");
}
function sendName() {
stompClient.send("/app/message", {},$("#name").val());
}
function showGreeting(message) {
$("#greetings").append("<tr><td>" + message + "</td></tr>");
}
$(function () {
$("form").on('submit', function (e) {
e.preventDefault();
});
$( "#connect" ).click(function() { connect(); });
$( "#disconnect" ).click(function() { disconnect(); });
$( "#send" ).click(function() { sendName(); });
$("#confirm").click(function(){username=$("#username").val();connect();});
});
</script>
</body>
</html>
测试
因为我代码是默认字符串是json格式的,所以发送的数据要是json格式,首先连接,
可以看到用户名为123的用户超过接收到了消息。