目录
Queue
Pom
application.yml
队列生产者
Config
队列生产者代码
队列消费者
队列消费者代码
Topic
Pom
application.yml
主题生产者
Config
主题生产者代码
主题消费者
主题消费者代码
Websocket
H5配置方式
Config
h5WebSocket代码
Controller中结合MQ调用
Html中代码
Springboot配置方式Stomp协议
Config
Controller代码结合mq
Html代码
H5中用js直接和activemq联通
Html代码
Queue
Pom
<!-- Spring Boot ActiveMQ starter. No <version> is declared here; presumably it is managed by the Spring Boot parent/BOM (confirm in the full pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
application.yml
# Configuration for the queue demo.
# Nesting matters: the broker settings live under spring.activemq and the
# destination type under spring.jms.
server:
  port: 7777

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616  # address of your own ActiveMQ broker
    user: admin
    password: admin
  jms:
    pub-sub-domain: false  # false = queue, true = topic; a queue is used when omitted

# Custom queue name; the Java code reads it through the ${myqueue} placeholder.
myqueue: boot-activemq-queue
队列生产者
Config
package boot_mq_produce.config;
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
/**
 * Spring component that switches on JMS listener support and scheduling
 * (via {@code @EnableJms} / {@code @EnableScheduling}) and exposes the
 * destination queue as a bean.
 */
@Component
@EnableJms
@EnableScheduling
public class ConfigMq {

    /** Queue name injected from the {@code myqueue} property. */
    @Value("${myqueue}")
    private String myQueue;

    /**
     * Creates the queue destination named by the {@code myqueue} property.
     *
     * @return an {@link ActiveMQQueue} carrying the configured name
     */
    @Bean
    public Queue queue() {
        final String queueName = this.myQueue;
        return new ActiveMQQueue(queueName);
    }
}
队列生产者代码
package boot_mq_produce.product;
import java.util.UUID;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Producer for the queue demo: sends a text message consisting of
 * {@code "***"} followed by a random UUID to the injected queue, either on
 * demand or on a fixed schedule.
 */
@Component
public class QueueProduce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    /** Sends one message to the queue. */
    public void produceMsg() {
        sendRandomPayload();
    }

    /**
     * Scheduled variant: sends one message, then prints a marker line.
     * The next run starts 2000 ms after the previous one finishes
     * ({@code fixedDelay}).
     */
    @Scheduled(fixedDelay = 2000)
    public void produceMsgScheduled() {
        sendRandomPayload();
        System.out.println("间隔发送");
    }

    /** Builds the "***" + UUID payload and hands it to the JMS template. */
    private void sendRandomPayload() {
        final String payload = "***" + UUID.randomUUID();
        jmsMessagingTemplate.convertAndSend(queue, payload);
    }
}
队列消费者
队列消费者代码
package boot_mq_consumer.cconsumer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the queue demo; listens on the destination named by the
 * {@code myqueue} property.
 */
@Component
public class QueueConsumer {

    /**
     * Prints the text of each received message to standard output.
     *
     * @param textMessage the incoming JMS text message
     * @throws Exception if the message text cannot be read
     */
    @JmsListener(destination = "${myqueue}")
    public void recive(TextMessage textMessage) throws Exception {
        final String body = textMessage.getText();
        System.out.println("消费者接到的消息" + body);
    }
}
Topic
Pom
<!-- Spring Boot ActiveMQ starter (same dependency as in the queue example). No <version> is declared here; presumably it is managed by the Spring Boot parent/BOM (confirm in the full pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
application.yml
主题生产者和消费者相同
# Configuration for the topic demo.
# Nesting matters: the broker settings live under spring.activemq and the
# destination type under spring.jms.
server:
  port: 6666

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616  # address of your own ActiveMQ broker
    user: admin
    password: admin
  jms:
    pub-sub-domain: true  # false = queue, true = topic; a queue is used when omitted

# Custom topic name; the Java code reads it through the ${mytopic} placeholder.
mytopic: boot-activemq-topic
主题生产者
Config
package boot_mq_topic_produce.config;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
/**
 * Spring component that switches on JMS listener support and scheduling
 * (via {@code @EnableJms} / {@code @EnableScheduling}) and exposes the
 * destination topic as a bean.
 */
@Component
@EnableJms
@EnableScheduling
public class ConfigBean {

    /** Topic name injected from the {@code mytopic} property. */
    @Value("${mytopic}")
    private String mytopic;

    /**
     * Creates the topic destination named by the {@code mytopic} property.
     *
     * @return an {@link ActiveMQTopic} carrying the configured name
     */
    @Bean
    public Topic topic() {
        final String topicName = this.mytopic;
        return new ActiveMQTopic(topicName);
    }
}
主题生产者代码
package boot_mq_topic_produce.product;
import java.util.UUID;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Producer for the topic demo: publishes a text message with a random UUID
 * to the injected topic on a fixed schedule.
 */
@Component
public class TopicProduce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Topic topic;

    /**
     * Publishes one message to the topic. The next run starts 2000 ms after
     * the previous one finishes ({@code fixedDelay}).
     */
    @Scheduled(fixedDelay = 2000)
    public void topicProduce() {
        final String payload = "主题消息:" + UUID.randomUUID();
        jmsMessagingTemplate.convertAndSend(topic, payload);
    }
}
主题消费者
主题消费者代码
不需要其他代码配置,直接监听就行
package boot_mq_topic_consumer.consumer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import boot_mq_topic_consumer.websocket.H5.H5WebSocket;
/**
 * Subscriber for the topic demo; listens on the destination named by the
 * {@code mytopic} property.
 */
@Component
public class TopicConsumer {

    /**
     * Prints the text of each received message to standard output. Any
     * exception raised while handling the message is printed and not
     * rethrown.
     *
     * @param textMessage the incoming JMS text message
     */
    @JmsListener(destination = "${mytopic}")
    public void receive(TextMessage textMessage) {
        try {
            final String body = textMessage.getText();
            System.out.println("收到订阅者的消息:" + body);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
Websocket
MQ的生产者不需要改变
H5配置方式
Config
这个配置只是让springboot开启websocket
package boot_mq_topic_consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Registers a {@link ServerEndpointExporter} bean. According to the
 * accompanying notes, this is all that is needed for Spring Boot to enable
 * WebSocket endpoints.
 */
@Configuration
public class ConfigWebSocket {

    /**
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
h5WebSocket代码
根据代码和导包可以看出,都是java自带的websocket包,并不是spring的
package boot_mq_topic_consumer.websocket.H5;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
@ServerEndpoint(value = "/h5WebSocket")
@Component
public class H5WebSocket {
// concurrent包的线程安全Set,用来存放每个客户端对应的H5WebSocket对象。
private static CopyOnWriteArraySet<H5WebSocket> webSocketSet = new CopyOnWriteArraySet<H5WebSocket>();
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
public Session getSession() {
return session;
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
webSocketSet.add(this); // 加入set中
System.out.println("有新连接加入!");
try {
sendMessage("测试");
} catch (IOException e) {
System.out.println("IO异常");
}
}
@OnClose
public void onClose() {
webSocketSet.remove(this); // 从set中删除
System.out.println("有一连接关闭!");
}
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
// 群发消息
for (H5WebSocket item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
error.printStackTrace();
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static void sendInfo(String message) throws IOException {
for (H5WebSocket item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
continue;
}
}
}
}
Controller中结合MQ调用
主要使用的是H5WebSocket.sendInfo方法
package boot_mq_topic_consumer.consumer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import boot_mq_topic_consumer.websocket.H5.H5WebSocket;
/**
 * Subscriber that forwards every message received on the destination named
 * by the {@code mytopic} property to all connected WebSocket clients.
 */
@Component
public class TopicConsumer {

    /**
     * Passes the message text to {@code H5WebSocket.sendInfo}. Any exception
     * raised while reading or forwarding the message is printed and not
     * rethrown.
     *
     * @param textMessage the incoming JMS text message
     */
    @JmsListener(destination = "${mytopic}")
    public void receive(TextMessage textMessage) {
        try {
            final String body = textMessage.getText();
            H5WebSocket.sendInfo(body);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
Html中代码
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>H5websocket</title>
</head>
<body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">发送</button> <button onclick="closeWebSocket()">关闭</button>
<div id="message">
</div>
<script type="text/javascript">
    // Connection to the server endpoint; stays null when the browser has no WebSocket support.
    var websocket = null;

    if ('WebSocket' in window) {
        websocket = new WebSocket("ws://localhost:5555/h5WebSocket");

        // Callbacks are attached only when a socket exists, so an unsupported
        // browser does not hit a null dereference here.
        websocket.onerror = function () {
            setMessageInnerHTML("error");
        };
        websocket.onopen = function (event) {
            setMessageInnerHTML("open");
        };
        websocket.onmessage = function (event) {
            setMessageInnerHTML(event.data);
        };
        websocket.onclose = function () {
            setMessageInnerHTML("close");
        };
    } else {
        alert('对不起,您的浏览器不支持websocket');
    }

    // Close the connection when the window is closed, so the server does not
    // see the connection dropped without a close handshake.
    window.onbeforeunload = function () {
        closeWebSocket();
    };

    // Appends one line to the message area. The text is added as a text node,
    // not as markup, so content relayed from other clients cannot inject HTML
    // or script into this page.
    function setMessageInnerHTML(innerHTML) {
        var box = document.getElementById('message');
        box.appendChild(document.createTextNode(innerHTML));
        box.appendChild(document.createElement('br'));
    }

    // Closes the connection, if there is one.
    function closeWebSocket() {
        if (websocket !== null) {
            websocket.close();
        }
    }

    // Sends the content of the text box, provided the connection is open.
    function send() {
        if (websocket === null || websocket.readyState !== WebSocket.OPEN) {
            setMessageInnerHTML("error");
            return;
        }
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</body>
</html>
Springboot配置方式Stomp协议
Config
根据代码和导包可以看出,都是spring的包
package boot_mq_topic_consumer.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * Enables Spring's WebSocket message broker support and registers the STOMP
 * endpoint that browser clients connect to.
 */
@Configuration
@EnableWebSocketMessageBroker
public class ConfigStompSocket implements WebSocketMessageBrokerConfigurer {

    /**
     * Registers the {@code /endpointWisely} endpoint with SockJS enabled.
     *
     * <p>Cross-origin variant kept from the original notes:
     * {@code registry.addEndpoint("/endpointWisely").setAllowedOrigins("*").withSockJS();}
     *
     * @param registry the registry to add the endpoint to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        final String endpointPath = "/endpointWisely";
        registry.addEndpoint(endpointPath).withSockJS();
    }
}
Controller代码结合mq
package boot_mq_topic_consumer.websocket.STOMP;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
/**
 * Controller that connects three things: STOMP messages sent by browsers,
 * JMS messages received from the {@code mytopic} destination, and the page
 * mapped at {@code /stomp}.
 */
@Controller
public class StompSocketController {

    /** STOMP destination that browser clients subscribe to. */
    private static final String RESPONSE_DESTINATION = "/topic/getResponse";

    @Autowired
    private SimpMessagingTemplate template;

    /**
     * Echoes a message sent to {@code /welcome} back to the subscribers of
     * {@code /topic/getResponse}.
     *
     * @param msg the payload sent by the client
     * @return the same payload, unchanged
     */
    @MessageMapping("/welcome")
    @SendTo("/topic/getResponse")
    public String say(String msg) {
        return msg;
    }

    /**
     * Forwards each JMS message to {@code /topic/getResponse} and then prints
     * it to standard output.
     *
     * @param textMessage the incoming JMS text message
     * @throws Exception if the message text cannot be read or forwarded
     */
    @JmsListener(destination = "${mytopic}")
    public void all(TextMessage textMessage) throws Exception {
        template.convertAndSend(RESPONSE_DESTINATION, textMessage.getText());
        final String logLine = "tomp消息:" + textMessage.getText();
        System.out.println(logLine);
    }

    /**
     * Handles requests to {@code /stomp}.
     *
     * @return the string {@code "stomp"}, presumably resolved as a view name
     */
    @RequestMapping(value = "/stomp")
    public String index() {
        return "stomp";
    }
}
Html代码
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body onload="disconnect()">
<div>
    <div>
        <button id="connect" onclick="connect();">连接</button>
        <button id="disconnect" onclick="disconnect();">断开连接</button>
    </div>
    <div id="conversationDIV">
        <label>输入你的名字</label>
        <input type="text" id="name"/>
        <button id="sendName" onclick="sendName();">发送</button>
        <p id="response"></p>
    </div>
</div>
<script src="https://cdn.bootcss.com/jquery/3.2.1/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script type="text/javascript">
    // STOMP client; null while no connection has been set up.
    var stompClient = null;

    // Opens a SockJS connection to the /endpointWisely endpoint, runs STOMP
    // over it and subscribes to /topic/getResponse (the destination named in
    // the controller's @SendTo).
    function connect() {
        var socket = new SockJS('/endpointWisely');
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function (frame) {
            console.log('Connected:' + frame);
            stompClient.subscribe('/topic/getResponse', function (response) {
                showResponse(response.body);
            });
        });
    }

    // Disconnects, if a client exists, and forgets it so later sends are ignored.
    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
            stompClient = null;
        }
    }

    // Sends the entered name to /welcome. Does nothing until the STOMP
    // connection is established; previously this threw on a null client.
    function sendName() {
        if (stompClient === null || !stompClient.connected) {
            return;
        }
        var name = $('#name').val();
        stompClient.send("/welcome", {}, name);
    }

    // Appends one received message as plain text followed by a line break.
    // The message is added as a text node so its content cannot inject markup.
    function showResponse(message) {
        $('#response')
            .append(document.createTextNode(message))
            .append('<br/>');
    }
</script>
</body>
</html>
H5中用js直接和activemq联通
Html代码
首先js用stomp.js进行websocket通信,接收的直接是activemq生产者所指定的主题或队列名称
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>H5_STOMP_avtivemq</title>
<style type="text/css">
body { padding-top: 40px; }
</style>
</head>
<body>
<div class="container-fluid">
    <div class="row-fluid">
        <div class="span6">
            <div id="connect">
                <div class="page-header">
                    <h2>Server Login</h2>
                </div>
                <form class="form-horizontal" id='connect_form'>
                    <fieldset>
                        <div class="control-group">
                            <label>WebSocket URL地址</label>
                            <div class="controls">
                                <input name=url id='connect_url' value='ws://localhost:61614' type="text">
                            </div>
                        </div>
                        <div class="control-group">
                            <label>用户名</label>
                            <div class="controls">
                                <input id='connect_login' placeholder="User Login" value="admin" type="text">
                            </div>
                        </div>
                        <div class="control-group">
                            <label>密码</label>
                            <div class="controls">
                                <input id='connect_passcode' placeholder="User Password" value="admin" type="password">
                            </div>
                        </div>
                        <div class="control-group">
                            <label>目的地(主题或队列名)</label>
                            <div class="controls">
                                <!-- The /topic prefix in /topic/boot-activemq-topic presumably selects a topic rather than a queue (confirm against the broker's STOMP documentation). -->
                                <input id='destination' placeholder="Destination" value="/topic/boot-activemq-topic" type="text">
                            </div>
                        </div>
                        <div class="form-actions">
                            <button id='connect_submit' type="submit" class="btn btn-large btn-primary">链接</button>
                        </div>
                    </fieldset>
                </form>
            </div>
            <div id="connected" style="display:none">
                <div class="page-header">
                    <h2>Chat Room</h2>
                </div>
                <div id="messages">
                </div>
                <form class="well form-search" id='send_form'>
                    <button class="btn" type="button" id='disconnect' style="float:right">Disconnect</button>
                    <!-- The two class attributes of the original are merged; a duplicate attribute is invalid and the second one was ignored. -->
                    <input class="input-medium span6" id='send_form_input' placeholder="Type your message here"/>
                    <button class="btn" type="submit">Send</button>
                </form>
            </div>
        </div>
        <!-- <div class="span4">
        <div class="page-header">
        <h2>Debug Log</h2>
        </div>
        <pre id="debug"></pre>
        </div> -->
    </div>
</div>
<script src="https://cdn.bootcss.com/jquery/3.2.1/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
<script>//<![CDATA[
$(document).ready(function () {
    if (!window.WebSocket) {
        $("#connect").html("浏览器不支持");
        return;
    }

    // STOMP client and the destination chosen in the login form.
    var client, destination;

    // Login form: connect to the given URL and subscribe to the destination.
    $('#connect_form').submit(function () {
        var url = $("#connect_url").val();
        var login = $("#connect_login").val();
        var passcode = $("#connect_passcode").val();
        destination = $("#destination").val();
        client = Stomp.client(url);

        client.connect(login, passcode, function (frame) {
            $('#connect').fadeOut({ duration: 'fast' });
            $('#connected').fadeIn();
            client.subscribe(destination, function (message) {
                // Insert the body as text so message content cannot inject markup.
                $("#messages").append($("<p>").text(message.body)).append("\n");
            });
        }, function (error) {
            // Without an error callback a failed or lost connection was silent.
            // stomp.js passes either an ERROR frame or a plain string here.
            alert(error && error.headers ? error.headers.message : error);
        });
        return false;
    });

    // Disconnect button: close the connection and return to the login form.
    $('#disconnect').click(function () {
        client.disconnect(function () {
            $('#connected').fadeOut({ duration: 'fast' });
            $('#connect').fadeIn();
            $("#messages").html("");
        });
        return false;
    });

    // Send form: publish the typed text to the subscribed destination.
    $('#send_form').submit(function () {
        var text = $('#send_form_input').val();
        if (text) {
            client.send(destination, {}, text);
            $('#send_form_input').val("");
        }
        return false;
    });
});
//]]>
</script>
</body>
</html>