1.ActiveMQ介紹:
Apache ActiveMQ是Apache軟體基金會所研發的開放源代碼消息中間件
2.消息中間件應用場景
異步處理 應用解耦 流量削鋒
(1)異步處理
場景說明:使用者注冊,需要執行三個業務邏輯,分别為寫入使用者表,發注冊郵件以及注冊短信。
串行方式
将注冊資訊寫入資料庫成功後,發送注冊郵件,再發送注冊短信。以上三個任務全部完成後,傳回給客 戶端。
并行方式
将注冊資訊寫入資料庫成功後,發送注冊郵件的同時,發送注冊短信。以上三個任務完成後,傳回給客 戶端。與串行的差别是,并行的方式可以提高處理的時間
異步處理
引入消息中間件,将部分的業務邏輯,進行異步處理。改造後的架構如下:
按照以上約定,使用者的響應時間相當于是注冊資訊寫入資料庫的時間,也就是50毫秒。注冊郵件,發送 短信寫入消息隊列後,直接傳回,是以寫入消息隊列的速度很快,基本可以忽略,是以使用者的響應時間 可能是50毫秒。是以架構改變後,系統的吞吐量提高啦,比串行提高了3倍,比并行提高了兩倍。
(2)應用解耦
場景說明:使用者下單後,訂單系統需要通知庫存系統。 傳統的做法是,訂單系統調用庫存系統的接口。如下圖:
傳統模式的缺點:假如庫存系統無法通路,則訂單減庫存将失敗,進而導緻訂單失敗,訂單系統與庫存 系統耦合。如何解決以上問題呢?引入應用消息隊列後的方案,如下圖
訂單系統:使用者下單後,訂單系統完成持久化處理,将消息寫入消息隊列,傳回使用者訂單下單成功 庫存 系統:訂閱下單的消息,采用拉/推的方式,擷取下單資訊,庫存系統根據下單資訊,進行庫存操作 假 如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入消息隊列就不再 關心其他的後續操作了。實作訂單系統與庫存系統的應用解耦。
(3)流量消峰
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。應用場景:秒殺活動,一般 會因為流量過大,導緻流量暴增,應用挂掉。為解決這個問題,一般需要在應用前端加入消息隊列。
a、可以控制活動的人數 b、可以緩解短時間内高流量壓垮應用
使用者的請求,伺服器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接抛棄使用者請 求或跳轉到錯誤頁面。秒殺業務根據消息隊列中的請求資訊,再做後續處理
目前常用的消息隊列
特性MQ | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
生産者消費者模式 | 支援 | |||
釋出訂閱模式 | ||||
請求回應模式 | 不支援 | |||
Api完備性 | 高 | |||
多語言支援 | java | |||
單機吞吐量 | 萬級 | 十萬級 | ||
消息延遲 | 無 | 微秒級 | 毫秒級 | |
可用性 | 高(主從) | 非常高(分布式) | ||
消息丢失 | 低 | 理論上不會丢失 | ||
文檔的完備性 | ||||
提供快速入門 | 有 | |||
社群活躍度 | ||||
商業支援 | 商業雲 |
3.什麼是JMS?
介紹
JMS(Java Messaging Service)是Java平台上有關面向消息中間件的技術規範,它便于消息系統中的Java應用程式進行消息交換,并且通過提供标準的産生、發送、接收消息的接口簡化企業應用的開發。 JMS本身隻定義了一系列的接口規範,是一種與廠商無關的 API,用來通路消息收發系統。
JMS消息模型
消息中間件一般有兩種傳遞模式:點對點模式(P2P)和釋出-訂閱模式(Pub/Sub)。
(1) P2P (Point to Point) 點對點模型(Queue隊列模型)
(2) Publish/Subscribe(Pub/Sub) 釋出/訂閱模型(Topic主題模型)
常用術語
- Provider/MessageProvider:生産者
- Consumer/MessageConsumer:消費者
- PTP:Point To Point,點對點通信消息模型
- Pub/Sub:Publish/Subscribe,釋出訂閱消息模型
- Queue:隊列,目标類型之一,和PTP結合
- Topic:主題,目标類型之一,和Pub/Sub結合
- ConnectionFactory:連接配接工廠,JMS用它建立連接配接
- Connnection:JMS Client到JMS Provider的連接配接
- Destination:消息目的地,由Session建立
- Session:會話,由Connection建立,實質上就是發送、接受消息的一個線程,是以生産者、消費者都是Session建立的
點對點模型
點對點模型(Pointer-to-Pointer):即生産者和消費者之間的消息往來
每個消息都被發送到特定的消息隊列,接收者從隊列中擷取消息。隊列保留着消息,直到他們被消費或逾時。
點對點模型的特點:
每個消息隻有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中);
發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運作,它不會影響到消息被發送到隊列;
接收者在成功接收消息之後需向隊列應答成功。
釋出/訂閱模型
包含三個角色:主題(Topic),釋出者(Publisher),訂閱者(Subscriber),多個釋出者将消息發
送到topic,系統将這些消息投遞到訂閱此topic的訂閱者
釋出者發送到topic的消息,隻有訂閱了topic的訂閱者才會收到消息。topic實作了釋出和訂閱,當你發
布一個消息,所有訂閱這個topic的服務都能得到這個消息,是以從1到N個訂閱者都能得到這個消息的拷貝。
釋出/訂閱模型的特點:
每個消息可以有多個消費者;
釋出者和訂閱者之間有時間上的依賴性(先訂閱主題,再來發送消息)。
訂閱者必須保持運作的狀态,才能接受釋出者釋出的消息;
4.JMS的消息格式
JMS消息組成
- 消息頭:
每個消息頭字段都有相應的getter和setter方法。
- 消息屬性:
如果需要除消息頭字段以外的值,那麼可以使用消息屬性。
message.setStringProperty("Property",Property); //自定義屬性
- 消息體:
JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage(對象實作序列化接口)。
ObjectMessage:
發送
//發送ObjectMessage消息
@Test
public void test2(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
User user = new User();
user.setName("小蒼");
user.setAge(18);
ObjectMessage objectMessage = session.createObjectMessage(user);
return objectMessage;
}
});
}
接受
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
try {
User user = (User)objectMessage.getObject();
System.out.println(user.getUsername());
System.out.println(user.getPassword());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
ActiveMQ5.12後 ,為了安全考慮,ActiveMQ預設不接受自定義的序列化對象,需要将自定義的 加入到受信任的清單。
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
packages:
trust-all: true # 添加所有包到信任清單
兩種模式比較
1 | Topic | Queue |
---|---|---|
Publish Subscribe messaging 釋出 訂閱消息 | Point-to-Point 點對點 | |
有無狀态 | topic 資料預設不落地,是無狀态的。 | Queue 資料預設會在 mq 服 務器上以檔案形式儲存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存儲。 |
完整性保障 | 并不保證 publisher 釋出的每條數 據,Subscriber 都能接受到。 | Queue 保證每條資料都能 被 receiver 接收。消息不逾時。 |
消息是否會丢失 | 一般來說 publisher 釋出消息到某 一個 topic 時,隻有正在監聽該 topic 位址的 sub 能夠接收到消息;如果沒 有 sub 在監聽,該 topic 就丢失了。 | Sender 發 送 消 息 到 目 标 Queue, receiver 可以異步接收這 個 Queue 上的消息。Queue 上的 消息如果暫時沒有 receiver 來 取,也不會丢失。前提是消息不 逾時。 |
消息釋出接 收政策 | 一對多的消息釋出接收政策,監 聽同一個topic位址的多個sub都能收 到 publisher 發送的消息。Sub 接收完 通知 mq 伺服器 | 一對一的消息釋出接收策 略,一個 sender 發送的消息,隻 能有一個 receiver 接收。 receiver 接收完後,通知 mq 伺服器已接 收,mq 伺服器對 queue 裡的消 息采取删除或其他操作。 |
5.ActiveMQ的安裝
Linux安裝流程
第一步:安裝 jdk(略)
第二步:把 activemq的壓縮包(apache-activemq-5.14.5-bin.tar.gz)上傳到 linux 系統
第三步:解壓縮壓縮包
tar -zxvf apache-activemq-5.14.5-bin.tar.gz
第四步:進入apache-activemq-5.14.5的bin目錄
cd apache-activemq-5.14.5/bin
第五步:啟動 activemq
./activemq start (執行2次:第一次:生成配置資訊;第二次:啟動)
第六步:停止activemq:
./activemq stop
頁面控制台: http://ip:8161 (監控)
請求位址: tcp://ip:61616 (java代碼通路消息中間件)
賬号:admin
密碼:admin
登入後清單各列資訊含義如下:
Number Of Pending Messages :等待消費的消息 這個是目前未出隊列的數量。
Number Of Consumers :消費者 這個是消費者端的消費者數量
Messages Enqueued :進入隊列的消息 進入隊列的總數量,包括出隊列的。
Messages Dequeued :出了隊列的消息 可以了解為是消費這消費掉的數量。
windows安裝
下載下傳後直接解壓縮直接就能用(免安裝)。
bin/是服務啟動相關的指令檔案所在目錄
data/是預設持久化檔案所在目錄
docs/裡面放的是使用者手冊
conf/是配置檔案所在目錄,任何配置檔案修改後,必須重新開機ActiveMQ,才能生效.
有幾個配置檔案需要提下,後面會用到:
1.activemq.xml
就是spring配置檔案。配置的是ActiveMQ應用使用的預設對象元件.
transportConnectors标簽 - 配置連結端口資訊的. 其中的端口号61616是ActiveMQ對外釋出的tcp協定通路端口. 就是java代碼通路ActiveMQ時使用的端口.
配置安全認證和持久化都是在這個檔案裡面。
2.jetty.xml
spring配置檔案, ActiveMQ使用的是jetty提供HTTP服務,是以需要該檔案用于配置jetty伺服器的預設對象元件.
3.users.properties
内容資訊: 使用者名=密碼
是用于配置用戶端通過協定通路ActiveMQ時,使用的使用者名和密碼.
4.groups.properties
内容資訊: 使用者組=使用者1,使用者2(多個使用者中間用逗号隔開)
輕按兩下bin\win64目錄下的activemq批處理檔案,即可啟動
通路http://localhost:8161/admin/,輸入預設的使用者名和密碼admin/admin,即可看到管理台頁面
6.原生JMS API操作ActiveMQ
PTP模式
引入坐标
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
</dependencies>
生産者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 示範點對點模式 -- 消息生産者
*/
public class PTP_Producer {
public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.建立連接配接
Connection connection = factory.createConnection();
//3.打開連接配接
connection.start();
//4.建立session
/**
* 參數一:是否開啟事務操作
* 參數二:消息确認機制
*/
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.建立目标位址(Queue:點對點消息,Topic:釋出訂閱消息)
Queue queue = session.createQueue("queue01");
//6.建立消息生産者
MessageProducer producer = session.createProducer(queue);
//7.建立消息
//createTextMessage: 文本類型
TextMessage textMessage = session.createTextMessage("test message");
//8.發送消息
producer.send(textMessage);
System.out.println("消息發送完成");
//9.釋放資源
session.close();
connection.close();
}
}
消費者
方案一(1)
package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 示範點對點模式- 消息消費者(第一種方案)
*/
public class PTP_Consumer1 {
public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.建立連接配接
Connection connection = factory.createConnection();
//3.打開連接配接
connection.start();
//4.建立session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.指定目标位址
Queue queue = session.createQueue("queue01");
//6.建立消息的消費者
MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
while(true){
Message message = consumer.receive();
//如果已經沒有消息了,結束啦
if(message==null){
break;
}
//如果還有消息,判斷什麼類型的消息
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
System.out.println("接收的消息:"+textMessage.getText());
}
}
}
}
方案二(2)
package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 示範點對點模式- 消息消費者(第二種方案) -- 更加推薦
*/
public class PTP_Consumer2 {
public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠
ConnectionFactory factory
= new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.建立連接配接
Connection connection = factory.createConnection();
//3.打開連接配接
connection.start();
//4.建立session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.指定目标位址
Queue queue = session.createQueue("queue01");
//6.建立消息的消費者
MessageConsumer consumer = session.createConsumer(queue);
//7.設定消息監聽器來接收消息
consumer.setMessageListener(new MessageListener() {
//處理消息
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收的(2):"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//注意:在監聽器的模式下千萬不要關閉連接配接,一旦關閉,消息無法接收
}
}
Pub/Sub模式
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主題消息,消息的發送方
*/
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.建立連接配接工廠
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.建立連接配接
Connection connection = factory.createConnection();
//3.打開連接配接
connection.start();
//4.建立session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.建立目标位址(Queue:點對點消息,Topic:釋出訂閱消息)
Topic topic = session.createTopic("sms");
//6.建立消息生産者
MessageProducer producer = session.createProducer(topic);
//7.建立消息
TextMessage message = session.createTextMessage("發短信...");
//8.發送消息
producer.send(message);
System.out.println("發送消息:發短信...");
session.close();;
connection.close();
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 主題消息,消息的消費方
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception {
//1.建立連接配接工廠
ConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.建立連接配接
Connection connection = factory.createConnection();
//3.打開連接配接
connection.start();
//4.建立session
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5.建立目标位址(Queue:點對點消息,Topic:釋出訂閱消息)
Topic topic = session.createTopic("sms");
//6.建立消息的消費者
MessageConsumer consumer = session.createConsumer(topic);
//7.配置消息監聽器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
7.SpringBoot整合ActiveMQ
1.依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.配置:
server:
port: 9001 #端口
spring:
application:
name: activemq-producer # 服務名稱
# springboot與activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 連接配接位址
user: admin # activemq使用者名
password: admin # activemq密碼
# 指定發送模式 (點對點 false , 釋出訂閱 true)
jms:
pub-sub-domain: false
3.啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 生産者啟動類
*/
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
4.編寫生産者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* 示範SpringBoot與ActiveMQ整合- 消息生産者
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
//JmsMessagingTemplate: 用于工具類發送消息
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void ptpSender(){
/**
* 參數一:隊列的名稱或主題名稱
* 參數二:消息内容
*/
jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
message");
}
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.配置
server:
port: 9002 #端口
spring:
application:
name: activemq-consumer # 服務名稱
# springboot與activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 連接配接位址
user: admin # activemq使用者名
password: admin # activemq密碼
# 指定發送模式 (點對點 false , 釋出訂閱 true)
jms:
pub-sub-domain: false
activemq:
name: springboot_queue
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 消息消費者啟動類
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
4.監聽消息類
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 用于監聽消息類(既可以用于隊列的監聽,也可以用于主題監聽)
*/
@Component // 放入IOC容器
public class MsgListener {
/**
* 用于接收消息的方法
* destination: 隊列的名稱或主題的名稱
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
8.練習SpringBoot 內建 ActiveMQ
1、建立一個springboot項目,添加依賴
<!--ActiveMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>1.5.0.RELEASE</version>
</dependency>
<!--消息隊列連接配接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
2、application.yml檔案的配置
server:
port: 8080
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
close-timeout: 15s # 在考慮結束之前等待的時間
in-memory: true # 預設代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。
non-blocking-redelivery: false # 是否在復原復原消息之前停止消息傳遞。這意味着當啟用此指令時,消息順序不會被保留。
send-timeout: 0 # 等待消息發送響應的時間。設定為0等待永遠。
queue-name: active.queue
topic-name: active.topic.name.model
# packages:
# trust-all: true #不配置此項,會報錯
pool:
enabled: true
max-connections: 10 #連接配接池最大連接配接數
idle-timeout: 30000 #空閑的連接配接過期時間,預設為30秒
# jms:
# pub-sub-domain: true #預設情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔清單(當不信任所有包時)
#spring.activemq.packages.trusted=
# 當連接配接請求和池滿時是否阻塞。設定false會抛“JMSException異常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然滿,則在抛出異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啟動時建立連接配接。可以在啟動時用于加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 連接配接過期逾時。
#spring.activemq.pool.expiry-timeout=0ms
# 連接配接空閑逾時
#spring.activemq.pool.idle-timeout=30s
# 連接配接池最大連接配接數
#spring.activemq.pool.max-connections=1
# 每個連接配接的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試重新連接配接
#spring.activemq.pool.reconnect-on-exception=true
# 在空閑連接配接清除線程之間運作的時間。當為負數時,沒有空閑連接配接驅逐線程運作。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否隻使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
3、啟動類增加 @EnableJms 注解
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms //啟動消息隊列
public class SpringbootActivemqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootActivemqApplication.class, args);
}
}
4、初始化和配置 ActiveMQ 的連接配接
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
@Configuration
publicclass BeanConfig
{
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.topic-name}")
private String password;
@Value("${spring.activemq.queue-name}")
private String queueName;
@Value("${spring.activemq.topic-name}")
private String topicName;
@Bean(name = "queue")
public Queue queue() {
return new ActiveMQQueue(queueName);
}
@Bean(name = "topic")
public Topic topic() {
return new ActiveMQTopic(topicName);
}
@Bean
public ConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(username, password, brokerUrl); }
@Bean
public JmsMessagingTemplate jmsMessageTemplate(){
return new JmsMessagingTemplate(connectionFactory());
}
// 在Queue模式中,對消息的監聽需要對containerFactory進行配置
@Bean("queueListener")
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
//在Topic模式中,對消息的監聽需要對containerFactory進行配置
@Bean("topicListener")
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
}
5、生産者(queue 和 topic)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
@RestController
public class ProducerController
{
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@PostMapping("/queue/test")
public String sendQueue(@RequestBody String str) {
this.sendMessage(this.queue, str);
return "success";
}
@PostMapping("/topic/test")
public String sendTopic(@RequestBody String str) {
this.sendMessage(this.topic, str);
return "success";
}
// 發送消息,destination是發送到的隊列,message是待發送的消息
private void sendMessage(Destination destination, final String message){
jmsMessagingTemplate.convertAndSend(destination, message);
}
}
6、Queue模式的消費者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class QueueConsumerListener
{
//queue模式的消費者
@JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
public void readActiveQueue(String message) {
System.out.println("queue接受到:" + message);
}
}
7、topic模式的消費者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumerListener
{
//topic模式的消費者
@JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
public void readActiveQueue(String message) {
System.out.println("topic接受到:" + message);
}
}
8、測試(使用Postman發消息)
(1) POST: http://localhost:8080/queue/test 消息體:{"aaa" : "queue"}
控制台列印:queue接受到:{"aaa" : "queue"}
(2) POST: http://localhost:8080/topic/test 消息體:{"aaa" : "topic"}
控制台列印:topic接受到:{"aaa" : "topic"}
topic模式有普通訂閱和持久化訂閱
普通訂閱:在消費者啟動之前發送過來的消息,消費者啟動之後不會去消費;
持久化訂閱: 在消費者啟動之前發送過來的消息,消費者啟動之後會去消費;
9.持久化
分類:
1.Memory 消息存儲-基于記憶體的消息存儲。
2.基于日志消息存儲方式,KahaDB是ActiveMQ的預設日志存儲方式,它提供了容量的提升和恢複 能力。
3.基于JDBC的消息存儲方式-資料存儲于資料庫(例如:MySQL)中。
修改jms配置檔案
delivery-mode: non_persistent # 非持久化(記憶體)
delivery-mode: persistent # 持久化(日志KahaDB)
JDBC消息存儲
1)application.yml
server:
port: 9001
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false:點對點隊列模式, true:釋出/訂閱模式
template:
delivery-mode: persistent # 持久化
activemq:
name: springboot-queue01
2)修改activemq.xml
<!--配置資料庫連接配接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://192.168.66.133:3306/db_activemq" />
<property name="username" value="root" />
<property name="password" value="123456"/>
</bean>
<!--JDBC Jdbc用于master/slave模式的資料庫分享 -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
3)拷貝mysql及durid資料源的jar包到activemq的lib目錄下
4)重新開機activemq
10.消息事務
一個事務性發送,其中一組消息要麼能夠全部保證到達伺服器,要麼都不到達伺服器。 生産者、消費者與消息伺服器直接都支援事務性;
一、生産者事務
/**
* 事務性發送--方案一
*/
@Test
public void sendMessageTx(){
//擷取連接配接工廠
ConnectionFactory connectionFactory=
jmsMessagingTemplate.getConnectionFactory();
Session session=null;
try{
//建立連接配接
Connection connection=connectionFactory.createConnection();
/**
* 參數一:是否開啟消息事務
*/a
session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//建立生産者
MessageProducer producer=
session.createProducer(session.createQueue(name));
for(int i=1;i<=10;i++){
//模拟異常
if(i==4){
int a=10/0;
}
TextMessage textMessage=session.createTextMessage("消息--"+
i);
producer.send(textMessage);
}
//注意:一旦開啟事務發送,那麼就必須使用commit方法進行事務送出,否則消息無法到達
MQ伺服器
session.commit();
}catch(JMSException e){
e.printStackTrace();
//消息事務復原
try{
session.rollback();
}catch(JMSException e1){
e1.printStackTrace();
}
}
}
方式二:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory
connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
}
生産者業務類:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息發送的業務類
*/
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${activemq.name}")
private String name;
@Transactional // 對消息發送加入事務管理(同時也對JDBC資料庫的事務生效)
public void sendMessage(){
for(int i=1;i<=10;i++) {
//模拟異常
if(i==4){
int a = 10/0;
}
jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
}
}
}
測試發送方法:
@Autowired
private MessageService messageService;
/**
* 事務性發送--方案二: Spring的JmsTransactionManager功能
*/
@Test
public void sendMessageTx2(){
messageService.sendMessage();
}
二、消費者事務
/**
* 消息消費者
*/
@Component
public class Consumer {
/**
* 接收消息的方法
*/
@JmsListener(destination="${activemq.name}",containerFactory =
"jmsQueryListenerFactory")
public void receiveMessage(TextMessage textMessage,Session session) throws
JMSException {
try {
System.out.println("消息内容:" + textMessage.getText() + ",是否重發:"
+ textMessage.getJMSRedelivered());
int i = 100/0; //模拟異常
session.commit();//送出事務
} catch (JMSException e) {
try {
session.rollback();//復原事務
} catch (JMSException e1) {
}
e.printStackTrace();
}
}
}
11.消息确認機制
MS消息隻有在被确認之後,才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接 收消息、客戶處理消息和消息被确認。在事務性會話中,當一個事務被送出的時候,确認自動發生。在 非事務性會話中,消息何時被确認取決于建立會話時的應答模式(acknowledgement mode)。該參 數有以下三個可選值:
Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法傳回的時候,或者從 MessageListener.onMessage方法成功傳回的時候,會話 自動确認客戶收到的消息
Session.CLIENT_ACKNOWLEDGE:客戶通過消息的acknowledge方法确認消息。需要注意的 是,在這種模式中,确認是在會話層上進行:确認一個被消 費的消息将自動确認所有已被會話消費的消息。例如,如果 一個消息消費者消費了10個消息,然後确認第5個消息,那 麼所有10個消息都被确認
Session.DUPS_ACKNOWLEDGE:該選擇隻是會話遲鈍确認消息的送出。如果JMS provider失 敗,那麼可能會導緻一些重複的消息。如果是重複的消息, 那麼JMS provider必須把消息頭的JMSRedelivered字段設定 為true