消息中間件之ActiveMQ-基礎篇
注:該部落格參考 尚矽谷周陽老師,僅供個人學習使用,轉載請标明作者和此段文字
一、消息中間件是什麼?
MQ = 消息中間件
1. MQ産品總類
kafka、RabbitMQ、RocketMQ、ActiveMQ
2. 是什麼
利用可靠的消息傳遞機制進行與平台無關的資料交流,并基于資料通信來進行分布式系統的內建。
通過提供消息傳遞和消息排隊模型在分布式環境下提供應用解耦、彈性伸縮、備援存儲、流量削峰、異步通信、資料同步等功能。
3. 能幹嗎
解耦、削峰、異步
4. 過程
發送者把消息發送給消息伺服器,消息伺服器将消息存放在若幹隊列/主題中,在合适的時候,消息伺服器會将消息轉發給接收者。在這個過程中,發送和接受是異步的,也就是發送無需等待,而且發送者和接收者的生命周期也沒有必然關系。
什麼是異步?
消息發送者可以發送一個消息而無需等待響應。消息發送者将消息發送到一條虛拟的通道(主題或隊列)上;消息接收者則訂閱或監聽該通道。一條消息可能最終轉發給一個或多個消息接收者,這些消息接收者都無需對消息發送者做出同步回應。整個過程都是異步的。
什麼是系統之間解耦?
發送者和接收者不必了解對方,隻需要确認消息;發送者和接收者不必同時線上。
5. 去哪下
ActiveMq官網
http://activemq.apache.org
二、安裝
1. 安裝步驟
- 官網下載下傳
- 解壓縮
2. 啟動
- 普通啟動
cd activemq解壓包/bin
./activemq start #預設程序端口号61616
- 關閉
cd activemq解壓包/bin
./activemq stop
- 重新開機
cd activemq解壓包/bin
./activemq restart #預設程序端口号61616(背景端口号)
- 帶日志的啟動
3. 通路
http://ip:8161/admin #預設前台端口号8161
# 預設的使用者名和密碼是admin/admin
/*如果通路不了?
1. 檢視雲伺服器是否将8161和61616端口(61616端口可以不配置)加入開放
2. activemq 配置檔案 jetty.xml 中的host 從127.0.0.1變成0.0.0.0
3. 檢視防火牆是否開放8161
systemctl status firewalld 檢視防火牆狀态(是否啟動)
firewall-cmd --list-port 檢視防火牆開放的端口
firewall-cmd --zone=public --add-port=61616/tcp --permanent 開放61616端口
firewall-cmd --zone=public --add-port=8161/tcp --permanent 開放8161端口
firewall-cmd --reload 使配置生效
firewall-cmd --query-port=61616/tcp 檢視61616是否開放
*/
備注:
- 采用61616端口提供JMS服務
- 采用8161端口提供管理控制台服務
三、java實作ActiveMQ通訊
- IDEA建立Maven工程
- POM.xml檔案導入包
<!--activemq所需要的jar包-->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
- JMS編碼總體架構
粗說目的地Destination (隊列和主題)
4.1 在點對點的消息傳遞域中,目的地被稱為隊列(queue)
4.2 在釋出訂閱消息傳遞域中,目的地被稱為主題(topic)
- java編碼(隊列)
public class JmsProduce {
//defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 模闆
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) {
//1. 建立連接配接工廠,按照給定的url位址,采用預設的使用者名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = null;
try {
//2. 通過連接配接工廠,獲得連接配接connection并啟動通路
connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5. 建立消息的生産者
MessageProducer producer = session.createProducer(queue);
//6. 通過消息生産者producer生産三條消息發送的MQ的隊列裡面
for (int i = 0; i < 3; i++) {
//7. 建立消息
TextMessage textMessage = session.createTextMessage("message" + i);
//8. 通過producer發送給MQ
producer.send(textMessage);
}
//9. 關閉資源
producer.close();
session.close();
connection.close();
System.out.println("消息發送到MQ成功!");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/*消費者方法1
同步阻塞方式(receive()):
訂閱者或者接收者調用MessageConsumer的receive()方法來接收消息,receive方法能夠接收到消息之前(或逾時之前)将一直阻塞。
*/
public class JmsConsumer {
//defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 模闆
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) {
//1. 建立連接配接工廠,按照給定的url位址,采用預設的使用者名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = null;
try {
//2. 通過連接配接工廠,獲得連接配接connection并啟動通路
connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5. 建立消息的消費者
MessageConsumer consumer = session.createConsumer(queue);
//6. 消費者接收消息
while(true){
/*
* receive() 死等
* receive(long timeout) 等timeout時間後,消費者就走了*/
TextMessage textMessage = (TextMessage) consumer.receive();
if(null != textMessage){
System.out.println("***消費者接收到消息:"+ textMessage.getText());
}else{
break;
}
}
//9. 關閉資源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/*消費者方法2
通過監聽的方式來接收消息。
異步非阻塞方式(監聽器onMessage())
訂閱者或接受者通過MessageConsumer的setMessageListener(MessageListener listener)注冊一個消息監聽器,
當消息到達之後,系統自動調用監聽器MessageListener的onMessage(Message message)方法。
*/
public class JmsConsumer2 {
//defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 模闆
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) {
//1. 建立連接配接工廠,按照給定的url位址,采用預設的使用者名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = null;
try {
//2. 通過連接配接工廠,獲得連接配接connection并啟動通路
connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5. 建立消息的消費者
MessageConsumer consumer = session.createConsumer(queue);
//6. 消費者接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消費者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //使控制台不關閉,等待輸入,使下面的代碼不立即執行
//9. 關閉資源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}catch (IOException e){
e.printStackTrace();
}
}
}
- 控制台說明
消費者3消費情況(隊列)
6.1 先生産,隻啟動1号消費者。問題:1号消費者能消費消息嗎?
可以
6.2 先生産,先啟動1号消費者,再啟動2号消費者。問題:2号消費者能消費消息嗎?
1号可以消費;2号消費者不可以消費,因為已經被1号消費完。
6.3 先啟動兩個消費者,再生産6條消息,請問,消費情況如何?
一人一半。
釋出訂閱者消息傳遞域的特點(主題):
(1)生産者将消息釋出到topic中,每個消息可以有多個消費者,屬于1:N的關系。
(2)生産者和消費者之間有時間上的相關性。訂閱某一個主題的消費者隻能消費自它訂閱之後釋出的消息。
(3)生産者生産時,topic 不儲存消息,它是無狀态的,假如無人訂閱就去生産,那就是一條廢消息,是以,一般先啟動消費者再啟動生産者。
JMS規範允許客戶建立持久訂閱,這在一定程度上發送了時間上的相關要求。持久訂閱允許消費者消費它在未處于激活狀态時發送的消息。一句話,好比微信公衆号訂閱
- java代碼(主題)
//消費者
public class JmsConsumer_Topic {
//defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 模闆
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
//1. 建立連接配接工廠,按照給定的url位址,采用預設的使用者名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = null;
try {
//2. 通過連接配接工廠,獲得連接配接connection并啟動通路
connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 建立會話session
//兩個參數,第一個叫事務,第二個叫簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.建立目的地(具體是隊列還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5. 建立消息的消費者
MessageConsumer consumer = session.createConsumer(topic);
//6. 消費者接收消息
while (true) {
/*
* receive() 死等
* receive(long timeout) 等timeout時間後,消費者就走了*/
TextMessage textMessage = (TextMessage) consumer.receive();
if (null != textMessage) {
System.out.println("***消費者接收到消息:" + textMessage.getText());
} else {
break;
}
}
//9. 關閉資源
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
//生産者
public class JmsProduce_Topic {
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
//7. 建立消息
TextMessage textMessage = session.createTextMessage("message" + i);
//8. 通過producer發送給MQ
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息發送到MQ成功!");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 兩大模式對比
四、JMS 規範和落地産品
- javaEE 是什麼?
JMS 是什麼?
Java Message Service,Java消息服務是javaEE中的一個技術。Java消息服務指的是兩個應用程式之問進行異步通信的APl,它為标準消息協定和消息服務提供了一組通用接口,包括建立、發送、讀取消息等,用于支援JAVA應用程式開發。在 Javaee中,當兩個應用程式使用JMS進行通信時,它們之間并不是直接相連的,而是通過一個共同的消息收發服務件關聯起來以達到解耦/異步/削峰的效果。
- MQ中間件其它落地産品? 對比?
- JMS的組成結構和特點
4.1 消息頭
JMS的消息頭有哪些屬性:
JMSDestination:消息目的地
JMSDeliveryMode:消息持久化模式
JMSExpiration:消息過期時間
JMSPriority:消息的優先級
JMSMessageID:消息的唯一辨別符。後面我們會介紹如何解決幂等性。
說明:消息的生産者可以set這些屬性,消息的消費者可以get這些屬性。
這些屬性在send方法裡面也可以設定。
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://***:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 這裡可以指定每個消息的目的地
textMessage.setJMSDestination(topic);
/*
持久模式和非持久模式。
一條持久性的消息:應該被傳送“一次僅僅一次”,這就意味着如果JMS提供者出現故障,該消息并不會丢失,它會在伺服器恢複之後再次傳遞。
一條非持久的消息:最多會傳遞一次,這意味着伺服器出現故障,該消息将會永遠丢失。
*/
textMessage.setJMSDeliveryMode(0);
/*
可以設定消息在一定時間後過期,預設是永不過期。
消息過期時間,等于Destination的send方法中的timeToLive值加上發送時刻的GMT時間值。
如果timeToLive值等于0,則JMSExpiration被設為0,表示該消息永不過期。
如果發送後,在消息過期時間之後還沒有被發送到目的地,則該消息被清除。
*/
textMessage.setJMSExpiration(1000);
/* 消息優先級,從0-9十個級别,0-4是普通消息5-9是加急消息。
JMS不要求MQ嚴格按照這十個優先級發送消息但必須保證加急消息要先于普通消息到達。預設是4級。
*/
textMessage.setJMSPriority(10);
// 唯一辨別每個消息的辨別。MQ會給我們預設生成一個,我們也可以自己指定。
textMessage.setJMSMessageID("ABCD");
// 上面有些屬性在send方法裡也能設定
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發送到MQ完成 ****");
}
}
4.2 消息體
封裝具體的消息資料,有5種消息體格式,發送和接收的消息體類型必須一緻對應。
/*
5種消息體格式:
TextMessage 普通字元串消息,包含一個string
MapMessage 一個Map類型的消息,key為string類型,而值為Java的基本類型
ByteMessage 二進制數組消息,包含一個byte[]
StreamMessage Java資料流消息,用标準流操作來順序的填充和讀取
ObjectMessage 對象消息,包含一個可序列化的Java對象
*/
//消息生産者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://***:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
// 發送TextMessage消息體
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
messageProducer.send(textMessage);
// 發送MapMessage 消息體。set方法: 添加,get方式:擷取
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "張三"+i);
mapMessage.setInt("age", 18+i);
messageProducer.send(mapMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發送到MQ完成 ****");
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://***:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener( (message) -> {
// 判斷消息是哪種類型之後,再強轉。
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("****消費者text的消息:"+textMessage.getText());
}catch (JMSException e) {
}
}
if (null != message && message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("****消費者的map消息:"+mapMessage.getString("name"));
System.out.println("****消費者的map消息:"+mapMessage.getInt("age"));
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
4.3 消息屬性
如果需要除消息頭字段之外的值,那麼可以使用消息屬性。他是識别/去重/重點标注等操作,非常有用的方法。
他們是以屬性名和屬性值對的形式制定的。可以将屬性是為消息頭得擴充,屬性指定一些消息頭沒有包括的附加資訊,比如可以在屬性裡指定消息選擇器。消息的屬性就像可以配置設定給一條消息的附加消息頭一樣。它們允許開發者添加有關消息的不透明附加資訊。它們還用于暴露消息選擇器在消息過濾時使用的資料。
下圖是設定消息屬性的API:
//消息生産者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://***:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 調用Message的set*Property()方法,就能設定消息屬性。根據value的資料類型的不同,有相應的API。
textMessage.setStringProperty("From","[email protected]");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發送到MQ完成 ****");
}
}
//消費者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://***:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener( (message) -> {
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("消息體:"+textMessage.getText());
System.out.println("消息屬性:"+textMessage.getStringProperty("From"));
System.out.println("消息屬性:"+textMessage.getByteProperty("Spec"));
System.out.println("消息屬性:"+textMessage.getBooleanProperty("Invalide"));
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
- 消息的持久化
什麼是持久化消息?
保證消息隻被傳送一次和成功使用一次。在持久性消息傳送至目标時,消息服務将其放入持久性資料存儲。如果消息服務由于某種原因導緻失敗,它可以恢複此消息并将此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但卻增加了可靠性。
了解:在消息生産者将消息成功發送給MQ消息中間件之後。無論是出現任何問題,如:MQ伺服器當機、消費者掉線等。都保證(topic要之前注冊過,queue不用)消息消費者,能夠成功消費消息。如果消息生産者發送消息就失敗了,那麼消費者也不會消費到該消息。
5.1 queue消息非持久和持久
queue非持久,當伺服器當機,消息不存在(消息丢失了)。
queue持久化,當伺服器當機,消息依然存在。queue消息預設是持久化的。
持久化消息,這是隊列預設的傳送方式,保證這些消息隻被傳送一次和成功使用一次。對于這些消息,可靠性是優先考慮的因素。
可靠性的另一個重要方面是確定持久性消息傳送至目标後,消息服務在向消費者傳送它們之前不會丢失這些消息。
5.2 topic消息持久化
topic預設就是非持久化的,因為生産者生産消息時,消費者也要線上,這樣消費者才能消費到消息。
topic消息持久化,隻要消費者向MQ伺服器注冊過,所有生産者釋出成功的消息,該消費者都能收到,不管是MQ伺服器當機還是消費者不線上。類似微信公衆号訂閱釋出。
注意:
- 一定要先運作一次消費者,等于向MQ注冊,類似我訂閱了這個主題。
- 然後再運作生産者發送消息。
- 之後無論消費者是否線上,都會收到消息。如果不線上的話,下次連接配接的時候,會把沒有收過的消息都接收過來。
//生産者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 持久化topic 的消息生産者
public class JmsProduce_persistence {
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// 設定持久化topic
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 設定持久化topic之後再,啟動連接配接
connection.start();
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
messageProducer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息發送到MQ完成 ****");
}
}
//消費者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 持久化topic 的消息消費者
public class JmsConsummer_persistence {
public static final String ACTIVEMQ_URL = "tcp://***:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
// 設定用戶端ID。向MQ伺服器注冊自己的名稱
connection.setClientID("marrry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 建立一個topic訂閱者對象。一參是topic,二參是訂閱者名稱
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
// 之後再開啟連接配接
connection.start();
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
控制台介紹:
topic頁面還是和之前的一樣。另外在subscribers頁面也會顯示。如下:
- 消息的事務性
(1) 生産者開啟事務後,執行commit方法,這批消息才真正的被送出。不執行commit方法,這批消息不會送出。執行rollback方法,之前的消息會復原掉。生産者的事務機制,要高于簽收機制,當生産者開啟事務,簽收機制不再重要。
(2) 消費者開啟事務後,執行commit方法,這批消息才算真正的被消費。不執行commit方法,這些消息不會标記已消費,下次還會被消費。執行rollback方法,是不能復原之前執行過的業務邏輯,但是能夠復原之前的消息,復原後的消息,下次還會被消費。消費者利用commit和rollback方法,甚至能夠違反一個消費者隻能消費一次消息的原理。
(3) 問:消費者和生産者需要同時操作事務才行嗎?
答:消費者和生産者的事務,完全沒有關聯,各自是各自的事務。
//生産者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://***:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//1.建立會話session,兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
//設定為開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
if(i == 2){
throw new RuntimeException("GG.....");
}
}
// 2. 開啟事務後,使用commit送出事務,這樣這批消息才能真正的被送出。
session.commit();
System.out.println("消息發送完成");
} catch (Exception e) {
System.out.println("出現異常,消息復原");
// 3. 工作中一般,當代碼出錯,我們在catch代碼塊中復原。這樣這批發送的消息就能復原。
session.rollback();
} finally {
//4. 關閉資源
producer.close();
session.close();
connection.close();
}
}
}
//消費者
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://***:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 建立會話session,兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
// 消費者開啟了事務就必須手動送出,不然會重複消費消息
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
int a = 0;
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消費者接收到的消息: " + textMessage.getText());
if(a == 0){
System.out.println("commit");
session.commit();
}
if (a == 2) {
System.out.println("rollback");
session.rollback();
}
a++;
} catch (Exception e) {
System.out.println("出現異常,消費失敗,放棄消費");
try {
session.rollback();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
}
});
//關閉資源
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
- 消息的簽收機制
一、簽收的幾種方式
① 自動簽收(Session.AUTO_ACKNOWLEDGE):該方式是預設的。該種方式,無需我們程式做任何操作,架構會幫我們自動簽收收到的消息。
② 手動簽收(Session.CLIENT_ACKNOWLEDGE):手動簽收。該種方式,需要我們手動調用Message.acknowledge(),來簽收消息。如果不簽收消息,該消息會被我們反複消費,隻到被簽收。
③ 允許重複消息(Session.DUPS_OK_ACKNOWLEDGE):多線程或多個消費者同時消費到一個消息,因為線程不安全,可能會重複消費。該種方式很少使用到。
④ 事務下的簽收(Session.SESSION_TRANSACTED):開始事務的情況下,可以使用該方式。該種方式很少使用到。
二、事務和簽收的關系
① 在事務性會話中,當一個事務被成功送出則消息被自動簽收。如果事務復原,則消息會被再次傳送。事務優先于簽收,開始事務後,簽收機制不再起任何作用。
② 非事務性會話中,消息何時被确認取決于建立會話時的應答模式。
③ 生産者事務開啟,隻有commit後才能将全部消息變為已消費。
④ 事務偏向生産者,簽收偏向消費者。也就是說,生産者使用事務更好點,消費者使用簽收機制更好點。
//生産者
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://***:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息發送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
}
}
//消費者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://***:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消費者接收到的消息: " + textMessage.getText());
/* 設定為Session.CLIENT_ACKNOWLEDGE後,要調用該方法,标志着該消息已被簽收(消費)。
如果不調用該方法,該消息的标志還是未消費,下次啟動消費者或其他消費者還會收到改消息。
*/
textMessage.acknowledge();
} catch (Exception e) {
System.out.println("出現異常,消費失敗,放棄消費");
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
- JMS的點對點總結
點對點模型是基于隊列的,生産者發消息到隊列,消費者從隊列接收消息,隊列的存在使得消息的異步傳輸成為可能。和我們平時給朋友發送短信類似。
如果在Session關閉時有部分消息己被收到但還沒有被簽收(acknowledged),那當消費者下次連接配接到相同的隊列時,這些消息還會被再次接收
隊列可以長久地儲存消息直到消費者收到消息。消費者不需要因為擔心消息會丢失而時刻和隊列保持激活的連接配接狀态,充分展現了異步傳輸模式的優勢
- JMS的釋出訂閱總結
(1) JMS的釋出訂閱總結
JMS Pub/Sub 模型定義了如何向一個内容節點釋出和訂閱消息,這些節點被稱作topic。
主題可以被認為是消息的傳輸中介,釋出者(publisher)釋出消息到主題,訂閱者(subscribe)從主題訂閱消息。
主題使得消息訂閱者和消息釋出者保持互相獨立不需要解除即可保證消息的傳送
(2) 非持久訂閱
非持久訂閱隻有當用戶端處于激活狀态,也就是和MQ保持連接配接狀态才能收發到某個主題的消息。
如果消費者處于離線狀态,生産者發送的主題消息将會丢失廢棄,消費者永遠不會收到。
一句話:先訂閱注冊才能接受到釋出,隻給訂閱者釋出消息。
(3) 持久訂閱
用戶端首先向MQ注冊一個自己的身份ID識别号,當這個用戶端處于離線時,生産者會為這個ID儲存所有發送到主題的消息,當客戶再次連接配接到MQ的時候,會根據消費者的ID得到所有當自己處于離線時發送到主題的消息
當持久訂閱狀态下,不能恢複或重新派送一個未簽收的消息。
持久訂閱才能恢複或重新派送一個未簽收的消息。
(4) 非持久和持久化訂閱如何選擇
當所有的消息必須被接收,則用持久化訂閱。當消息丢失能夠被容忍,則用非持久訂閱。
五、ActiveMQ的broker
(1) broker是什麼
相當于一個ActiveMQ伺服器執行個體。說白了,Broker其實就是實作了用代碼的形式啟動ActiveMQ将MQ嵌入到Java代碼中,以便随時用随時啟動,再用的時候再去啟動這樣能節省了資源,也保證了可用性。這種方式,我們實際開發中很少采用,因為他缺少太多了東西,如:日志,資料存儲等等。
(2) 啟動broker時指定配置檔案
啟動broker時指定配置檔案,可以幫助我們在一台伺服器上啟動多個broker。實際工作中一般一台伺服器隻啟動一個broker。
(3) 嵌入式的broker啟動
用ActiveMQ Broker作為獨立的消息伺服器來建構Java應用。
ActiveMQ也支援在vm中通信基于嵌入的broker,能夠無縫的內建其他java應用。
//pom.xml添加一個依賴
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
//嵌入式broke的啟動類
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支援在vm中通信基于嵌入的broker
BrokerService brokerService = new BrokerService();
brokerService.setPopulateJMSXUserID(true);
brokerService.addConnector("tcp://127.0.0.1:61616");
brokerService.start();
}
}
六、Spring整合ActiveMQ
1. 添加依賴
<dependencies>
<!-- activemq核心依賴包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- 嵌入式activemq的broker所需要的依賴包 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<!-- activemq連接配接池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
<!-- spring支援jms的包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- Spring核心依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
</dependencies>
2. spring.xml檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- 包掃描、隻要是标注了@Service、@Repository、@Component、@Controller -->
<context:component-scan base-package="com.kun">
</context:component-scan>
<!--配置連接配接池-->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<!--真正生産Connection的ConnectionFactory,由對應的JMS服務商提供-->
<property name="connectionFactory">
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://***:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!--隊列的目的地,點對點的Queue-->
<bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!--通過構造注入Queue的名-->
<constructor-arg index="0" value="spring-active-queue"></constructor-arg>
</bean>
<!--主題的目的地,釋出訂閱的主題Topic-->
<bean class="org.apache.activemq.command.ActiveMQTopic" id="activeMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!--Spring 提供的JMS工具類,它可以進行消息發送,接收等-->
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
<!--傳入連接配接工廠-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--傳入目的地,目前是隊列-->
<property name="defaultDestination" ref="activeMQQueue"/>
<!-- <property name="defaultDestination" ref="activeMQTopic"/>-->
<!--消息自動轉換器-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
</property>
</bean>
</beans>
3. 隊列編碼
//生産者
package com.kun;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
@Service
public class SpringMQ_Produce {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring.xml");
SpringMQ_Produce produce = applicationContext.getBean("springMQ_Produce", SpringMQ_Produce.class);
produce.jmsTemplate.send((Session session) -> {
TextMessage textMessage = session.createTextMessage("***spring和ActiveMQ的整合***");
return textMessage;
});
System.out.println("********send task over");
}
}
//消費者
package com.kun;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class SpringMQ_Consumer {
@Resource
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring.xml");
SpringMQ_Consumer consumer = applicationContext.getBean("springMQ_Consumer", SpringMQ_Consumer.class);
String retValue = (String) consumer.jmsTemplate.receiveAndConvert();
System.out.println("**********消費者收到的消息:"+ retValue);
}
}
4. 主題編碼
5.配置消費者的監聽類
不啟動消費者,生産者生産消息,消費者自動接收消息
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- 包掃描、隻要是标注了@Service、@Repository、@Component、@Controller -->
<context:component-scan base-package="com.kun">
</context:component-scan>
<!--配置連接配接池-->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<!--真正生産Connection的ConnectionFactory,由對應的JMS服務商提供-->
<property name="connectionFactory">
<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://***:61616"/>
</bean>
</property>
<property name="maxConnections" value="100"/>
</bean>
<!--隊列的目的地,點對點的Queue-->
<bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!--通過構造注入Queue的名-->
<constructor-arg index="0" value="spring-active-queue"></constructor-arg>
</bean>
<!--主題的目的地,釋出訂閱的主題Topic-->
<bean class="org.apache.activemq.command.ActiveMQTopic" id="activeMQTopic">
<constructor-arg index="0" value="spring-active-topic"/>
</bean>
<!--Spring 提供的JMS工具類,它可以進行消息發送,接收等-->
<bean class="org.springframework.jms.core.JmsTemplate" id="jmsTemplate">
<!--傳入連接配接工廠-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--傳入目的地-->
<property name="defaultDestination" ref="activeMQTopic"/>
<!--消息自動轉換器-->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
</property>
</bean>
<!--配置消費者監聽程式-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="activeMQTopic"/>
<property name="messageListener" ref="myMessageListener"/>
</bean>
</beans>
package com.kun;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
七、Springboot整合ActiveMQ
1. 隊列
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.kun</groupId>
<artifactId>boot_mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot_mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.junit.vintage</groupId>-->
<!-- <artifactId>junit-vintage-engine</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- application.yml
spring:
activemq:
# activemq的broker的url
broker-url: tcp://121.199.70.188:61616
# 連接配接activemq的broker所需的賬号和密碼
user: admin
password: admin
jms:
# 目的地是queue還是topic,false(預設) = queue , true = topic
pub-sub-domain: false
# 自定義隊列名稱,隻是個常量
myqueue: boot-activemq-queue
- ConfigBean.java
package com.kun.boot_mq.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
@Component
@EnableJms //開啟Jms的适配注解
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean //bean id="" class = ""
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
- Queue_Produce.java 生産者
package com.kun.boot_mq;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.Queue;
import java.util.UUID;
@Component
public class Queue_Produce {
@Resource
private JmsMessagingTemplate jmsMessagingTemplate;
//注入目的地
@Resource
private Queue queue;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue,"*******:"+ UUID.randomUUID().toString().substring(0,6));
}
//定時任務,每3秒執行一次。
@Scheduled(fixedDelay = 3000)
public void produceMessageScheduled(){
}
}
- Queue_Consumer.java 消費者
package com.kun.boot_mq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
@Component
public class Queue_Consumer {
// 注冊一個監聽器。destination指定監聽的主題。
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println(" *** 消費者收到消息 ***"+textMessage.getText());
}
}
- TestActiveMQ.java
package com.kun.boot_mq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import javax.annotation.Resource;
//加載主類
@SpringBootTest(classes = BootMqApplication.class)
//加載spring的junit,如果不能用,把spring-boot-starter-test中的exclusion删除
@RunWith(SpringJUnit4ClassRunner.class)
//加載web
@WebAppConfiguration
public class TestActiveMQ {
@Resource
private Queue_Produce queue_produce;
@Test
public void testSend(){
queue_produce.produceMsg();
}
}
- BootMqApplication.java
package com.kun.boot_mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling //使@Scheduled生效
public class BootMqApplication {
public static void main(String[] args) {
SpringApplication.run(BootMqApplication.class, args);
}
}
2. 主題
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.kun</groupId>
<artifactId>boot_mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot_mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.junit.vintage</groupId>-->
<!-- <artifactId>junit-vintage-engine</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- application.yml
spring:
activemq:
# activemq的broker的url
broker-url: tcp://121.199.70.188:61616
# 連接配接activemq的broker所需的賬号和密碼
user: admin
password: admin
jms:
# 目的地是queue還是topic,false(預設) = queue , true = topic
pub-sub-domain: true
# 自定義隊列名稱,隻是個常量
mytopic: boot-activemq-topic
- ConfigBean.java
package com.kun.boot_mq.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
@Component
@EnableJms
public class ConfigBean {
@Value("${mytopic}")
private String topicName ;
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
}
- Topic_Produce.java 生産者
package com.kun.boot_mq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
import java.util.UUID;
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate ;
@Autowired
private Topic topic ;
@Scheduled(fixedDelay = 3000)
public void produceTopic(){
jmsMessagingTemplate.convertAndSend(topic,"主題消息"+ UUID.randomUUID().toString().substring(0,6));
}
}
- Topic_Consummer.java 消費者
package com.kun.boot_mq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
@Component
public class Topic_Consummer {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消費者受到訂閱的主題:"+textMessage.getText());
}
}
- BootMqApplication.java
package com.kun.boot_mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling //使@Scheduled生效
public class BootMqApplication {
public static void main(String[] args) {
SpringApplication.run(BootMqApplication.class, args);
}
}
八、ActiveMQ的傳輸協定
簡介:
ActiveMQ支援的client-broker通訊協定有:TCP、NIO、UDP、SSL、Http(s)、VM。其中配置Transport Connector的檔案在ActiveMQ安裝目錄的conf/activemq.xml中的标簽之内。
activemq傳輸協定的官方文檔:http://activemq.apache.org/configuring-version-5-transports.html
activemq.xml
<!--
在配置資訊中,URI描述資訊的頭部都是采用協定名稱:例如
描述amqp協定的監聽端口時,采用的URI描述格式為“amqp://······”;
描述Stomp協定的監聽端口時,采用URI描述格式為“stomp://······”;
唯獨在進行openwire協定描述時,URI頭卻采用的“tcp://······”。這是因為ActiveMQ中預設的消息協定就是openwire
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
1. 支援的傳輸協定
個人說明:除了tcp和nio協定,其他的了解就行。各種協定有各自擅長該協定的中間件,工作中一般不會使用activemq去實作這些協定。如: mqtt是物聯網專用協定,采用的中間件一般是mosquito。ws是websocket的協定,是和前端對接常用的,一般在java代碼中内嵌一個基站(中間件)。stomp好像是郵箱使用的協定的,各大郵箱公司都有基站(中間件)。
注意:協定不同,我們的代碼都會不同。
2. TCP協定
(1) Transmission Control Protocol(TCP)是預設的。TCP的Client監聽端口61616
(2) 在網絡傳輸資料前,必須要先序列化資料,消息是通過一個叫wire protocol的來序列化成位元組流。
(3) TCP連接配接的URI形式如:tcp://HostName:port?key=value&key=value,後面的參數是可選的。
(4) TCP傳輸的的優點:
TCP協定傳輸可靠性高,穩定性強
高效率:位元組流方式傳遞,效率很高
有效性、可用性:應用廣泛,支援任何平台
(5) 關于Transport協定的可選配置參數可以參考官網http://activemq.apache.org/tcp-transport-reference
3. NIO協定
(1) New I/O API Protocol(NIO)
(2) NIO協定和TCP協定類似,但NIO更側重于底層的通路操作。它允許開發人員對同一資源可有更多的client調用和伺服器端有更多的負載。
(3) 适合使用NIO協定的場景:
可能有大量的Client去連接配接到Broker上,一般情況下,大量的Client去連接配接Broker是被作業系統的線程所限制的。是以,NIO的實作比TCP需要更少的線程去運作,是以建議使用NIO協定。
可能對于Broker有一個很遲鈍的網絡傳輸,NIO比TCP提供更好的性能。
(4) NIO連接配接的URI形式:nio://hostname:port?key=value&key=value
(5) 關于Transport協定的可選配置參數可以參考官網http://activemq.apache.org/configuring-version-5-transports.html
4. AMQP協定
5. STOMP協定
6. MQTT協定
7. NIO協定案例
- ActiveMQ這些協定傳輸的底層預設都是使用BIO網絡的IO模型。隻有當我們指定使用nio才使用NIO的IO模型。
(1) 修改配置檔案activemq.xml① 修改配置檔案activemq.xml在 節點下添加如下内容:
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
② 修改完成後重新開機activemq:
③ 檢視管理背景,可以看到頁面多了nio
service activemq restart
(2) 編寫代碼
//生産者
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "nio://***:61618";
private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息發送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
//8.關閉資源
producer.close();
session.close();
connection.close();
}
}
}
//消費者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "nio://***:61618";
private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消費者接收到的消息: " + textMessage.getText());
} catch (Exception e) {
System.out.println("出現異常,消費失敗,放棄消費");
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
8. NIO協定案例增強
(1) 目的
- 上面是Openwire協定傳輸底層使用NIO網絡IO模型。 如何讓其他協定傳輸底層也使用NIO網絡IO模型呢?
(2) 修改配置檔案activemq.xml
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61626?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5682?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1893?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61624?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
</transportConnectors>
官方文檔:http://activemq.apache.org/auto
auto : 針對所有的協定,他會識别我們是什麼協定(openwire、stomp、amqp、mqtt會被識别)
nio :使用NIO網絡IO模型
修改配置檔案後重新開機activemq。
(3) 代碼
//使用nio模型的tcp協定生産者。其他代碼和之前一樣
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException {
......
}
}
//使用nio模型的tcp協定消費者。其他代碼和之前一樣
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException, IOException {
......
}
}
//使用nio模型的nio協定生産者。其他代碼和之前一樣
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException {
......
}
}
//使用nio模型的nio協定消費者。其他代碼和之前一樣
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException, IOException {
......
}
}
九、ActiveMQ的消息存儲和持久化
1. 介紹
(1) 此處持久化和之前的持久化的差別MQ高可用:事務、可持久、簽收,是屬于MQ自身特性,自帶的。這裡的持久化是外力,是外部插件。之前講的持久化是MQ的外在表現,現在講的的持久是是底層實作。
(2) 是什麼:
官網文檔:http://activemq.apache.org/persistence
持久化是什麼?一句話就是:ActiveMQ當機了,消息不會丢失的機制。
說明:為了避免意外當機以後丢失資訊,需要做到重新開機後可以恢複消息隊列,消息系統一半都會采用持久化機制。ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一緻的。就是在發送者将消息發送出去後,消息中心首先将消息存儲到本地資料檔案、記憶體資料庫或者遠端資料庫等。再試圖将消息發給接收者,成功則将消息從存儲中删除,失敗則繼續嘗試嘗試發送。消息中心啟動以後,要先檢查指定的存儲位置是否有未成功發送的消息,如果有,則會先把存儲位置中的消息發出去。
2. 有哪些?
(1) AMQ Message Store
基于檔案的存儲機制,是以前的預設機制,現在不再使用。
AMQ是一種檔案存儲形式,它具有寫入速度快和容易恢複的特點。消息存儲再一個個檔案中檔案的預設大小為32M,當一個檔案中的消息已經全部被消費,那麼這個檔案将被辨別為可删除,在下一個清除階段,這個檔案被删除。AMQ适用于ActiveMQ5.3之前的版本
(2) kahaDB
現在預設的。下面我們再詳細介紹。
(3) JDBC消息存儲
下面我們再詳細介紹。
(4) LevelDB消息存儲
過于新興的技術,現在有些不确定。
(5) JDBC Message Store with ActiveMQ Journal
下面我們再詳細介紹。
3. kahaDB消息存儲
(1) 介紹
基于日志檔案,從ActiveMQ5.4(含)開始預設的持久化插件。
官網文檔:http://activemq.aache.org/kahadb
官網上還有一些其他配置參數。
配置檔案activemq.xml中,如下:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
日志檔案的存儲目錄在:%activemq安裝目錄%/data/kahadb
(2) 說明
(3) KahaDB的存儲原理
4. JDBC消息存儲
4.1 設定步驟
(1) 原理圖
(2) 添加mysql資料庫的驅動包到lib檔案夾
(3) jdbcPersistenceAdapter配置 修改配置檔案activemq.xml。将之前的替換為jdbc的配置。如下:
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>
(4) 資料庫連接配接池配置
需要我們準備一個mysql資料庫,并建立一個名為activemq的資料庫。
在标簽和标簽之間插入資料庫連接配接池配置 具體操作如下:
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://mysql資料庫URL/activemq?relaxAutoCommit=true"/>
<property name="username" value="mysql資料庫使用者名"/>
<property name="password" value="mysql資料庫密碼"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<import resource="jetty.xml"/>
之後需要建一個資料庫,名為activemq。建立的資料庫要采用latin1 或者ASCII編碼。https://blog.csdn.net/JeremyJiaming/article/details/88734762
預設是的dbcp資料庫連接配接池,如果要換成其他資料庫連接配接池,需要将該連接配接池jar包,也放到lib目錄下。
(5) 建庫SQL和創表說明
先建立一個資料庫activemq
重新開機activemq。會自動生成如下3張表。如果沒有自動生成,需要我們手動執行SQL。個人建議要自動生成,我在操作過程中檢視日志檔案,發現了不少問題,最終解決了這些問題後,是能夠自動生成的。
ACTIVEMQ_MSGS資料表:
ACTIVEMQ_ACKS資料表: ACTIVEMQ_LOCK資料表:
4.2 queue驗證和資料表變化
queue模式,非持久化不會将消息持久化到資料庫。
queue模式,持久化會将消息持久化資料庫。
我們使用queue模式持久化,釋出3條消息後,發現ACTIVEMQ_MSGS資料表多了3條資料。 啟動消費者,消費了所有的消息後,發現資料表的資料消失了。 queue模式非持久化,不會持久化消息到資料表。
4.3 topic驗證和說明
我們先啟動一下持久化topic的消費者。看到ACTIVEMQ_ACKS資料表多了一條消息。
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 持久化topic 的消息消費者
public class JmsConsummer_persistence {
private static final String ACTIVEMQ_URL = "tcp://***:61626";
public static final String TOPIC_NAME = "jdbc-02";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("marrry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
connection.start();
Message message = topicSubscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage)message;
System.out.println(" 收到的持久化 topic :"+textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
}
}
ACTIVEMQ_ACKS資料表,多了一個消費者的身份資訊。一條記錄代表:一個持久化topic消費者。 我們啟動持久化生産者釋出3個資料,ACTIVEMQ_MSGS資料表新增3條資料,消費者消費所有的資料後,ACTIVEMQ_MSGS資料表的資料并沒有消失。持久化topic的消息不管是否被消費,是否有消費者,産生的資料永遠都存在,且隻存儲一條。這個是要注意的,持久化的topic大量資料後可能導緻性能下降。這裡就像公總号一樣,消費者消費完後,消息還會保留。
4.4 總結
5. JDBC Message Store with ActiveMQ Journal
(1) 說明
這種方式克服了JDBC Store的不足,JDBC每次消息過來,都需要去寫庫讀庫。ActiveMQ Journal,使用高速緩存寫入技術,大大提高了性能。當消費者的速度能夠及時跟上生産者消息的生産速度時,journal檔案能夠大大減少需要寫入到DB中的消息。
舉個例子:生産者生産了1000條消息,這1000條消息會儲存到journal檔案,如果消費者的消費速度很快的情況下,在journal檔案還沒有同步到DB之前,消費者已經消費了90%的以上消息,那麼這個時候隻需要同步剩餘的10%的消息到DB。如果消費者的速度很慢,這個時候journal檔案可以使消息以批量方式寫到DB。
為了高性能,這種方式使用日志檔案存儲+資料庫存儲。先将消息持久到日志檔案,等待一段時間再将未消費的消息持久到資料庫。該方式要比JDBC性能要高。
(2) 配置
下面是基于上面JDBC配置,再做一點修改:
6. 總結
① jdbc效率低,kahaDB效率高,jdbc+Journal效率較高。
② 持久化消息主要指的是:MQ所在伺服器當機了消息不會丢試的機> 制。
③ 持久化機制演變的過程:
從最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事務支援)附件,并且同步推出了關于關系型資料庫的存儲方案。ActiveMQ5.3版本又推出了對KahaDB的支援(5.4版本後被作為預設的持久化方案),後來ActiveMQ 5.8版本開始支援LevelDB,到現在5.9提供了标準的Zookeeper+LevelDB叢集化方案。
④ ActiveMQ消息持久化機制有:
AMQ | 基于日志檔案 |
KahaDB | 基于日志檔案,從ActiveMQ5.4開始預設使用 |
JDBC | 基于第三方資料庫 |
Replicated LevelDB Store | 從5.9開始提供了LevelDB和Zookeeper的資料複制方法,用于Master-Slave方式的首選資料複制方案。 |
十、ActiveMQ多節點叢集
面試題?
引入消息隊列之後該如何保證其高可用性
是什麼?
基于 Zookeeper和 LevelDB搭建 Activemq叢集。叢集僅提供主備方式的高可用叢集功能,避免單點故障。
zookeeper + replicated-leveldb-store的主從叢集
(1) 三種叢集方式對比:http://activemq.apache.org/masterslave.html
基于sharedFileSystem共享檔案系統(KahaDB)
基于JDBC
基于可複制的LevelDB
(2)
zookeeper + replicated-leveldb-store的主從叢集
(重點)
下面重點介紹
zookeeper + replicated-leveldb-store的主從叢集
4.1 是什麼
http://activemq.apache.org/replicated-leveldb-store.html
4.2 官網叢集管理圖
它使用Apache ZooKeeper來協調叢集中哪個節點成為主要節點。當選的主代理節點啟動并接受用戶端連接配接。其他節點進入從屬模式,連接配接主節點,并同步他們的持久狀态/w它。從屬節點不接受用戶端連接配接。所有的持久化操作都會複制到連接配接的從機上。如果主節點死亡,有最新更新的從節點會被提升為主節點。然後,失敗的節點可以重新上線,它将進入從機模式。
所有需要同步到磁盤上的消息傳遞操作都會等待更新複制到法定數量的節點上才會完成。是以,如果你配置的存儲是 replicas=“3”,那麼法定人數就是(3/2+1)=2。主站會将更新存儲在本地,并等待另外1個從站存儲更新後再報告成功。另一種思路是,store會對法定人數的複制節點進行同步複制,對任何其他節點進行異步複制複制。
當選出一個新的主要時,你還需要至少有一個法定人數的節點線上,才能夠找到一個有最新更新的節點。擁有最新更新的節點将成為新的主站。是以,建議你至少使用3個複制節點來運作,這樣你就可以在不遭受服務中斷的情況下幹掉一個節點。
說明:
4.3 部署規劃和步驟
(1)環境和版本
CentOS release 6.8(Final)、JDK1.8、zookeeper-3.4.9、apache-activemq-5.15.9
(2)關閉防火牆并保證win可以ping通ActiveMQ伺服器
(3)要求具備ZK叢集并可以成功啟動
(4)叢集部署規劃清單
(5)建立3台叢集目錄
mkdir /data/mq_cluster/
cd /data/mq_cluster/
cp -r /data/activemq/apache-activemq-5.16.0 mq_node01
cp -r mq_node01 mq_node02
cp -r mq_node01 mq_node03
(6)修改管理控制台端口
mq_node01全部預設不動
mq_node02修改:
mq_node03修改:同上。
(7)hostname名字映射
題外話:在windows下在C:\Windows\System32\drivers\etc下的hosts檔案中配置ip和域名的映射。
linux下:
(8)ActiveMQ叢集配置
01/02/03節點路徑
3個節點的BrokerName要求全部一緻 3個節點的持久化配置,在activemq.xml中修改
<persistenceAdapter>
<replicatedLevelDB
directory="activemq-data"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="zoo1.example.org:2181,zoo2.example.org:2181,zoo3.example.org:2181"
zkPassword="password"
zkPath="/activemq/leveldb-stores"
hostname="broker1.example.org"
/>
</persistenceAdapter>
(9)修改各節點的消息端口
mq_node01全部預設不動 61616
mq_node02修改:61617
mq_node03修改:61618
(10)按順序啟動3個ActiveMQ節點,到這步前提是zk叢集已經成功啟動運作
zk啟動批處理:
mq啟動批處理:(11)zk叢集的節點狀态說明
3台zk叢集連接配接任意一台
檢視master
Replicated LevelDB叢集故障遷移和驗證
(1) 叢集可用性測試
(2)代碼修改(由單機到叢集) (3) 幹掉一台ActiveMQ節點,它會自動切換到另外一個活着的。
十一、進階特性
1. 異步投遞
(1) 是什麼:自我了解:此處的異步是指生産者和broker之間發送消息的異步。不是指生産者和消費者之間異步。
官網介紹:http://activemq.apache.org/async-sends
總結:
① 異步發送可以讓生産者發的更快。
② 如果異步投遞不需要保證消息是否發送成功,發送者的效率會有所提高。如果異步投遞還需要保證消息是否成功發送,并采用了回調的方式,發送者的效率提高不多,這種就有些雞肋。
(2) 代碼實作
官網上3中代碼實作:
//代碼示範。
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
// 方式1。3種方式任選一種
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626?jms.useAsyncSend=true";
private static final String ACTIVEMQ_QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 方式2
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
// 方式3
((ActiveMQConnection)connection).setUseAsyncSend(true);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息發送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
session.close();
connection.close();
}
}
}
(3) 異步發送如何确認發送成功
//下面示範異步發送的回調
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu");
final String msgId = textMessage.getJMSMessageID();
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
public void onSuccess() {
System.out.println("成功發送消息Id:"+msgId);
}
public void onException(JMSException e) {
System.out.println("失敗發送消息Id:"+msgId);
}
});
}
System.out.println("消息發送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
activeMQMessageProducer.close();
session.close();
connection.close();
}
}
}
控制台觀察發送消息的資訊:
2. 延遲投遞和定時投遞
(1) 介紹
官網文檔:http://activemq.apache.org/delay-and-schedule-message-delivery.html
- 修改配置檔案并重新開機
<!-- 在activemq.xml添加如下灰色背景代碼:-->
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
<destinationPolicy>
<!-- 之後重新開機activemq -->
- 代碼實作
//java代碼裡面封裝的輔助消息類型:ScheduleMessage
//生産者代碼。
package com.activemq.demo;
import org.apache.activemq.*;
import javax.jms.*;
import java.util.UUID;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
long delay = 10*1000;
long period = 5*1000;
int repeat = 3 ;
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
// 延遲的時間
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 重複投遞的時間間隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 重複投遞的次數
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
// 此處的意思:該條消息,等待10秒,之後每5秒發送一次,重複發送3次。
messageProducer.send(textMessage);
}
System.out.println("消息發送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
messageProducer.close();
session.close();
connection.close();
}
}
}
//消費者代碼
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消費者接收到的消息: " + textMessage.getText());
textMessage.acknowledge();
} catch (Exception e) {
System.out.println("出現異常,消費失敗,放棄消費");
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
3. 消息消費的重試機制
(1) 是什麼
官網文檔:http://activemq.apache.org/redelivery-policy
是什麼: 消費者收到消息,之後出現異常了,沒有告訴broker确認收到該消息,broker會嘗試再将該消息發送給消費者。嘗試n次,如果消費者還是沒有确認收到該消息,那麼該消息将被放到死信隊列中,之後broker不會再将該消息發送給消費者。
(2) 具體哪些情況會引發消息重發
① Client用了transactions且在session中調用了rollback
② Client用了transactions且在調用commit之前關閉或者沒有commit
③ Client在CLIENT_ACKNOWLEDGE的傳遞模式下,session中調用了recover
(3) 請說說消息重發時間間隔和重發次數
間隔:1
次數:6
每秒發6次
(4) 有毒消息Poison ACK
一個消息被redelivedred超過預設的最大重發次數(預設6次)時,消費的回個MQ發一個“poison ack”表示這個消息有毒,告訴broker不要再發了。這個時候broker會把這個消息放到DLQ(死信隊列)。
(5) 屬性說明
(6) 代碼驗證
//生産者。發送3條資料。代碼省略.....
/*消費者。開啟事務,卻沒有commit。重新開機消費者,前6次都能收到消息,到第7次,不會再收到消息。代碼:*/
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "dead01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消費者接收到的消息: " + textMessage.getText());
//session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
});
//關閉資源
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
activemq管理背景。多了一個名為ActiveMQ.DLQ隊列,裡面多了3條消息。
(7) 代碼修改預設參數
//消費者。修改重試次數為3(系統預設6次)。更多的設定請參考官網文檔。
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://***:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "dead01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 修改預設參數,設定消息消費重試3次
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消費者接收到的消息: " + textMessage.getText());
//session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
(8) 整合spring
4 死信隊列
(1) 是什麼
官網文檔: http://activemq.apache.org/redelivery-policy
死信隊列:異常消息規避處理的集合,主要處理失敗的消息。
(2) 死信隊列的配置(一般采用預設)
sharedDeadLetterStrategy
不管是queue還是topic,失敗的消息都放到這個隊列中。下面修改activemq.xml的配置,可以達到修改隊列的名字。
individualDeadLetterStrategy
可以為queue和topic單獨指定兩個死信隊列。還可以為某個話題,單獨指定一個死信隊列。
自動删除過期消息
過期消息是值生産者指定的過期時間,超過這個時間的消息。
- 存放非持久消息到死信隊列中
5 消息不被重複消費,幂等性
如何保證消息不被重複消費呢?幕等性問題你談談 幂等性如何解決,根據messageid去查這個消息是否被消費了。
十二、擴充
activemq的API文檔
http://activemq.apache.org/maven/apidocs/index.html