天天看點

WebSocket訂閱ActiveMQ消息

作者:程式員養家

ActiveMQ安裝

下載

ActiveMQ官網: http://activemq.apache.org

找到最新的穩定版,下載並解壓到一個全英文路徑

啟動

進入bin目錄,在控制台執行指令

activemq-admin.bat start

打開浏覽器

http://127.0.0.1:8161

預設賬号: admin/admin

JMS 的兩種模式

  • 在點對點的消息傳遞時,目的地稱為 隊列 queue
  • 在發佈/訂閱消息傳遞中,目的地稱為 主題 topic

SpringBoot後端

application.yaml配置

#=============== ActiveMQ configuration =======================
# NOTE(review): the nesting below was reconstructed. The listeners in this
# article read ${spring.jms.listener.concurrency:3}, so the keys are placed
# under `spring:` - confirm against the real application.yaml.
spring:
  activemq:
    broker-url: failover:(tcp://192.168.1.99:61616)?initialReconnectDelay=1000&startupMaxReconnectAttempts=2
    user: admin
    password: admin
    # Time to wait before considering a close complete.
    close-timeout: 15s
    # Whether the default broker URL should be in memory.
    # Ignored if an explicit broker has been specified.
    in-memory: true
    # Whether to stop message delivery before re-delivering messages from a
    # rolled back transaction. When enabled, message order is not preserved.
    non-blocking-redelivery: false
    # Time to wait for a response on message sends. 0 means wait forever.
    send-timeout: 0s
    # Connection pool settings.
    pool:
      enabled: true
      max-connections: 5
      idle-timeout: 60s

  #=============== JMS consumer configuration =======================
  jms:
    listener:
      # Number of consumers started by default.
      concurrency: 3
      # Upper bound on the number of concurrent consumers.
      max-concurrency: 3

自定義配置類

/**
 * JMS wiring for ActiveMQ: a messaging template plus one listener container
 * factory per JMS domain (queue and topic).
 */
@Configuration
@EnableConfigurationProperties({ActiveMQProperties.class, JmsProperties.class})
public class ActiveMqConfiguration {

    /**
     * Creates the messaging template used to send messages.
     *
     * @param jmsConnectionFactory the JMS connection factory
     * @return a messaging template backed by the given connection factory
     */
    @Bean
    public JmsMessagingTemplate jmsMessageTemplate(ConnectionFactory jmsConnectionFactory) {
        return new JmsMessagingTemplate(jmsConnectionFactory);
    }

    /**
     * Listener container factory for queue (point-to-point) destinations.
     *
     * @param connectionFactory the connection factory
     * @return a listener container factory with pub/sub disabled
     */
    @Bean({"queueListener"})
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // false selects the queue domain
        factory.setPubSubDomain(false);
        return factory;
    }

    /**
     * Listener container factory for topic (publish/subscribe) destinations.
     *
     * <p>Registered as a bean so that listeners can reference it by name; the
     * original method lacked {@code @Bean} and was therefore never exposed to
     * the application context. The name mirrors {@code "queueListener"}.
     *
     * @param connectionFactory the connection factory
     * @return a listener container factory with pub/sub enabled
     */
    @Bean({"topicListener"})
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // true selects the topic domain
        factory.setPubSubDomain(true);
        return factory;
    }
}

建立消息通道

/**
 * Declares the JMS listeners and destination beans used by this example.
 *
 * <p>NOTE(review): {@code messageProperties} is used by the queue beans below
 * but is not declared anywhere in this snippet - confirm how it is injected.
 * NOTE(review): the listeners consume the bare queue names while the beans
 * append {@code messageProperties.getEnv()} - confirm this is intended.
 */
@Slf4j
@Configuration
public class MyActiveMqConfig {

    /** Destination names, shared by the listeners and the bean definitions. */
    private static final String ALARM_QUEUE = "warm-queue";
    private static final String BUSINESS_QUEUE = "buz-queue";
    private static final String RESOURCE_QUEUE = "res-queue";
    private static final String PUSH_QUEUE = "push-org-1";
    private static final String PUSH_TOPIC = "topic-push";

    /** Name of the listener container factory bean used by every listener. */
    private static final String QUEUE_FACTORY = "queueListener";

    /** Concurrency placeholder; falls back to 3 when the property is absent. */
    private static final String LISTENER_CONCURRENCY = "${spring.jms.listener.concurrency:3}";

    /**
     * Logs alarm messages; all messages are expected to be
     * {@link javax.jms.TextMessage}.
     *
     * @param msg the message text
     */
    @JmsListener(
            destination = ALARM_QUEUE,
            containerFactory = QUEUE_FACTORY,
            concurrency = LISTENER_CONCURRENCY
    )
    public void receiveAlarmMsg(String msg) {
        log.info("Received alarm mq msg : {}", msg);
    }

    /**
     * Logs business messages.
     *
     * @param msg the message text
     */
    @JmsListener(
            destination = BUSINESS_QUEUE,
            containerFactory = QUEUE_FACTORY,
            concurrency = LISTENER_CONCURRENCY
    )
    public void receiveBusinessMsg(String msg) {
        log.info("Received business mq msg : {}", msg);
    }

    /**
     * Logs resource messages.
     *
     * @param msg the message text
     */
    @JmsListener(
            destination = RESOURCE_QUEUE,
            containerFactory = QUEUE_FACTORY,
            concurrency = LISTENER_CONCURRENCY
    )
    public void receiveResourceMsg(String msg) {
        log.info("Received resource mq msg : {}", msg);
    }

    /**
     * Logs push messages (simulated receiver).
     *
     * @param msg the push message text
     */
    @JmsListener(
            destination = PUSH_QUEUE,
            containerFactory = QUEUE_FACTORY,
            concurrency = LISTENER_CONCURRENCY
    )
    public void receivePushMsg(String msg) {
        log.info("Received push msg : {}", msg);
    }

    /**
     * Alarm queue; the name is the base queue name followed by the env value.
     *
     * @return the alarm queue
     */
    @Bean
    public Queue alarmQueue() {
        String queueName = ALARM_QUEUE + messageProperties.getEnv();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Business data queue; the name is the base queue name followed by the env value.
     *
     * @return the business queue
     */
    @Bean
    public Queue businessQueue() {
        String queueName = BUSINESS_QUEUE + messageProperties.getEnv();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Resource data queue; the name is the base queue name followed by the env value.
     *
     * @return the resource queue
     */
    @Bean
    public Queue resourceQueue() {
        String queueName = RESOURCE_QUEUE + messageProperties.getEnv();
        return new ActiveMQQueue(queueName);
    }

    /**
     * Topic that the front end subscribes to in the simulated test.
     *
     * @return the push topic
     */
    @Bean
    public Topic pushTopic() {
        return new ActiveMQTopic(PUSH_TOPIC);
    }
}

後端發送消息

/**
 * Sends a text message to the {@code "topic-push"} destination via the
 * {@code JmsMessagingTemplate} bean, logging the payload first.
 *
 * <p>NOTE(review): whether the name resolves to a queue or a topic depends on
 * the template's pub/sub setting, which is not visible here - confirm.
 *
 * @param message the text to send
 */
private static void sendMessage(String message) {
    final String destination = "topic-push";
    JmsMessagingTemplate messagingTemplate = SpringUtils.getBean(JmsMessagingTemplate.class);
    log.info("向前端推送消息: {}", message);
    messagingTemplate.convertAndSend(destination, message);
}

前端接收消息

下載stomp.js

https://svn.apache.org/repos/asf/activemq/trunk/activemq-web-demo/src/main/webapp/websocket/stomp.js

初始化

// Create a STOMP client for the given WebSocket URL.
var client = Stomp.client(url);

// Connect with the given credentials; onconnect is passed as the callback.
client.connect(username, password, onconnect);

onconnect 是連線成功後的回調方法,在裡面做訂閱

訂閱

// Subscribe to the destination; the callback is passed each received message.
client.subscribe(destination, function(message) {

// The message payload is available as message.body

});

如:

// Connect callback: reports the connection, subscribes to the destination,
// and echoes each received message body through client.msg.
var onconnect = function(frame) {
  client.msg("connected to Stomp");

  // Handler for every message delivered on the subscription.
  var showBody = function(message) {
    client.msg(message.body);
  };
  client.subscribe(destination, showBody);

  client.msg("subscribe to " + destination);
};

完整示例

<script type="text/javascript" src="jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="stomp.js"></script>
<script type="text/javascript">
  // Connection settings for the broker's WebSocket STOMP endpoint.
  var url = 'ws://192.168.1.99:61614/stomp/';
  var username = 'admin';
  var password = 'admin';
  // NOTE(review): ActiveMQ STOMP destinations are normally written with a
  // /topic/ or /queue/ prefix - confirm this name against the broker setup.
  var destination = 'topic-push';

  // Shared by both click handlers; stays undefined until Connect is clicked.
  var client;

  $(function() {
    $('#connect_btn').click(function() {
      client = Stomp.client(url);

      // Appends a line of text to the #messages element.
      client.msg = function(str) {
        var t = document.createTextNode(str);
        var p = document.createElement("p");
        p.appendChild(t);
        $("#messages").append(p);
      };

      // Runs once the connection is established: subscribe and echo messages.
      var onconnect = function(frame) {
        client.msg("connected to Stomp");
        client.subscribe(destination, function(message) {
          client.msg(message.body);
        });
        client.msg("subscribe to " + destination);
      };

      client.connect(username, password, onconnect);
    });

    $('#send_btn').click(function() {
      var text = $('#send_form_input').val();
      // Send only when there is text and a client has been created.
      if (text && client) {
        client.send(destination, {foo: 1}, text);
        $('#send_form_input').val("");
      }
      return false;
    });
  });
</script>

<div id='connect'>
  <dl>
    <dd><input type="button" id='connect_btn' value="Connect"></dd>
    <dd><input type="button" id='send_btn' value="send"></dd>
  </dl>
  <input id='send_form_input' placeholder="message to send" />
</div>

<div id="messages">
</div>

安裝包裡示例

在examples/stomp/websocket目錄

WebSocket訂閱ActiveMQ消息