天天看点

Spring Boot+STOMP解决消息乱序问题

当我们使用Spring Boot+websocket进行前后端进行通信时,我们需要注意:服务器可以随时向客户端发送消息。默认的情况下,不保证:服务器发送的消息与到达客户端的消息的顺序是一致的。可能先发送的消息后到,后发送的消息先到。(注意:两个消息发送的时间差不多,不能相差太多,不然就是顺序的了。一般一秒以下都会造成乱序)。

如果你的需求需要服务器向客户端发送的消息是按照顺序的。请你往下看,如果你的需求不要求服务器向客户端发送的消息是顺序的。就没有必要看了。

我们先说以下造成的原因:就是websocket发送消息采用的是多线程发送消息的,造成有的先到的消息在一个线程中在阻塞,而后到的消息使用其他的线程发送出去了。就造成了乱序。

然后我们再来看一个实例,自己去体验一把最好不过了。

一、我们先导入websocket包。

<!--这个是websocket通信的JAR文件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>


        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator-core</artifactId>
        </dependency>
        <!--这个是客户端连接服务器需要的文件-->
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <!--页面布局的文件-->
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.0</version>
        </dependency>
           

这个Jar包会将所有的SpringBoot的JAR文件都导入进去。

二、添加websocket的配置文件,进行websocket的通信。我们这里使用sockJS进行通信,他是websocket的一个改进,当浏览器不支持websocket的通信时,就会使用SockJS进行连接。

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

    //设置心跳的时间间隔
    private static long HEART_BEAT=50000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //设置连接websocket的地址,       注意使用    “/项目名/ws-start”   进行连接websocket。
        //setAllowedOrigins("*")是设置所有请求都可以访问,即允许跨域的问题,或者自己设置允许访问的域名。
        //withSockJS()     在不支持websocket的浏览器中,使用sockJs备选作为连接。
        registry.addEndpoint("/ws-start").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //设置简单的消息代理器,它使用Memory(内存)作为消息代理器,
        //其中/user和/topic都是我们发送到前台的数据前缀。前端必须订阅以/user开始的消息(.subscribe()进行监听)。
        //setHeartbeatValue设置后台向前台发送的心跳,
        //注意:setHeartbeatValue这个不能单独设置,不然不起作用,要配合后面setTaskScheduler才可以生效。
        //对应的解决方法的网址:https://stackoverflow.com/questions/39220647/spring-stomp-over-websockets-not-scheduling-heartbeats
        registry.enableSimpleBroker("/user","/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(new DefaultManagedTaskScheduler());
        //设置我们前端发送:websocket请求的前缀地址。即client.send("/ws-send")作为前缀,然后再加上对应的@MessageMapping后面的地址
        registry.setApplicationDestinationPrefixes("/ws-send");
    }
}
           

三、创建控制器,客户端将消息发送到哪里。

@Controller
public class StompSequenceTest {

    /**
     * 这个就是用于向客户端发送消息的类
     */
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/sequence")
    public void sequence(){
        int index=0;
        while(index<200){
            //用于向客户端发送前200个数字,我们希望树循序接收到0到199这200个数字
            simpMessagingTemplate.convertAndSend("/topic/sequence",index++);
            try {
                //我们睡眠1毫秒,1毫秒对于发送数据已经很大了。CPU执行都是纳米级的
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
           

然后添加HTML页面,CSS文件,JS文件,注意要放在resource下的static文件夹中。

然后在浏览器中打开:localhost:8088.

具体的项目在:https://gitee.com/chang_501/springboot_stomp_all/tree/master/springboot_stomp_sequence中下载,并使用idea打开,然后运行。在浏览器中执行localhost:8088。就会显示下面的图片:

Spring Boot+STOMP解决消息乱序问题

当我们单击send按钮之后,出现下面这个图片:

Spring Boot+STOMP解决消息乱序问题

这个是因为我们使用默认的发送器,默认的发送器是采用多线程发送的。

下面就是解决方案:就是修改默认的发送器的线程个数为1,就可以了。

解决方案:

我们只要在WebsocketConfig类中添加3行代码。

/**
     * 这是用于配置服务器向浏览器发送的消息。
     * clientOut就表示出出口。还有一个inBoundChannel用于处理浏览器向服务器发送的消息
     * @param registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
    }
           

最终的文件:

package com.kmust.springboot_stomp_sequence.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * 作者: 常伟
 * 创建时间: 2018/7/5 17:21
 * 邮箱: [email protected]
 * 功能:
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

    //设置心跳的时间间隔
    private static long HEART_BEAT=50000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //设置连接websocket的地址,       注意使用    “/项目名/ws-start”   进行连接websocket。
        //setAllowedOrigins("*")是设置所有请求都可以访问,即允许跨域的问题,或者自己设置允许访问的域名。
        //withSockJS()     在不支持websocket的浏览器中,使用sockJs备选作为连接。
        registry.addEndpoint("/ws-start").setAllowedOrigins("*").withSockJS();
    }

    /**
     * 这是用于配置服务器向浏览器发送的消息。
     * clientOut就表示出出口。还有一个inBoundChannel用于处理浏览器向服务器发送的消息
     * @param registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //设置简单的消息代理器,它使用Memory(内存)作为消息代理器,
        //其中/user和/topic都是我们发送到前台的数据前缀。前端必须订阅以/user开始的消息(.subscribe()进行监听)。
        //setHeartbeatValue设置后台向前台发送的心跳,
        //注意:setHeartbeatValue这个不能单独设置,不然不起作用,要配合后面setTaskScheduler才可以生效。
        //对应的解决方法的网址:https://stackoverflow.com/questions/39220647/spring-stomp-over-websockets-not-scheduling-heartbeats
        registry.enableSimpleBroker("/user","/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(new DefaultManagedTaskScheduler());
        //设置我们前端发送:websocket请求的前缀地址。即client.send("/ws-send")作为前缀,然后再加上对应的@MessageMapping后面的地址
        registry.setApplicationDestinationPrefixes("/ws-send");
    }
}
           

然后再次运行。然后访问localhost:8088.就会发现发送的消息都是顺序的了,不管实验多少次。

到这里就结束了。

如果有其他的解决方案,或者不能执行成功。可以留言。

继续阅读