天天看点

SpringBoot整合ActiveMQ+websocket QueueTopicWebsocket

目录

Queue

Pom

application.yml

队列生成者

Config

队列生成者代码

队列消费者

队列消费者代码

Topic

Pom

application.yml

主题生产者

Config

主题生产者代码

主题消费者

主题消费者代码

Websocket

H5配置方式

Config

h5WebSocket代码

Controller中结合MQ调用

Html中代码

Springboot配置方式Stomp协议

Config

Controller代码结合mq

Html代码

H5中用js直接和activemq联通

Html代码

Queue

Pom

<dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-activemq</artifactId>

      </dependency>

application.yml

server:

  port: 7777

spring:

  activemq:

    broker-url: tcp://127.0.0.1:61616 # 自己的mq服务器地址

    user: admin

    password: admin

  jms:

    pub-sub-domain: false   #fasle=queue  true=topic,不写默认是队列queue

#自己定义队列名称

myqueue: boot-activemq-queue

队列生成者

Config

package boot_mq_produce.config;

import javax.jms.Queue;

import org.apache.activemq.command.ActiveMQQueue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.jms.annotation.EnableJms;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.stereotype.Component;

@Component

@EnableJms

@EnableScheduling

public class ConfigMq {

   @Value("${myqueue}")

   private String myQueue;

   @Bean

   public Queue queue() {

      return new ActiveMQQueue(myQueue);

   }

}

队列生成者代码

package boot_mq_produce.product;

import java.util.UUID;

import javax.jms.Queue;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsMessagingTemplate;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

@Component

public class QueueProduce {

   @Autowired

   private JmsMessagingTemplate jmsMessagingTemplate;

   @Autowired

   private Queue queue;

   public void produceMsg() {

      jmsMessagingTemplate.convertAndSend(queue,"***"+UUID.randomUUID());

   }

   @Scheduled(fixedDelay=2000)

   public void produceMsgScheduled() {

      jmsMessagingTemplate.convertAndSend(queue,"***"+UUID.randomUUID());

      System.out.println("间隔发送");

   }

}

队列消费者

队列消费者代码

package boot_mq_consumer.cconsumer;

import javax.jms.TextMessage;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

@Component

public class QueueConsumer {

   @JmsListener(destination="${myqueue}")

   public void recive(TextMessage textMessage) throws Exception {

      System.out.println("消费者接到的消息"+textMessage.getText());

   }

}

Topic

Pom

<dependency>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-activemq</artifactId>

      </dependency>

application.yml

主题生产者和消费者相同

server:

  port: 6666

spring:

  activemq:

    broker-url: tcp://127.0.0.1:61616 # 自己的mq服务器地址

    user: admin

    password: admin

  jms:

    pub-sub-domain: true   #fasle=queue  true=topic,不写默认是队列queue

#自己定义队列名称

mytopic: boot-activemq-topic

主题生产者

Config

package boot_mq_topic_produce.config;

import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQTopic;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.jms.annotation.EnableJms;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.stereotype.Component;

@Component

@EnableJms

@EnableScheduling

public class ConfigBean {

   @Value("${mytopic}")

   private String mytopic;

   @Bean

   public Topic topic() {

      return new ActiveMQTopic(mytopic);

   }

}

主题生产者代码

package boot_mq_topic_produce.product;

import java.util.UUID;

import javax.jms.Topic;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsMessagingTemplate;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

@Component

public class TopicProduce {

   @Autowired

   private JmsMessagingTemplate jmsMessagingTemplate;

   @Autowired

   private Topic topic;

   @Scheduled(fixedDelay=2000)

   public void topicProduce() {

      jmsMessagingTemplate.convertAndSend(topic,"主题消息:"+UUID.randomUUID().toString());

   }

}

主题消费者

主题消费者代码

不需要其他代码配置,直接监听就行

package boot_mq_topic_consumer.consumer;

import javax.jms.TextMessage;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

import boot_mq_topic_consumer.websocket.H5.H5WebSocket;

@Component

public class TopicConsumer {

   @JmsListener(destination="${mytopic}")

   public void receive(TextMessage textMessage) {

      try {

         System.out.println("收到订阅者的消息:"+textMessage.getText());

      } catch (Exception e) {

         e.printStackTrace();

      }

   }

}

Websocket

MQ的生产者不需要改变

H5配置方式

Config

这个配置只是让springboot开启websocket

package boot_mq_topic_consumer.config;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration

public class ConfigWebSocket {

   @Bean 

    public ServerEndpointExporter serverEndpointExporter() { 

        return new ServerEndpointExporter(); 

    } 

}

h5WebSocket代码

根据代码和倒包可以看出,都是java自带的websocket包,并不是spring的

package boot_mq_topic_consumer.websocket.H5;

import java.io.IOException;

import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;

import javax.websocket.OnMessage;

import javax.websocket.OnOpen;

import javax.websocket.Session;

import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

@ServerEndpoint(value = "/h5WebSocket")

@Component

public class H5WebSocket {

   // concurrent包的线程安全Set,用来存放每个客户端对应的H5WebSocket对象。

   private static CopyOnWriteArraySet<H5WebSocket> webSocketSet = new CopyOnWriteArraySet<H5WebSocket>();

   // 与某个客户端的连接会话,需要通过它来给客户端发送数据

   private Session session;

   public Session getSession() {

      return session;

   }

   @OnOpen

   public void onOpen(Session session) {

      this.session = session;

      webSocketSet.add(this); // 加入set中

      System.out.println("有新连接加入!");

      try {

         sendMessage("测试");

      } catch (IOException e) {

         System.out.println("IO异常");

      }

   }

   @OnClose

   public void onClose() {

      webSocketSet.remove(this); // 从set中删除

      System.out.println("有一连接关闭!");

   }

   @OnMessage

   public void onMessage(String message, Session session) {

      System.out.println("来自客户端的消息:" + message);

      // 群发消息

      for (H5WebSocket item : webSocketSet) {

         try {

            item.sendMessage(message);

         } catch (IOException e) {

            e.printStackTrace();

         }

      }

   }

   public void onError(Session session, Throwable error) {

      System.out.println("发生错误");

      error.printStackTrace();

   }

   public void sendMessage(String message) throws IOException {

      this.session.getBasicRemote().sendText(message);

   }

   public static void sendInfo(String message) throws IOException {

      for (H5WebSocket item : webSocketSet) {

         try {

            item.sendMessage(message);

         } catch (IOException e) {

            continue;

         }

      }

   }

}

Controller中结合MQ调用

主要使用的是H5WebSocket.sendInfo方法

package boot_mq_topic_consumer.consumer;

import javax.jms.TextMessage;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

import boot_mq_topic_consumer.websocket.H5.H5WebSocket;

@Component

public class TopicConsumer {

   @JmsListener(destination="${mytopic}")

   public void receive(TextMessage textMessage) {

      try {

         //System.out.println("收到订阅者的消息:"+textMessage.getText());

         H5WebSocket.sendInfo(textMessage.getText());

      } catch (Exception e) {

         e.printStackTrace();

      }

   }

}

Html中代码

<!DOCTYPE html>

<html>

<head>

<meta charset="UTF-8">

<title>H5websocket</title>

</head>

<body>

Welcome<br/>

<input id="text" type="text" /><button onclick="send()">发送</button>    <button onclick="closeWebSocket()">关闭</button>

<div id="message">

</div>

</body>

<script type="text/javascript">

    var websocket = null;

    //判断当前浏览器是否支持WebSocket

    if('WebSocket' in window){

        websocket = new WebSocket("ws://localhost:5555/h5WebSocket");

    }

    else{

        alert('对不起,您的浏览器不支持websocket')

    }

    //连接发生错误的回调方法

    websocket.onerror = function(){

        setMessageInnerHTML("error");

    };

    //连接成功建立的回调方法

    websocket.onopen = function(event){

        setMessageInnerHTML("open");

    }

    //接收到消息的回调方法

    websocket.onmessage = function(event){

        setMessageInnerHTML(event.data);

    }

    //连接关闭的回调方法

    websocket.onclose = function(){

        setMessageInnerHTML("close");

    }

    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。

    window.onbeforeunload = function(){

        websocket.close();

    }

    //将消息显示在网页上

    function setMessageInnerHTML(innerHTML){

        document.getElementById('message').innerHTML += innerHTML + '<br/>';

    }

    //关闭连接

    function closeWebSocket(){

        websocket.close();

    }

    //发送消息

    function send(){

        var message = document.getElementById('text').value;

        websocket.send(message);

    }

</script>

</body>

</html>

Springboot配置方式Stomp协议

Config

根据代码和倒包可以看出,都是spring的包

package boot_mq_topic_consumer.config;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration

@EnableWebSocketMessageBroker

public class ConfigStompSocket  implements WebSocketMessageBrokerConfigurer {

   @Override

   public void registerStompEndpoints(StompEndpointRegistry registry) {

      registry.addEndpoint("/endpointWisely").withSockJS();

  // 允许使用socketJs方式访问,访问点为endpointWisely,允许跨域

//registry.addEndpoint("/endpointWisely").setAllowedOrigins("*").withSockJS();

   }

}

Controller代码结合mq

package boot_mq_topic_consumer.websocket.STOMP;

import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.annotation.JmsListener;

import org.springframework.messaging.handler.annotation.MessageMapping;

import org.springframework.messaging.handler.annotation.SendTo;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

@Controller

public class StompSocketController {

    @Autowired

    private SimpMessagingTemplate template;

    @MessageMapping("/welcome")

    @SendTo("/topic/getResponse")

    public String say(String msg){

        return msg;

    }

    @JmsListener(destination="${mytopic}")

    public void all(TextMessage textMessage) throws Exception{

       template.convertAndSend("/topic/getResponse",textMessage.getText());

       System.out.println("tomp消息:"+textMessage.getText());

    }

    @RequestMapping(value="/stomp")

   public String index() {

      return "stomp";

   }

}

Html代码

<!DOCTYPE html>

<html>

<head>

<meta charset="UTF-8">

<title>Insert title here</title>

</head>

<body onload="disconnect()">

<div>

    <div>

        <button id="connect" onclick="connect();">连接</button>

        <button id="disconnect"  onclick="disconnect();">断开连接</button>

    </div>

    <div id="conversationDIV">

        <label>输入你的名字</label>

        <input type="text" id="name"/>

        <button id="sendName" onclick="sendName();">发送</button>

        <p id="response"></p>

    </div>

</div>

<script src="https://cdn.bootcss.com/jquery/3.2.1/jquery.min.js"></script>

<script  src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>

<script  src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>

<script type="text/javascript">

    var stompClient = null;

    function connect(){

        //链接Socket的endpoint名称为endpointWisely

        var socket = new SockJS('/endpointWisely');

        //使用STOMP子协议的WebSocket客户端

        stompClient = Stomp.over(socket);

        //链接WebSocket服务端

        stompClient.connect({},function (frame) {

            console.log('Connected:' + frame);

            //通过stompClient.subscribe订阅/topic/getResponse目标发送的消息,即控制器中的@SendTo

            stompClient.subscribe('/topic/getResponse',function (response) {

                showResponse(response.body);

            });

        });

    }

    function disconnect(){

        if(stompClient != null){

            stompClient.disconnect();

        }

    }

    function sendName(){

        var name = $('#name').val();

        stompClient.send("/welcome",{},name);

    }

    function showResponse(message){

        $('#response').append($('#response').val() + message + '<br/>');

    }

</script>

</body>

</html>

H5中用js直接和activemq联通

Html代码

收先js用stomp.js进行websocket通信,接收的直接是activemq生产者所指定的主题或队列名称

<!DOCTYPE html>

<html lang="en">

  <head>

    <meta charset="utf-8">

    <title>H5_STOMP_avtivemq</title>

    <style type="text/css">

      body { padding-top: 40px; }

    </style>

  </head>

  <body>

    <div class="container-fluid">

      <div class="row-fluid">

        <div class="span6">

          <div id="connect">

            <div class="page-header">

              <h2>Server Login</h2>

            </div>

            <form class="form-horizontal" id='connect_form'>

              <fieldset>

                <div class="control-group">

                  <label>WebSocket URL地址</label>

                  <div class="controls">

                    <input name=url id='connect_url' value='ws://localhost:61614' type="text">

                  </div>

                </div>

                <div class="control-group">

                  <label>用户名</label>

                  <div class="controls">

                    <input id='connect_login' placeholder="User Login" value="admin" type="text">

                  </div>

                </div>

                <div class="control-group">

                  <label>密码</label>

                  <div class="controls">

                    <input id='connect_passcode' placeholder="User Password" value="admin" type="password">

                  </div>

                </div>

                <div class="control-group">

                  <label>目的地(主题或队列名)</label>

                  <div class="controls">

                  <!-- /topic/boot-activemq-topic路径中的 /topic应该是指定主题还是队列-->

                    <input id='destination' placeholder="Destination" value="/topic/boot-activemq-topic" type="text">

                  </div>

                </div>

                <div class="form-actions">

                  <button id='connect_submit' type="submit" class="btn btn-large btn-primary">链接</button>

                </div>

              </fieldset>

            </form>

          </div>

          <div id="connected" style="display:none">

            <div class="page-header">

              <h2>Chat Room</h2>

            </div>

            <div id="messages">

            </div>

            <form class="well form-search" id='send_form'>

              <button class="btn" type="button" id='disconnect' style="float:right">Disconnect</button>

              <input class="input-medium" id='send_form_input' placeholder="Type your message here" class="span6"/>

              <button class="btn" type="submit">Send</button>

            </form>

          </div>

        </div>

       <!--  <div class="span4">

          <div class="page-header">

            <h2>Debug Log</h2>

          </div>

          <pre id="debug"></pre>

        </div> -->

      </div>

    </div>

   <script src="https://cdn.bootcss.com/jquery/3.2.1/jquery.min.js"></script>

<script  src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>

    <script>//<![CDATA[

    $(document).ready(function() {

      if(window.WebSocket) {

        var client, destination;

        $('#connect_form').submit(function() {

          var url = $("#connect_url").val();

          var login = $("#connect_login").val();

          var passcode = $("#connect_passcode").val();

          destination = $("#destination").val();

          client = Stomp.client(url);

          //打开连接

          client.connect(login, passcode, function(frame) {

            $('#connect').fadeOut({ duration: 'fast' });

            $('#connected').fadeIn();

            client.subscribe(destination, function(message) {

              $("#messages").append("<p>" + message.body + "</p>\n");

            });

          });

          return false;

        });

        $('#disconnect').click(function() {

//断开连接

          client.disconnect(function() {

            $('#connected').fadeOut({ duration: 'fast' });

            $('#connect').fadeIn();

            $("#messages").html("")

          });

          return false;

        });

        $('#send_form').submit(function() {

          var text = $('#send_form_input').val();

          if (text) {

            client.send(destination, {}, text);

            $('#send_form_input').val("");

          }

          return false;

        });

      } else {

        $("#connect").html("浏览器不支持");

      }

    });

    </script>

  </body>

</html>