天天看點

ActiveMQ

1.ActiveMQ介紹:

Apache ActiveMQ是Apache軟體基金會所研發的開放源代碼消息中間件

2.消息中間件應用場景

異步處理 應用解耦 流量削鋒

(1)異步處理

場景說明:使用者注冊,需要執行三個業務邏輯,分别為寫入使用者表,發注冊郵件以及注冊短信。

串行方式

将注冊資訊寫入資料庫成功後,發送注冊郵件,再發送注冊短信。以上三個任務全部完成後,傳回給客 戶端。

并行方式

将注冊資訊寫入資料庫成功後,發送注冊郵件的同時,發送注冊短信。以上三個任務完成後,傳回給客 戶端。與串行的差别是,并行的方式可以提高處理的時間

異步處理

引入消息中間件,将部分的業務邏輯,進行異步處理。改造後的架構如下:

按照以上約定,使用者的響應時間相當于是注冊資訊寫入資料庫的時間,也就是50毫秒。注冊郵件,發送 短信寫入消息隊列後,直接傳回,是以寫入消息隊列的速度很快,基本可以忽略,是以使用者的響應時間 可能是50毫秒。是以架構改變後,系統的吞吐量提高啦,比串行提高了3倍,比并行提高了兩倍。

(2)應用解耦

場景說明:使用者下單後,訂單系統需要通知庫存系統。 傳統的做法是,訂單系統調用庫存系統的接口。如下圖:

傳統模式的缺點:假如庫存系統無法通路,則訂單減庫存将失敗,進而導緻訂單失敗,訂單系統與庫存 系統耦合。如何解決以上問題呢?引入應用消息隊列後的方案,如下圖

訂單系統:使用者下單後,訂單系統完成持久化處理,将消息寫入消息隊列,傳回使用者訂單下單成功 庫存 系統:訂閱下單的消息,采用拉/推的方式,擷取下單資訊,庫存系統根據下單資訊,進行庫存操作 假 如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入消息隊列就不再 關心其他的後續操作了。實作訂單系統與庫存系統的應用解耦。

(3)流量消峰

流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。應用場景:秒殺活動,一般 會因為流量過大,導緻流量暴增,應用挂掉。為解決這個問題,一般需要在應用前端加入消息隊列。

a、可以控制活動的人數 b、可以緩解短時間内高流量壓垮應用

使用者的請求,伺服器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接抛棄使用者請 求或跳轉到錯誤頁面。秒殺業務根據消息隊列中的請求資訊,再做後續處理

目前常用的消息隊列

特性MQ ActiveMQ RabbitMQ RocketMQ Kafka
生産者消費者模式 支援
釋出訂閱模式
請求回應模式 不支援
Api完備性
多語言支援 java
單機吞吐量 萬級 十萬級
消息延遲 微秒級 毫秒級
可用性 高(主從) 非常高(分布式)
消息丢失 理論上不會丢失
文檔的完備性
提供快速入門
社群活躍度
商業支援 商業雲

3.什麼是JMS?

介紹

JMS(Java Messaging Service)是Java平台上有關面向消息中間件的技術規範,它便于消息系統中的Java應用程式進行消息交換,并且通過提供标準的産生、發送、接收消息的接口簡化企業應用的開發。 JMS本身隻定義了一系列的接口規範,是一種與廠商無關的 API,用來通路消息收發系統。

JMS消息模型

消息中間件一般有兩種傳遞模式:點對點模式(P2P)和釋出-訂閱模式(Pub/Sub)。

(1) P2P (Point to Point) 點對點模型(Queue隊列模型)

(2) Publish/Subscribe(Pub/Sub) 釋出/訂閱模型(Topic主題模型)

常用術語

  • Provider/MessageProvider:生産者
  • Consumer/MessageConsumer:消費者
  • PTP:Point To Point,點對點通信消息模型
  • Pub/Sub:Publish/Subscribe,釋出訂閱消息模型
  • Queue:隊列,目标類型之一,和PTP結合
  • Topic:主題,目标類型之一,和Pub/Sub結合
  • ConnectionFactory:連接配接工廠,JMS用它建立連接配接
  • Connnection:JMS Client到JMS Provider的連接配接
  • Destination:消息目的地,由Session建立
  • Session:會話,由Connection建立,實質上就是發送、接受消息的一個線程,是以生産者、消費者都是Session建立的

點對點模型

點對點模型(Pointer-to-Pointer):即生産者和消費者之間的消息往來

每個消息都被發送到特定的消息隊列,接收者從隊列中擷取消息。隊列保留着消息,直到他們被消費或逾時。

點對點模型的特點:

每個消息隻有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中);

發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運作,它不會影響到消息被發送到隊列;

接收者在成功接收消息之後需向隊列應答成功。

釋出/訂閱模型

包含三個角色:主題(Topic),釋出者(Publisher),訂閱者(Subscriber),多個釋出者将消息發

送到topic,系統将這些消息投遞到訂閱此topic的訂閱者

釋出者發送到topic的消息,隻有訂閱了topic的訂閱者才會收到消息。topic實作了釋出和訂閱,當你發

布一個消息,所有訂閱這個topic的服務都能得到這個消息,是以從1到N個訂閱者都能得到這個消息的拷貝。

釋出/訂閱模型的特點:

每個消息可以有多個消費者;

釋出者和訂閱者之間有時間上的依賴性(先訂閱主題,再來發送消息)。

訂閱者必須保持運作的狀态,才能接受釋出者釋出的消息;

4.JMS的消息格式

JMS消息組成

  • 消息頭:
    每個消息頭字段都有相應的getter和setter方法。
  • 消息屬性:
    如果需要除消息頭字段以外的值,那麼可以使用消息屬性。
message.setStringProperty("Property",Property); //自定義屬性
           
  • 消息體:
    JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage(對象實作序列化接口)。

ObjectMessage:

發送

//發送ObjectMessage消息
@Test
public void test2(){
jmsTemplate.send(name, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
        User user = new User();
        user.setName("小蒼");
        user.setAge(18);
        ObjectMessage objectMessage = session.createObjectMessage(user);
        return objectMessage;
        }
    });
}
           

接受

@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
    if(message instanceof ObjectMessage){
        ObjectMessage objectMessage = (ObjectMessage)message;
        try {
            User user = (User)objectMessage.getObject();
            System.out.println(user.getUsername());
            System.out.println(user.getPassword());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

           

ActiveMQ5.12後 ,為了安全考慮,ActiveMQ預設不接受自定義的序列化對象,需要将自定義的 加入到受信任的清單。

spring:
	activemq:
        broker-url: tcp://192.168.66.133:61616
        user: admin
        password: admin
        packages:
        	trust-all: true # 添加所有包到信任清單
           

兩種模式比較

1 Topic Queue
Publish Subscribe messaging 釋出 訂閱消息 Point-to-Point 點對點
有無狀态 topic 資料預設不落地,是無狀态的。 Queue 資料預設會在 mq 服 務器上以檔案形式儲存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面。也可以配置成 DB 存儲。
完整性保障 并不保證 publisher 釋出的每條數 據,Subscriber 都能接受到。 Queue 保證每條資料都能 被 receiver 接收。消息不逾時。
消息是否會丢失 一般來說 publisher 釋出消息到某 一個 topic 時,隻有正在監聽該 topic 位址的 sub 能夠接收到消息;如果沒 有 sub 在監聽,該 topic 就丢失了。 Sender 發 送 消 息 到 目 标 Queue, receiver 可以異步接收這 個 Queue 上的消息。Queue 上的 消息如果暫時沒有 receiver 來 取,也不會丢失。前提是消息不 逾時。
消息釋出接 收政策 一對多的消息釋出接收政策,監 聽同一個topic位址的多個sub都能收 到 publisher 發送的消息。Sub 接收完 通知 mq 伺服器 一對一的消息釋出接收策 略,一個 sender 發送的消息,隻 能有一個 receiver 接收。 receiver 接收完後,通知 mq 伺服器已接 收,mq 伺服器對 queue 裡的消 息采取删除或其他操作。

5.ActiveMQ的安裝

Linux安裝流程

第一步:安裝 jdk(略)
第二步:把 activemq的壓縮包(apache-activemq-5.14.5-bin.tar.gz)上傳到 linux 系統
第三步:解壓縮壓縮包
tar -zxvf apache-activemq-5.14.5-bin.tar.gz
第四步:進入apache-activemq-5.14.5的bin目錄
cd apache-activemq-5.14.5/bin
第五步:啟動 activemq
./activemq start (執行2次:第一次:生成配置資訊;第二次:啟動)
第六步:停止activemq:
./activemq stop
           
頁面控制台: http://ip:8161 (監控)
請求位址: tcp://ip:61616 (java代碼通路消息中間件)
賬号:admin
密碼:admin
           
登入後清單各列資訊含義如下:
Number Of Pending Messages :等待消費的消息 這個是目前未出隊列的數量。
Number Of Consumers :消費者 這個是消費者端的消費者數量
Messages Enqueued :進入隊列的消息 進入隊列的總數量,包括出隊列的。
Messages Dequeued :出了隊列的消息 可以了解為是消費這消費掉的數量。
           

windows安裝

下載下傳後直接解壓縮直接就能用(免安裝)。

bin/是服務啟動相關的指令檔案所在目錄

data/是預設持久化檔案所在目錄

docs/裡面放的是使用者手冊

conf/是配置檔案所在目錄,任何配置檔案修改後,必須重新開機ActiveMQ,才能生效.

有幾個配置檔案需要提下,後面會用到:

1.activemq.xml

就是spring配置檔案。配置的是ActiveMQ應用使用的預設對象元件.

transportConnectors标簽 - 配置連結端口資訊的. 其中的端口号61616是ActiveMQ對外釋出的tcp協定通路端口. 就是java代碼通路ActiveMQ時使用的端口.

配置安全認證和持久化都是在這個檔案裡面。

2.jetty.xml

spring配置檔案, ActiveMQ使用的是jetty提供HTTP服務,是以需要該檔案用于配置jetty伺服器的預設對象元件.

3.users.properties

内容資訊: 使用者名=密碼

是用于配置用戶端通過協定通路ActiveMQ時,使用的使用者名和密碼.

4.groups.properties

内容資訊: 使用者組=使用者1,使用者2(多個使用者中間用逗号隔開)

輕按兩下bin\win64目錄下的activemq批處理檔案,即可啟動

通路http://localhost:8161/admin/,輸入預設的使用者名和密碼admin/admin,即可看到管理台頁面

6.原生JMS API操作ActiveMQ

PTP模式

引入坐标

<dependencies>
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.11.2</version>
	</dependency>
</dependencies>
           
生産者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 示範點對點模式 -- 消息生産者
 */
public class PTP_Producer {
    public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠
        ConnectionFactory factory
                = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.建立連接配接
        Connection connection = factory.createConnection();
//3.打開連接配接
        connection.start();
//4.建立session
/**
 * 參數一:是否開啟事務操作
 * 參數二:消息确認機制
 */
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.建立目标位址(Queue:點對點消息,Topic:釋出訂閱消息)
        Queue queue = session.createQueue("queue01");
//6.建立消息生産者
        MessageProducer producer = session.createProducer(queue);
//7.建立消息
//createTextMessage: 文本類型
        TextMessage textMessage = session.createTextMessage("test message");
//8.發送消息
        producer.send(textMessage);
        System.out.println("消息發送完成");
//9.釋放資源
        session.close();
        connection.close();
    }
}
           
消費者

方案一(1)

package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 示範點對點模式- 消息消費者(第一種方案)
 */
public class PTP_Consumer1 {
    public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠
        ConnectionFactory factory
                = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.建立連接配接
        Connection connection = factory.createConnection();
//3.打開連接配接
        connection.start();
//4.建立session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.指定目标位址
        Queue queue = session.createQueue("queue01");
//6.建立消息的消費者
        MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
        while(true){
            Message message = consumer.receive();
//如果已經沒有消息了,結束啦
            if(message==null){
                break;
            }
//如果還有消息,判斷什麼類型的消息
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                System.out.println("接收的消息:"+textMessage.getText());
            }
        }
    }
}

           

方案二(2)

package com.itheima.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 示範點對點模式- 消息消費者(第二種方案) -- 更加推薦
 */
public class PTP_Consumer2 {
    public static void main(String[] args) throws JMSException {
//1.建立連接配接工廠
        ConnectionFactory factory
                = new ActiveMQConnectionFactory("tcp://192.168.66.133:61616");
//2.建立連接配接
        Connection connection = factory.createConnection();
//3.打開連接配接
        connection.start();
//4.建立session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.指定目标位址
        Queue queue = session.createQueue("queue01");
//6.建立消息的消費者
        MessageConsumer consumer = session.createConsumer(queue);
//7.設定消息監聽器來接收消息
        consumer.setMessageListener(new MessageListener() {
            //處理消息
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println("接收的(2):"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
//注意:在監聽器的模式下千萬不要關閉連接配接,一旦關閉,消息無法接收
    }
}
           

Pub/Sub模式

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 主題消息,消息的發送方
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception {
//1.建立連接配接工廠
        ConnectionFactory factory = new
                ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.建立連接配接
        Connection connection = factory.createConnection();
//3.打開連接配接
        connection.start();
//4.建立session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.建立目标位址(Queue:點對點消息,Topic:釋出訂閱消息)
        Topic topic = session.createTopic("sms");
//6.建立消息生産者
        MessageProducer producer = session.createProducer(topic);
//7.建立消息
        TextMessage message = session.createTextMessage("發短信...");
//8.發送消息
        producer.send(message);
        System.out.println("發送消息:發短信...");
        session.close();;
        connection.close();
    }
}
           
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 主題消息,消息的消費方
 */
public class TopicConsumer {
    public static void main(String[] args) throws Exception {
//1.建立連接配接工廠
        ConnectionFactory factory = new
                ActiveMQConnectionFactory("tcp://192.168.12.132:61616");
//2.建立連接配接
        Connection connection = factory.createConnection();
//3.打開連接配接
        connection.start();
//4.建立session
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
//5.建立目标位址(Queue:點對點消息,Topic:釋出訂閱消息)
        Topic topic = session.createTopic("sms");
//6.建立消息的消費者
        MessageConsumer consumer = session.createConsumer(topic);
//7.配置消息監聽器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消費消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
           

7.SpringBoot整合ActiveMQ

1.依賴
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
           
2.配置:
server:
	port: 9001 #端口
spring:
	application:
		name: activemq-producer # 服務名稱
# springboot與activemq整合配置
	activemq:
		broker-url: tcp://192.168.66.133:61616 # 連接配接位址
		user: admin # activemq使用者名
		password: admin # activemq密碼
# 指定發送模式 (點對點 false , 釋出訂閱 true)
	jms:
		pub-sub-domain: false
           
3.啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * 生産者啟動類
 */
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class,args);
    }
}
           
4.編寫生産者
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * 示範SpringBoot與ActiveMQ整合- 消息生産者
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
    //JmsMessagingTemplate: 用于工具類發送消息
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Test
    public void ptpSender(){
/**
 * 參數一:隊列的名稱或主題名稱
 * 參數二:消息内容
 */
        jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot
                message");
    }
}
           

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
           
2.配置
server:
	port: 9002 #端口
spring:
	application:
		name: activemq-consumer # 服務名稱
# springboot與activemq整合配置
	activemq:
		broker-url: tcp://192.168.66.133:61616 # 連接配接位址
		user: admin # activemq使用者名
		password: admin # activemq密碼
# 指定發送模式 (點對點 false , 釋出訂閱 true)
	jms:
		pub-sub-domain: false
activemq:
	name: springboot_queue

           
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * 消息消費者啟動類
 */
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }
}
           
4.監聽消息類
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
 * 用于監聽消息類(既可以用于隊列的監聽,也可以用于主題監聽)
 */
@Component // 放入IOC容器
public class MsgListener {
    /**
     * 用于接收消息的方法
     * destination: 隊列的名稱或主題的名稱
     */
    @JmsListener(destination = "${activemq.name}")
    public void receiveMessage(Message message){
        if(message instanceof TextMessage){
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("接收消息:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
           

8.練習SpringBoot 內建 ActiveMQ

1、建立一個springboot項目,添加依賴

<!--ActiveMq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>1.5.0.RELEASE</version>
        </dependency>
        <!--消息隊列連接配接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
           

2、application.yml檔案的配置

server:
  port: 8080

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    close-timeout: 15s   # 在考慮結束之前等待的時間
    in-memory: true      # 預設代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。
    non-blocking-redelivery: false  # 是否在復原復原消息之前停止消息傳遞。這意味着當啟用此指令時,消息順序不會被保留。
    send-timeout: 0     # 等待消息發送響應的時間。設定為0等待永遠。
    queue-name: active.queue
    topic-name: active.topic.name.model

#  packages:
#    trust-all: true #不配置此項,會報錯
  pool:
    enabled: true
    max-connections: 10   #連接配接池最大連接配接數
    idle-timeout: 30000   #空閑的連接配接過期時間,預設為30秒

 # jms:
 #   pub-sub-domain: true  #預設情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置

# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔清單(當不信任所有包時)
#spring.activemq.packages.trusted=
# 當連接配接請求和池滿時是否阻塞。設定false會抛“JMSException異常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然滿,則在抛出異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啟動時建立連接配接。可以在啟動時用于加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 連接配接過期逾時。
#spring.activemq.pool.expiry-timeout=0ms
# 連接配接空閑逾時
#spring.activemq.pool.idle-timeout=30s
# 連接配接池最大連接配接數
#spring.activemq.pool.max-connections=1
# 每個連接配接的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試重新連接配接
#spring.activemq.pool.reconnect-on-exception=true
# 在空閑連接配接清除線程之間運作的時間。當為負數時,沒有空閑連接配接驅逐線程運作。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否隻使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
           

3、啟動類增加 @EnableJms 注解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms    //啟動消息隊列
public class SpringbootActivemqApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootActivemqApplication.class, args);
    }

}
           

4、初始化和配置 ActiveMQ 的連接配接

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
publicclass BeanConfig
{

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.topic-name}")
    private String password;

    @Value("${spring.activemq.queue-name}")
    private String queueName;

    @Value("${spring.activemq.topic-name}")
    private String topicName;

    @Bean(name = "queue")
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean(name = "topic")
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(username, password, brokerUrl);    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate(){
        return new JmsMessagingTemplate(connectionFactory());
    }

    // 在Queue模式中,對消息的監聽需要對containerFactory進行配置
    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    //在Topic模式中,對消息的監聽需要對containerFactory進行配置
    @Bean("topicListener")
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}
           

5、生産者(queue 和 topic)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

@RestController
public class ProducerController
{
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @PostMapping("/queue/test")
    public String sendQueue(@RequestBody String str) {
        this.sendMessage(this.queue, str);
        return "success";
    }

    @PostMapping("/topic/test")
    public String sendTopic(@RequestBody String str) {
        this.sendMessage(this.topic, str);
        return "success";
    }

    // 發送消息,destination是發送到的隊列,message是待發送的消息
    private void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}
           

6、Queue模式的消費者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class QueueConsumerListener
{
    //queue模式的消費者
    @JmsListener(destination="${spring.activemq.queue-name}", containerFactory="queueListener")
    public void readActiveQueue(String message) {
        System.out.println("queue接受到:" + message);
    }
}
           

7、topic模式的消費者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumerListener
{
    //topic模式的消費者
    @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
    public void readActiveQueue(String message) {
        System.out.println("topic接受到:" + message);
    }
}
           

8、測試(使用Postman發消息)

(1) POST: http://localhost:8080/queue/test 消息體:{"aaa" : "queue"}

控制台列印:queue接受到:{"aaa" : "queue"}

(2) POST: http://localhost:8080/topic/test 消息體:{"aaa" : "topic"}

控制台列印:topic接受到:{"aaa" : "topic"}

topic模式有普通訂閱和持久化訂閱

普通訂閱:在消費者啟動之前發送過來的消息,消費者啟動之後不會去消費;

持久化訂閱: 在消費者啟動之前發送過來的消息,消費者啟動之後會去消費;

9.持久化

分類:

1.Memory 消息存儲-基于記憶體的消息存儲。

2.基于日志消息存儲方式,KahaDB是ActiveMQ的預設日志存儲方式,它提供了容量的提升和恢複 能力。

3.基于JDBC的消息存儲方式-資料存儲于資料庫(例如:MySQL)中。

修改jms配置檔案
delivery-mode: non_persistent # 非持久化(記憶體)

delivery-mode: persistent # 持久化(日志KahaDB)
           

JDBC消息存儲

1)application.yml

server:
	port: 9001
spring:
    activemq:
        broker-url: tcp://192.168.66.133:61616
        user: admin
        password: admin
    jms:
        pub-sub-domain: false # false:點對點隊列模式, true:釋出/訂閱模式
        template:
    		delivery-mode: persistent # 持久化
activemq:
    name: springboot-queue01
           

2)修改activemq.xml

<!--配置資料庫連接配接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://192.168.66.133:3306/db_activemq" />
    <property name="username" value="root" />
    <property name="password" value="123456"/>
</bean>
<!--JDBC Jdbc用于master/slave模式的資料庫分享 -->
<persistenceAdapter>
	<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

           

3)拷貝mysql及durid資料源的jar包到activemq的lib目錄下

4)重新開機activemq

10.消息事務

一個事務性發送,其中一組消息要麼能夠全部保證到達伺服器,要麼都不到達伺服器。 生産者、消費者與消息伺服器直接都支援事務性;

一、生産者事務

/**
 * 事務性發送--方案一
 */
@Test
public void sendMessageTx(){
	//擷取連接配接工廠
    ConnectionFactory connectionFactory=
    jmsMessagingTemplate.getConnectionFactory();
    Session session=null;
    try{
	//建立連接配接
        Connection connection=connectionFactory.createConnection();
	/**
	 * 參數一:是否開啟消息事務
	 */a
        session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
		//建立生産者
        MessageProducer producer=
        session.createProducer(session.createQueue(name));
        for(int i=1;i<=10;i++){
        //模拟異常
            if(i==4){
            int a=10/0;
        }
        TextMessage textMessage=session.createTextMessage("消息--"+
        i);
        producer.send(textMessage);
       	}
		//注意:一旦開啟事務發送,那麼就必須使用commit方法進行事務送出,否則消息無法到達
        MQ伺服器
        session.commit();
        }catch(JMSException e){
        	e.printStackTrace();
			//消息事務復原
        try{
        	session.rollback();
        }catch(JMSException e1){
        	e1.printStackTrace();
        }
    }
}

           

方式二:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory
    connectionFactory) {
    	return new JmsTransactionManager(connectionFactory);
    }
}
           

生産者業務類:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息發送的業務類
*/
@Service
public class MessageService {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Value("${activemq.name}")
    private String name;
    @Transactional // 對消息發送加入事務管理(同時也對JDBC資料庫的事務生效)
    public void sendMessage(){
        for(int i=1;i<=10;i++) {
                //模拟異常
                if(i==4){
                    int a = 10/0;
                }
            jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
        }
    }
}

           

測試發送方法:

@Autowired
private MessageService messageService;
/**
* 事務性發送--方案二: Spring的JmsTransactionManager功能
*/
@Test
public void sendMessageTx2(){
    messageService.sendMessage();
}
           

二、消費者事務

/**
* 消息消費者
*/
@Component
public class Consumer {
    /**
    * 接收消息的方法
    */
    @JmsListener(destination="${activemq.name}",containerFactory =
    "jmsQueryListenerFactory")
    public void receiveMessage(TextMessage textMessage,Session session) throws
    JMSException {
        try {
                System.out.println("消息内容:" + textMessage.getText() + ",是否重發:"
                + textMessage.getJMSRedelivered());
                int i = 100/0; //模拟異常
                session.commit();//送出事務
            } catch (JMSException e) {
            try {
            	session.rollback();//復原事務
            } catch (JMSException e1) {
            }
            e.printStackTrace();
        }
    }
}

           

11.消息确認機制

MS消息隻有在被确認之後,才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接 收消息、客戶處理消息和消息被确認。在事務性會話中,當一個事務被送出的時候,确認自動發生。在 非事務性會話中,消息何時被确認取決于建立會話時的應答模式(acknowledgement mode)。該參 數有以下三個可選值:

Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法傳回的時候,或者從 MessageListener.onMessage方法成功傳回的時候,會話 自動确認客戶收到的消息

Session.CLIENT_ACKNOWLEDGE:客戶通過消息的acknowledge方法确認消息。需要注意的 是,在這種模式中,确認是在會話層上進行:确認一個被消 費的消息将自動确認所有已被會話消費的消息。例如,如果 一個消息消費者消費了10個消息,然後确認第5個消息,那 麼所有10個消息都被确認

Session.DUPS_ACKNOWLEDGE:該選擇隻是會話遲鈍确認消息的送出。如果JMS provider失 敗,那麼可能會導緻一些重複的消息。如果是重複的消息, 那麼JMS provider必須把消息頭的JMSRedelivered字段設定 為true