文章目錄
- 1. 引入依賴
- 2. 配置檔案
- 3. 生産者
- 4. 配置config
- 5. queue消費者
- 6. topic消費者
- 6. ActiveMQ 消息存儲規則
1. 引入依賴
pom檔案引入activemq依賴
<!--activeMq配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
2. 配置檔案
spring:
activemq:
user: admin
password: admin
broker-url: failover:(tcp://192.168.43.666:61616)
#是否信任所有包(如果傳遞的是對象則需要設定為true,預設是傳字元串)
packages:
trust-all: true
#連接配接池
pool:
enabled: true
max-connections: 5
idle-timeout: 30000
# expiry-timeout: 0
jms:
#預設使用queue模式,使用topic則需要設定為true
pub-sub-domain: true
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗号分隔清單(當不信任所有包時)
#spring.activemq.packages.trusted=
# 當連接配接請求和池滿時是否阻塞。設定false會抛“JMSException異常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然滿,則在抛出異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啟動時建立連接配接。可以在啟動時用于加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 連接配接過期逾時。
#spring.activemq.pool.expiry-timeout=0ms
# 連接配接空閑逾時
#spring.activemq.pool.idle-timeout=30s
# 連接配接池最大連接配接數
#spring.activemq.pool.max-connections=1
# 每個連接配接的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試重新連接配接
#spring.activemq.pool.reconnect-on-exception=true
# 在空閑連接配接清除線程之間運作的時間。當為負數時,沒有空閑連接配接驅逐線程運作。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否隻使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
3. 生産者
package com.gblfy.producer;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.*;
import java.io.Serializable;
/**
* 發送消息
*
* @author gblfy
* @date 2022-11-02
*/
@RestController
@RequestMapping(value = "/active")
public class SendController {
//也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 發送消息接口
* 發送queue消息 :http://127.0.0.1:8080/active/send?msg=ceshi1234
* 發送topic 消息: http://127.0.0.1:8080/active/topic/send?msg=ceshi1234
* 發送queue消息(延遲time毫秒) :http://127.0.0.1:8080/active/send?msg=ceshi1234&time=5000
*
* @param msg 消息
* @param type url中參數,非必須
* @param time
* @return
*/
@RequestMapping({"/send", "/{type}/send"})
public String send(@PathVariable(value = "type", required = false) String type, String msg, Long time) {
Destination destination = null;
if (type == null) {
type = "";
}
switch (type) {
case "topic":
//發送廣播消息
destination = new ActiveMQTopic("active.topic");
break;
default:
//發送 隊列消息
destination = new ActiveMQQueue("active.queue");
break;
}
// System.out.println("開始請求發送:"+DateUtil.getStringDate(new Date(),"yyyy-MM-dd HH:mm:ss"));
if (time != null && time > 0) {
//延遲隊列,延遲time毫秒
//延遲隊列需要在 <broker>标簽上增加屬性 schedulerSupport="true"
delaySend(destination, msg, time);
} else {
jmsMessagingTemplate.convertAndSend(destination, msg);//無序
//jmsMessagingTemplate.convertSendAndReceive();//有序
}
return "activemq消息發送成功 隊列消息:" + msg;
}
/**
* 延時發送
* 說明:延遲隊列需要在 <broker>标簽上增加屬性 schedulerSupport="true"
*
* @param destination 發送的隊列
* @param data 發送的消息
* @param time 延遲時間 /毫秒
*/
public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
// 擷取連接配接工廠
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
try {
// 擷取連接配接
connection = connectionFactory.createConnection();
connection.start();
// 擷取session,true開啟事務,false關閉事務
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 建立一個消息隊列
producer = session.createProducer(destination);
producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
ObjectMessage message = session.createObjectMessage(data);
//設定延遲時間
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
// 發送消息
producer.send(message);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
4. 配置config
package com.gblfy.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* 描述:
* activemq 有兩種模式 queue 和 topic
* queue 模式是單對單,有多個消費者的情況下則是使用輪詢監聽
* topic 模式/廣播模式/釋出訂閱模式 是一對多,發送消息所有的消費者都能夠監聽到
*
* @author gblfy
* @date 2022-11-02
*/
@EnableJms
@Configuration
public class ActiveMQConfig {
//隊列名
private static final String queueName = "active.queue";
//主題名
private static final String topicName = "active.topic";
@Value("${spring.activemq.user:}")
private String username;
@Value("${spring.activemq.password:}")
private String password;
@Value("${spring.activemq.broker-url:}")
private String brokerUrl;
@Bean
public Queue acQueue() {
return new ActiveMQQueue(queueName);
}
@Bean
public Topic acTopic() {
return new ActiveMQTopic(topicName);
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(username, password, brokerUrl);
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
// 關閉Session事務,手動确認與事務沖突
bean.setSessionTransacted(false);
// 設定消息的簽收模式(自己簽收)
/**
* AUTO_ACKNOWLEDGE = 1 :自動确認
* CLIENT_ACKNOWLEDGE = 2:用戶端手動确認
* DUPS_OK_ACKNOWLEDGE = 3: 自動批量确認
* SESSION_TRANSACTED = 0:事務送出并确認
* 但是在activemq補充了一個自定義的ACK模式:
* INDIVIDUAL_ACKNOWLEDGE = 4:單條消息确認
**/
bean.setSessionAcknowledgeMode(4);
//此處設定消息重發規則,redeliveryPolicy() 中定義
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
bean.setConnectionFactory(connectionFactory);
return bean;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
// 關閉Session事務,手動确認與事務沖突
bean.setSessionTransacted(false);
bean.setSessionAcknowledgeMode(4);
//設定為釋出訂閱方式, 預設情況下使用的生産消費者方式
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
/**
* 消息的重發規則配置
*/
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
// 是否在每次嘗試重新發送失敗後,增長這個等待時間
redeliveryPolicy.setUseExponentialBackOff(true);
// 重發次數五次, 總共六次
redeliveryPolicy.setMaximumRedeliveries(5);
// 重發時間間隔,預設為1000ms(1秒)
redeliveryPolicy.setInitialRedeliveryDelay(1000);
// 重發時長遞增的時間倍數2
redeliveryPolicy.setBackOffMultiplier(2);
// 是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
// 設定重發最大拖延時間-1表示無延遲限制
redeliveryPolicy.setMaximumRedeliveryDelay(-1);
return redeliveryPolicy;
}
}
5. queue消費者
package com.gblfy.listener;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Session;
/**
* TODO
*
* @author gblfy
* @Date 2022-11-02
**/
@Component
public class QueueListener {
/**
* queue 模式 單對單,兩個消費者監聽同一個隊列則通過輪詢接收消息
* containerFactory屬性的值關聯config類中的聲明
*
* @param msg
*/
@JmsListener(destination = "active.queue", containerFactory = "jmsListenerContainerQueue")
public void queueListener(ActiveMQMessage message, Session session, String msg) throws JMSException {
try {
System.out.println("active queue 接收到消息 " + msg);
//手動簽收
message.acknowledge();
} catch (Exception e) {
//重新發送
session.recover();
}
}
}
6. topic消費者
package com.gblfy.listener;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Session;
/**
* TODO
*
* @author gblfy
* @Date 2022-11-02
**/
@Component
public class TopicListener {
/**
* topic 模式/廣播模式/釋出訂閱模式 一對多,多個消費者可同時接收到消息
* topic 模式無死信隊列,死信隊列是queue模式
* containerFactory屬性的值關聯config類中的聲明
*
* @param msg
*/
@JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic")
public void topicListener(ActiveMQMessage message, Session session, String msg) throws JMSException {
try {
// System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
System.out.println("active topic 接收到消息 " + msg);
System.out.println("");
//手動簽收
message.acknowledge();
} catch (Exception e) {
//重新發送
session.recover();
}
}
@JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic")
public void topicListener2(ActiveMQMessage message, Session session, String msg) throws JMSException {
try {
// System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
System.out.println("active topic2 接收到消息 " + msg);
System.out.println("");
//手動簽收
message.acknowledge();
} catch (Exception e) {
//重新發送
session.recover();
}
}
}
6. ActiveMQ 消息存儲規則
QUEUE 點對點:
特點:消息遵循先到先得,消息隻能被一個消費者消費。
消息存儲規則:消費者消費消息成功,MQ服務端消息删除
TOPIC訂閱模式: 消息屬于廣播(訂閱)模式,消息會被所有的topic消費者消費消息。
消息存儲規則:所有消費者消費成功,MQ服務端消息删除,有一個消息沒有沒有消費完成,消息也會存儲在MQ服務端。