編碼測試
一定要開啟持久化!!!
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT);
隊列
生産者
public class JmsProduceJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//1.按照給定的url建立連接配接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,ACTIVEMQ_URL);
// 2.通過工廠連接配接connection 和啟動
Connection connection = activeMQConnectionFactory.createConnection();
// 3.啟動
connection.start();
// 4.建立會話session
//兩個參數,第一個事務,第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.建立目的地,隊列、主題,這裡用隊列
Queue queue = session.createQueue(QUEUE_NAME);
// 6.建立消息的生産者
MessageProducer messageProducer = session.createProducer(queue);
/**
* 持久化必須設定
*/
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7.通過MessageProducer生産3條消息發送到消息隊列中
for (int i = 1; i <= 6; i++) {
//8.建立消息
TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
//9.發送消息
messageProducer.send(textMessage);
}
// 10.關閉資源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息發送到MQ完成 **** ");
}
}
生産6條消息:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyYDMzEjNzATM4EDOwEjMwITLldWYtl2LcdWbp9CXyVGdzFWbvw1dhJ3LcFTZnFWbp9CX0IDMxUGbvwVbvNmLlVGdpd2Lc9CX6MHc0RHaiojIsJye.png)
在資料庫
ACTIVEMQ_MSGS
表中,會生成6條資料,就是上一步生産的消息
消費者
public class JmsConsumerJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception {
//建立連接配接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
// 建立連接配接connection
Connection connection = activeMQConnectionFactory.createConnection();
//開啟連接配接
connection.start();
//建立會話session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立隊列,同生産者一緻
Queue queue = session.createQueue(QUEUE_NAME);
//建立消息消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
/**
* 方法2:通過監聽器的方式
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("**** 消費者接收到消息 ****:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); // 必須加行代碼,不然程式會直接往下執行結束了
messageConsumer.close();
session.close();
connection.close();
System.out.println("**** 消費者消費消息完成 ****");
}
}
啟動消費者,會消息掉已生産的消息,mq控制台和資料庫資料都會消費
隊列消費總結:
- 當DeliveryMode設定為NON_PERSISTENCE時,消息被儲存在記憶體中
- 當DeliveryMode設定為PERSISTENCE時,消息儲存在broker的相應的檔案或者資料庫中
隊列中的消息一旦被consumer消費就從Broker中删除
主題
一定是先啟動消費者訂閱主題
消費者
public class JmsConsumerTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
/**
* 持久化主題消息訂閱,類似于微信公衆号訂閱
* 需要先啟動消費者,訂閱上主題之後,後續生産主題消息,消費者(訂閱者)就會接收到消息
* 消費者(訂閱者)訂閱主題之後,不管是線上還是離線狀态,隻要保持正常訂閱狀态,期間生産的消息都會接收到。離線的會在再次線上後接收到之前的消息
*/
System.out.println("jdbc-1"); //模拟訂閱使用者
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("jdbc-1"); // 設定clientId,表明訂閱者
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
connection.start();
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化topic消息:" + textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
啟動消費者:
檢視資料庫,
ACTIVEMQ_ACKS
表中新增一條記錄,為目前訂閱者的資訊
生産者
public class JmsProduceTopicJDBC {
public static final String ACTIVEMQ_URL = "tcp://localhost61616";
public static final String USERNAME = "admin";
public static final String PASSWORD = "hll123";
public static final String TOPIC_NAME = "topic-jdbc";
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// connection啟動之前必須設定持久化主題
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 持久化消息發送到MQ完成 **** ");
}
}
啟動生産者:
檢視資料庫:
ACTIVEMQ_MSGS
會新增消費的資料,
ACTIVEMQ_ACKS
的LAST_ACKED_ID會更新為最後消費消息的ID
ACTIVEMQ_MSGS
裡的topic消息在消費後是不會立刻删除的,而queue在消費後自動删除
小總結
-
queue
生産的消息在沒有消費的情況下,消息會存在
表中,隻要任意一個消費者消費這些消息後,這些消息就會立即删除activemq_msgs
-
topic
一般是先啟動消費者訂閱之後,再通過生産者生産消息,之後消息也會存在
表中,activemq_msgs
表存的是消費者訂閱資訊activemq_acks
-
開發注意事項
1.mysql驅動包(或者其他資料庫)和對應的資料庫連接配接池jar包需要放到activemq目錄下的lib中
2.初次配置完成,資料庫生成表之後,activemq.xml中配置
createTablesOnStartup=false
3.BeanFactory not initialized or already closed異常
将作業系統的機器名帶有的"_"符号去掉,重新開機作業系統