一、docker 安裝 ActiveMQ
1.在docker環境中執行:
// 搜尋activemq鏡像
docker search activemq
// 拉取activemq鏡像
docker pull webcenter/activemq
// 檢視拉取後的activemq鏡像
docker images
// 建立資料檔案夾和日志檔案夾
mkdir -p ./activemq/soft/activemq
mkdir -p ./activemq/soft/activemq/log
// docker執行指令,名稱,背景啟動,綁定端口,開機啟動,資料卷綁定
docker run --name=activemq -itd -p 8161:8161 -p 61616:61616 --restart=always -v /home/docker/activemq/soft/activemq:/data/activemq -v /home/docker/activemq/soft/activemq/log:/var/log/activemq webcenter/activemq:latest
// 預設登陸使用者名密碼
使用者名密碼admin/admin
2.通路頁面
通路路徑:http://ip:8161。
二、ActiveMQ介紹
1.ActiveMQ基于JMS協定,組成部分:
JMS Provider:生産者,支援事務來保證發送可靠性。
JMS Message:JMS 的消息,主要類型有Text,Object,Map,Bytes,Stream五種類型。由消息頭,消息屬性,消息體組成。
JMS Consumer:消費者,支援事務和确認機制來保證消費可靠性。同步:使用recive()方法阻塞接受消息(用戶端拉)。異步:使用監聽方式接受消息(伺服器推)。
JMS Domains:消息傳遞域,支援P2P(點對點傳輸),隻存在一個隊列,生産者發送到隊列,消費者從隊列中消費;支援pub/sub訂閱消費模式,生産者發送到Topic,生産者訂閱topic進行消費(消費者在訂閱之前是收不到消息的。在訂閱之後線上的狀态可以收到消息,如果想離線後依然能接收到消息,需要設定成持久訂閱)。
Connection Factory:連接配接工廠,建立連接配接,通過連接配接可以建立session對話,再建立生産者和消費者。(springboot中整合為jmstemplate模闆,可以直接生産和消費消息);
JMS Connection:封裝了客戶與 JMS 提供者之間的一個虛拟的連接配接。
JMS Session:是生産者和消費者的一個單線程上下文。會話用于建立消息生産者(Producer)、消息消費者(Consumer),和消息(Message)等。會話提供了一個事務性的上下文,一組發送和接收被組合到了一個原子操作中。
2.消息結構
1.消息頭:
屬性 | 含義 |
---|---|
Destination | 目的地,主要是queue和topic |
DeliveryMode | 傳遞模式,在send或者jmstemplate中設定。分為持久模式和非持久模式 |
Expiration | 消息過期/到期時間,在send或者jmstemplate中設定 |
Priority | 消息優先級,有 0-9 十個級别,0-4是普通消息,5-9是加急消息。JMS 不要求 JMS Provider 嚴格按着十個優先級發送消息,但必須保證加急消息要先于普通消息到達。預設是第4級 |
MessageID | 由生産者自動配置設定,唯一的ID,以ID:開頭 |
Timestamp | 生産者發送消息到消息call或者return傳回的時間差 |
JMSType | JMS 消息類型的識别符 |
CorrelationID | JMS 相關性 id,由用戶端設定。用來連接配接到另外一個消息,典型的應用是在回複消息中連接配接到原消息 |
ReplyTo | 回複,由用戶端設定。提供本資訊回複消息的目的位址 |
Redelivered | 重發,由JMS Provider(供應商)設定 |
2.消息體:JMS API 定義了 5 種消息體格式,也叫消息類型,可以使用不同的形式發送接收資料,并可以相容現有的消息格式。包括:TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。它們都是 Message 接口的子類。
3.消息屬性:自定義屬性,JMS定義的屬性,供應商特點的屬性。
三、ActiveMQ + SpringBoot的使用
ActiveMQ配置:
spring:
activemq:
broker-url: tcp://192.168.99.100:61616
user: admin
password: admin
in-memory: false # 基于外部mq模式
pool:
enable: true #開啟連結池
max-connections: 10 #最大連結數
package com.zwfw.framework.activemq.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.jms.DeliveryMode;
import javax.jms.Session;
@Configuration
public class ActivemqConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.password}")
private String password;
/**
* 消息重發政策配置
*/
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
//是否在每次嘗試重新發送失敗後,增長這個等待時間
redeliveryPolicy.setUseExponentialBackOff(true);
//重發次數,預設為6次-設定為3次
redeliveryPolicy.setMaximumRedeliveries(3);
//重發時間間隔機關毫秒,預設為1秒
redeliveryPolicy.setInitialRedeliveryDelay(1000L);
//第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒
redeliveryPolicy.setBackOffMultiplier(2);
// 是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
// 設定重發最大拖延時間-1表示無延遲限制
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
/**
* 消息工廠配置
*/
@Bean
public ActiveMQConnectionFactory activeMqConnectionFactory() {
ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
activeMqConnectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return activeMqConnectionFactory;
}
@Bean(name = "jmsTemplate")
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
// 設定連接配接工廠
jmsTemplate.setConnectionFactory(activeMqConnectionFactory());
//deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,預設false
jmsTemplate.setExplicitQosEnabled(true);
//定義持久化後節點挂掉以後,重新開機可以繼續消費 1表示非持久化,2表示持久化
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
/**
* 如果不啟用事務,則會導緻XA事務失效;
* 作為生産者如果需要支援事務,則需要配置SessionTransacted為true
*/
jmsTemplate.setSessionTransacted(false);
//消息的應答方式,需要手動确認,此時SessionTransacted必須被設定為false,且為Session.CLIENT_ACKNOWLEDGE模式
/**
* 當關閉事務時候,下面設定才有效
* Session.AUTO_ACKNOWLEDGE 消息自動簽收
* Session.CLIENT_ACKNOWLEDGE 用戶端調用acknowledge方法手動簽收
* Session.DUPS_OK_ACKNOWLEDGE 不必必須簽收,消息可能會重複發送
*/
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
return jmsTemplate;
}
/**
* topic模式的ListenerContainer
* topic下沒有消息回執一說,确認消息之存在queue模式
* 浏覽隻是針對 Queue 的概念,Topic 沒有浏覽。浏覽是指擷取消息而消息依然保持在 broker 中,而消息的接收會把消息從 broker 中移除。
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(activeMqConnectionFactory());
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
/**
* queue模式的ListenerContainer
* 監聽容器配置,使用jackson的消息轉換器
* 1 不開啟事務,手動确認,自動确認
* 2 開啟事務,是自動應答,當用戶端消費有異常抛出,會進行重試模式,按照上面重試配置次數重試後,如果還是失敗,則會進入死信隊列
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// 關閉事務
factory.setSessionTransacted(false);
// 設定手動确認,預設配置中Session是開啟了事務的,事務優先級大于用戶端确認,即使我們設定了手動Ack也是無效的
factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
factory.setConnectionFactory(activeMqConnectionFactory());
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
/**
* queue模式的ListenerContainer
* 監聽容器配置,使用自帶的消息轉換器
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueueNoConver() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// 關閉事務
factory.setSessionTransacted(false);
// 設定手動确認,預設配置中Session是開啟了事物的,即使我們設定了手動Ack也是無效的
factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
factory.setConnectionFactory(activeMqConnectionFactory());
return factory;
}
/**
* 自定義消息轉換器
* @return
*/
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
// MappingJackson2MessageConverter隻支援TEXT和byte類型的轉換,
// 見org.springframework.jms.support.converter.MappingJackson2MessageConverter.toMessage
converter.setTargetType(MessageType.TEXT);
// 可以為任何字元,但必需要配置,在下文中的setTypeIdOnMessage方法中會用上
converter.setTypeIdPropertyName("_type");
return converter;
}
}
queue和topic配置
package com.zwfw.framework.activemq.queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
import javax.jms.Topic;
@Configuration
public class QueueConfig {
/**
* 聲明普通隊列
*/
@Bean
public Queue conmonQueue(){
return new ActiveMQQueue("common.queue");
}
/**
* 聲明延時隊列
*/
@Bean
public Queue delayQueue(){
return new ActiveMQQueue("delay.queue");
}
/**
* 聲明廣播類型隊列
*/
@Bean
public Topic topicQueue(){
return new ActiveMQTopic("topic.queue");
}
}
上面是mq的基本配置,配置了幾個監聽容器:queue模式下的帶事務和不帶事務手動确認的容器和topic監聽的容器,這些容器在後續監聽類中配置用得上。
P2P模式(圖是借鑒來的):
前言:該模式,點對點傳輸。生産者傳輸消息到隊列,消費者從隊列中消費。生産者可配置事務,如果開啟事務發送,隻有在commit之後,隊列中才會有消息入列,如果rollback則不會進入隊列;用戶端一般有兩種方式,一種是事務,第二種是确認機制:開啟事務,不需要設定确認機制(事務優先級大于确認機制,設定了手動确認等也無效),預設就是自動确認,當事務方法中抛出異常,則消息不會被消費,會進入重試,重試次數到了之後,會進入activemq的DLQ隊列(死信)。
生産者代碼和說明:
@RestController
public class SendController {
@Autowired
private Queue conmonQueue;
@Autowired
private JmsTemplate jmsTemplate;
/**
* 單條資料發送,事務模式
*/
@RequestMapping("/commonQueue")
public void commonQueue() throws InterruptedException {
for (int i = 0; i < 20; i++) {
Book book = new Book();
book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);
// 設定事務模式發送
jmsTemplate.setSessionTransacted(true);
jmsTemplate.convertAndSend(conmonQueue,book);
if (i == 12) {
throw new RuntimeException();
}
}
}
/**
* 批量發送,事務模式
*/
@RequestMapping("/commonQueue/{num}")
public void commonQueueTranscate(@PathVariable("num") Integer num) throws Exception {
MessageProducer pd = null;
Session session = null;
Connection connection = null;
try {
ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
connection = connectionFactory.createConnection();
connection.start();
// 開啟事務,隻能設定AUTO_ACKNOWLEDGE,其他模式無效且不受控制
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
pd = session.createProducer(conmonQueue);
for (int i = 0; i < 20; i++) {
Book book = new Book();
book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);
// 用内置的消息轉換器
TextMessage message = session.createTextMessage(JSON.toJSONString(book));
// 用jackson的消息轉化器
// Message message = jmsTemplate.getMessageConverter().toMessage(book, session);
pd.send(message);
System.out.println("send book" + i + " to queue");
}
// 測試判斷,偶數送出,奇數復原
if (num % 2 == 0) {
session.commit();
}else {
session.rollback();
}
} catch (JMSException e) {
throw new RuntimeException(e);
}finally {
pd.close();
session.close();
connection.close();
}
}
}
事務:使用springboot整合後,基本上是使用jmstemplate模版進行消息的發送。在事務環境下,好像隻能發送單條并且發送成功後确認單條資料。這樣資料量大的話感覺會影響效率。如果在批量發送環境下,用jmstemplate發送我還沒有找到合适的方法 ………是以用session會話的模式,設定事務進行批量發送。
序列化:在配置檔案中,定義了jackson的序列化方式,如果不定義,就是使用預設的org.springframework.jms.support.converter.SimpleMessageConverter.toMessage序列化。在裡面根據你發送消息的類型來序列化。
如果使用的是自定義的jackson序列化,發送的jmstemplate模闆也需要注入jackson的序列化配置,在監聽容器配置中,也需要注入jackson的序列化配置。否則,如果發送的jmstemplate沒有注入,或者用的會話模式session.createTextMessage來發送的消息(自帶的序列化),在監聽收到消息後會序列化失敗,原因是,使用了jackson配置發送的消息,在内部會調用setTypeIdOnMessage,裡面會塞入設定過的typeIdPropertyName。當消費者反序列化的時候,則會調用getJavaTypeForMessage方法,裡面會判斷有沒有這個屬性,如果沒有則抛出異常。
總而言之,發送端和接收端的序列化配置必需同步。
當用戶端監聽配置的反序列化是jackson後,jmstemplate也要注入jackson配置。如果想要批量發送消息,可以使用下面的模版來構造一個消息對象,通過配置jackson後的jmstemplate方法是調用了setTypeIdOnMessage方法,在反序列化的時候不會出現上面異常問題:
消費者代碼和說明:
@Component
public class ActiveListener {
/**
* 将開啟事務時候,方法内有異常則不會确認消費
* 發生異常會進入重試模式,伺服器按重試配置數推送,預設6次
* 重試還是失敗,消息會進入死信隊列
*/
@JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueue")
public void commonQueueListen(Book book, ActiveMQMessage message) throws Exception {
System.out.println(book);
// 手動确認消息,當開啟事務時,此設定無效
message.acknowledge();
}
/**
* 使用内置序列化的配置的監聽容器
*/
@JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueueNoConver")
public void commonQueueListen1(String book, ActiveMQMessage message) throws Exception {
System.out.println(book);
// 手動确認消息,當開啟事務時,此設定無效
message.acknowledge();
}
}
當監聽容器開啟事務後, message.acknowledge()方法并沒有作用,事務對應的是自動确認,不受控制。當監聽容器關閉事務,應使用确認機制,一般手動确認,如果沒有确認,則消息不會被消費。
分組和并發消費
并發消費:如果想在發送消息并且由多個消費者一起并發消費,可以通過設定配置檔案中的concurrency屬性,或者在@JmsListener中給注解屬性concurrency設定數量,如下圖。
分組消費:用于隊列模式。分組消費需要在生産者發送的消息中設定消息頭,使用setStringProperty來設定消息的頭屬性。在消費者端,在@JmsListener注解上添加消息頭過濾 selector =“JMSXGroupID=‘groupB’”,就可以完成分組消費。
生産者:
@RequestMapping("/groupQueue")
public void groupQueue() throws InterruptedException {
for (int i = 0; i < 20; i++) {
Book book = new Book();
book.setName("三體" + i).setAuthor("劉慈欣" + i).setType("科幻" + i);
jmsTemplate.setSessionTransacted(true);
// 可以用session會話模式生成的message來設定消息頭,我這裡用模版發送,在生成message同時塞入屬性
jmsTemplate.send(conmonQueue, session -> {
Message message = jmsTemplate.getMessageConverter().toMessage(book, session);
message.setStringProperty("JMSXGroupID","groupA");
return message;
});
}
}
消費者:
/**
* 分組
* selector,過濾頭屬性
* concurrency,并發數量,可以生成多個消費者
*/
@JmsListener(destination = "common.queue", containerFactory = "jmsListenerContainerQueue", selector ="JMSXGroupID='groupA'")
public void commonQueueListenGroupA(Book book, ActiveMQMessage message) throws Exception {
System.out.println("groupA: " + book);
// 手動确認消息,當開啟事務時,此設定無效
message.acknowledge();
}
/**
* 分組
* selector,過濾頭屬性
* concurrency,并發數量,可以生成多個消費者
*/
@JmsListener(concurrency = "10", destination = "common.queue", containerFactory = "jmsListenerContainerQueue", selector ="JMSXGroupID='groupB'")
public void commonQueueListenGroupB(Book book, ActiveMQMessage message) throws Exception {
System.out.println("groupB: " + book);
// 手動确認消息,當開啟事務時,此設定無效
message.acknowledge();
}
pub/sub模式(圖是借鑒來的):
topic模式待完善。。。。
四、解決的問題
此文主要是提供ActiveMQ基本的配置,生産者事務,消費者事務和确認機制,生産者和消費者的序列化問題。更詳細的介紹網上有很多資料可以查閱。主要是自己學習的時候,網上的代碼都沒有很好的解決問題,是以在這裡記錄一下。
- 序列化和反序列化必需同步:如果使用原生的session來發送消息是沒有配置序列化的(應該也是可以設定自定義的序列化配置的),監聽類也需要用自帶的預設的序列化方式來接受對象。如果使用了jackson序列化方式,在配置類中,jmstemplate和監聽容器都要注入此配置,并且發送的時候需要用帶有jackson配置的jmstemplate來發送消息,否則消息會序列化失敗。
- 生産者的事務:如果用jmstemplate來發送消息,跟讀源碼會發現,如果設定setSessionTransacted為true,則會代理生成一個事務,并且發送了會commit。在網上用模版批量發送事務消息的例子。于是就用的session會話來控制批量消息事務的控制。如果要用自定義序列化,則可以用“Message message = jmsTemplate.getMessageConverter().toMessage(book, session);”來構造一個含有jackson序列化配置的消息。
- 消費者的事務和确認機制:當事務設定為true的時候,和生産者一樣,在配置确認機制就不生效了。因為預設就是自動确認。當發生異常事務復原,會進行重試階段,重試後如果失敗,則會進入死信隊列。當事務設定為false的時候,一般設定的是手動确認,隻有确認後的資料才會出隊,否則資料會一直存在隊列裡面(排除定時的消息)。
- 批量事務:用模版方法發送貌似隻能一條條的來,批量的話得用session來發送,并commit;
- topic的代碼還沒有實作,還在學習中,後續補上。。。