ActiveMQ安裝
下載
ActiveMQ官網: http://activemq.apache.org
找到最新的穩定版，下載並解壓到一個全英文路徑
啟動
進入bin目錄,在控制台執行指令
activemq-admin.bat start
打開浏覽器
http://127.0.0.1:8161
預設賬号: admin/admin
JMS 的兩種模式
- 在點對點的消息傳遞時,目的地稱為 隊列 queue
- 在發布訂閱的消息傳遞中,目的地稱為 主題 topic
SpringBoot後端
application.yaml配置
#=============== ActiveMQ configuration =======================
# All keys below live under the `spring` root so that Spring Boot binds them
# (spring.activemq.* and spring.jms.listener.*).
spring:
  activemq:
    broker-url: failover:(tcp://192.168.1.99:61616)?initialReconnectDelay=1000&startupMaxReconnectAttempts=2
    user: admin
    password: admin
    # Time to wait before considering a close complete.
    close-timeout: 15s
    # Whether the default broker URL should be in memory. Ignored if an explicit broker is specified.
    in-memory: true
    # Whether to stop message delivery before re-delivering rolled-back messages.
    # When enabled, message order is not preserved.
    non-blocking-redelivery: false
    # Time to wait for a response to a message send. Set to 0 to wait forever.
    send-timeout: 0s
    # Connection pool settings.
    pool:
      enabled: true
      max-connections: 5
      idle-timeout: 60s
  #=============== JMS consumer configuration =======================
  jms:
    listener:
      # Number of consumers started by default.
      concurrency: 3
      # Consumers scale dynamically; this is the maximum number of concurrent consumers.
      max-concurrency: 3
自定義配置類
/**
 * JMS infrastructure beans: a messaging template plus one listener container
 * factory per messaging model (queue and topic).
 */
@Configuration
@EnableConfigurationProperties({ActiveMQProperties.class, JmsProperties.class})
public class ActiveMqConfiguration {

    /**
     * Creates the messaging template used to send messages.
     *
     * @param jmsConnectionFactory the JMS connection factory
     * @return a {@link JmsMessagingTemplate} backed by the given connection factory
     */
    @Bean
    public JmsMessagingTemplate jmsMessageTemplate(ConnectionFactory jmsConnectionFactory) {
        return new JmsMessagingTemplate(jmsConnectionFactory);
    }

    /**
     * Listener container factory for queue (point-to-point) destinations,
     * registered under the bean name {@code queueListener}.
     *
     * @param connectionFactory the JMS connection factory
     * @return the listener container factory configured for queues
     */
    @Bean({"queueListener"})
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // false selects queue mode
        factory.setPubSubDomain(false);
        return factory;
    }

    /**
     * Listener container factory for topic (publish/subscribe) destinations,
     * registered under the bean name {@code topicListener}.
     *
     * <p>The {@code @Bean} annotation is required: without it this method is never
     * invoked by the container and no topic-mode factory exists for
     * {@code @JmsListener(containerFactory = "topicListener")} to reference.
     *
     * @param connectionFactory the JMS connection factory
     * @return the listener container factory configured for topics
     */
    @Bean({"topicListener"})
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // true selects topic mode
        factory.setPubSubDomain(true);
        return factory;
    }
}
建立消息通道
/**
 * Declares the JMS listeners and the destination beans used by the application.
 *
 * <p>NOTE(review): {@code messageProperties} is referenced by the queue bean
 * methods but is not declared anywhere in this class as shown; it has to be
 * provided (for example as an injected field) for the class to compile —
 * confirm against the full source. Also note that the listeners use the fixed
 * names (e.g. {@code warm-queue}) while the queue beans append
 * {@code messageProperties.getEnv()} to the same names.
 */
@Slf4j
@Configuration
public class MyActiveMqConfig {

    /** Bean name of the listener container factory every listener below uses. */
    private static final String QUEUE_LISTENER_FACTORY = "queueListener";

    /** Concurrency placeholder shared by all listeners; falls back to 3 when the property is unset. */
    private static final String LISTENER_CONCURRENCY = "${spring.jms.listener.concurrency:3}";

    /**
     * Receives alarm messages. All messages are of type {@link javax.jms.TextMessage}.
     *
     * @param msg the message text
     */
    @JmsListener(destination = "warm-queue", containerFactory = QUEUE_LISTENER_FACTORY, concurrency = LISTENER_CONCURRENCY)
    public void receiveAlarmMsg(String msg) {
        log.info("Received alarm mq msg : {}", msg);
    }

    /**
     * Receives business messages.
     *
     * @param msg the message text
     */
    @JmsListener(destination = "buz-queue", containerFactory = QUEUE_LISTENER_FACTORY, concurrency = LISTENER_CONCURRENCY)
    public void receiveBusinessMsg(String msg) {
        log.info("Received business mq msg : {}", msg);
    }

    /**
     * Receives resource messages.
     *
     * @param msg the message text
     */
    @JmsListener(destination = "res-queue", containerFactory = QUEUE_LISTENER_FACTORY, concurrency = LISTENER_CONCURRENCY)
    public void receiveResourceMsg(String msg) {
        log.info("Received resource mq msg : {}", msg);
    }

    /**
     * Simulates receiving a push message.
     *
     * @param msg the push message text
     */
    @JmsListener(destination = "push-org-1", containerFactory = QUEUE_LISTENER_FACTORY, concurrency = LISTENER_CONCURRENCY)
    public void receivePushMsg(String msg) {
        log.info("Received push msg : {}", msg);
    }

    /**
     * Alarm queue; the name is {@code warm-queue} followed by the configured environment value.
     *
     * @return the alarm queue
     */
    @Bean
    public Queue alarmQueue() {
        final String queueName = "warm-queue" + messageProperties.getEnv();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Business data queue; the name is {@code buz-queue} followed by the configured environment value.
     *
     * @return the business queue
     */
    @Bean
    public Queue businessQueue() {
        final String queueName = "buz-queue" + messageProperties.getEnv();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Resource data queue; the name is {@code res-queue} followed by the configured environment value.
     *
     * @return the resource queue
     */
    @Bean
    public Queue resourceQueue() {
        final String queueName = "res-queue" + messageProperties.getEnv();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Topic the front end subscribes to in the simulated push test.
     *
     * @return the push topic
     */
    @Bean
    public Topic pushTopic() {
        return new ActiveMQTopic("topic-push");
    }
}
後端發送消息
/**
 * Pushes a message to the {@code topic-push} destination for the front end.
 *
 * <p>NOTE(review): the destination is passed by name, so whether it resolves
 * to a queue or a topic depends on the pub/sub setting of the underlying
 * template (e.g. {@code spring.jms.pub-sub-domain}) — confirm it is configured
 * for topics if the front end subscribes to a topic.
 *
 * @param message the message payload to send
 */
private static void sendMessage(String message) {
    final JmsMessagingTemplate messagingTemplate = SpringUtils.getBean(JmsMessagingTemplate.class);
    log.info("向前端推送消息: {}", message);
    messagingTemplate.convertAndSend("topic-push", message);
}
前端接收消息
下載 stomp.js
https://svn.apache.org/repos/asf/activemq/trunk/activemq-web-demo/src/main/webapp/websocket/stomp.js
初始化
// Create a STOMP client for the given URL.
var client = Stomp.client(url);
// Connect with the given credentials; onconnect is the callback where subscriptions are made.
client.connect(username, password, onconnect);
onconnect 是回調方法,在裡面做訂閱
訂閱
// Subscribe to the destination; the callback runs for each received message.
client.subscribe(destination, function(message) {
// The message payload is available as message.body
});
如:
// Callback handed to client.connect: reports the connection, then subscribes to the destination.
var onconnect = function(frame) {
client.msg("connected to Stomp");
// Pass the body of each received message to client.msg.
client.subscribe(destination, function(message) {
client.msg(message.body);
});
client.msg("subscribe to " + destination);
};
完整示例
<script type="text/javascript" src="jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="stomp.js"></script>
<script type="text/javascript">
  // WebSocket URL of the broker's STOMP endpoint.
  var url = 'ws://192.168.1.99:61614/stomp/';
  var username = 'admin';
  var password = 'admin';
  // ActiveMQ's STOMP transport requires a /queue/ or /topic/ prefix on destination names.
  var destination = '/topic/topic-push';
  // Shared by both button handlers; stays null until "Connect" has been clicked.
  var client = null;

  $(function() {
    // Connect button: create the client, connect, and subscribe inside the connect callback.
    $('#connect_btn').click(function() {
      client = Stomp.client(url);

      // Helper that appends a line of text to the #messages container.
      client.msg = function(str) {
        var t = document.createTextNode(str);
        var p = document.createElement("p");
        p.appendChild(t);
        $("#messages").append(p);
      };

      var onconnect = function(frame) {
        client.msg("connected to Stomp");
        client.subscribe(destination, function(message) {
          client.msg(message.body);
        });
        client.msg("subscribe to " + destination);
      };

      client.connect(username, password, onconnect);
    });

    // Send button: publish the input text to the destination and clear the input.
    $('#send_btn').click(function() {
      // Nothing to send with until the Connect button has created a client.
      if (!client) {
        return false;
      }
      var text = $('#send_form_input').val();
      if (text) {
        client.send(destination, {foo: 1}, text);
        $('#send_form_input').val("");
      }
      return false;
    });
  });
</script>
<!-- Controls: connect to the broker and send the text typed into the input. -->
<div id='connect'>
  <dl>
    <dd><input type="button" id='connect_btn' value="Connect"></dd>
    <dd><input type="button" id='send_btn' value="send"></dd>
  </dl>
  <input id='send_form_input' placeholder="message to send" />
</div>
<!-- Status lines and received messages are appended here. -->
<div id="messages">
</div>
安裝包裡示例
在examples/stomp/websocket目錄