天天看點

springboot整合websocket(Stomp協定)

過濾器

public class NecpWebSocketInterceptor implements HandshakeInterceptor {
    Logger logger = LoggerFactory.getLogger(NecpWebSocketInterceptor.class);

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handle, Map<String, Object> map) throws Exception {
        logger.info("啦啦啦,握手開始啦");
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handle, Exception e) {
        logger.info("啦啦啦,握手成功啦");
    }
}
           

配置

@EnableWebSocketMessageBroker
@Configuration
public class WebSocket implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {

//        long[] hert = {1000L,1000L};
        //這裡使用的是記憶體模式,生産環境可以使用rabbitmq或者其他mq。
        //registry.enableStompBrokerRelay().setRelayHost().setRelayPort() 其他方式。

        // 廣播式配置名為 /topic 消息代理 , 這個消息代理必須和 controller 中的 @SendTo 配置的位址字首一樣或者全比對
        //表示消息得訂閱
        config.enableSimpleBroker("/topic","/user","/queue");
//        .setHeartbeatValue(hert) //心跳設定
//        ;

        //一對一發送字首
        config.setUserDestinationPrefix("/user");

        //背景應用接收浏覽器消息端點字首,這個将直接路由到@MessageMapping上
        config.setApplicationDestinationPrefixes("/ws");
    }

    /**
     * 注冊stomp端點,主要是起到連接配接作用
     * @param registry registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        端點名稱
        registry.addEndpoint("/necp_ws")
                .addInterceptors(new  NecpWebSocketInterceptor())
                //跨域
                .setAllowedOrigins("*")
//                .setHandshakeHandler()

                //使用sockJS
                .withSockJS();
    }


}
           

使用

/**
 * Demo
 * @author lgs
 */
@Controller
public class WsTestController {

    Logger logger = LoggerFactory.getLogger(WsTestController.class);

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;


    /**
     *  廣播訂閱
     * @param name
     * @return
     * @throws InterruptedException
     */
    @MessageMapping("/topic1")
    @SendTo("/topic/getResponse")
    public String sub(String name) throws InterruptedException {
        logger.info("廣播注解方式訂閱執行了");
        Thread.sleep(1000);
        return "廣播注解方式訂閱成功了";
    }

    /**
     *  廣播訂閱
     * @param name
     * @return
     * @throws InterruptedException
     */
    @MessageMapping("/topic2")
    public void sub2(String name) throws InterruptedException {
        logger.info("廣播模闆方式訂閱執行了");
        Thread.sleep(1000);
        simpMessagingTemplate.convertAndSend("/topic/getResponse","廣播模闆方式訂閱成功了");
    }


    /**
     * 單點發送
     * @return
     * @throws Exception
     */
    @MessageMapping("/welcome1")
    @SendToUser//預設發送到/user/queue/welcome1
    public String welcome(String name) throws Exception {

        logger.info(name + "單點訂閱注解方式執行了");
        Thread.sleep(1000);

        return "單點訂閱注解方式成功了";
    }


    /**
     * 單點發送
     * @return
     * @throws Exception
     */
    @MessageMapping("/welcome2/{userId}/{stationId}")
    public void say2(@DestinationVariable("userId") String userId , @DestinationVariable("stationId") String stationId) throws Exception {

        logger.info(userId + "單點訂閱模闆方式執行了" + stationId);
        Thread.sleep(1000);
        simpMessagingTemplate.convertAndSendToUser("123","/queue/getResponse","單點訂閱模闆方式成功了");
    }

    /**
     * 單點發送
     * @return
     * @throws Exception
     */
    @MessageMapping("/welcome2")
    public void say2(String name) throws Exception {

        logger.info(name + "單點訂閱模闆方式執行了");
        Thread.sleep(1000);
        simpMessagingTemplate.convertAndSendToUser("123","/queue/getResponse","單點訂閱模闆方式成功了");
    }

    /**
     * 定時廣播
     */
    @Scheduled(fixedRate = 10000)
    public void sendTopicMessage() {
        System.out.println("背景廣播推送!");
        simpMessagingTemplate.convertAndSend("/topic/getResponse","感謝您訂閱了我");
    }

}
           

繼續閱讀