ActiveMQ
實作了
JMS
規範。
ActiveMQ中相關概念術語
-
Destination
目的地
消息将要發送的地方,包括:
和Queue
,它們都對Topic
接口進行了實作Destination
-
-
模式 - QueuePTP
-
模式 - Topic釋出訂閱
需要指定MessageProvider
才能發送消息,Destination
MessageConsumer
才能接收和消費消息。Destination
-
-
Producer
消息生産者
消息生産者,負責将消息發送到目的地
。Destination
-
Consumer
消息消費者
消息消費者,負責從目的地
消費消息。Destination
-
消息本體Message
-
ConnectionFactory
連接配接工廠
用于建立連接配接的工廠
-
Connection
連接配接
使用者通路ActiveMQ
-
Session
會話
一次持久有效有狀态的通路,由
建立,是具體操作消息的基礎支撐。Connection
JMS
中定義了兩種消息模型:點對點(point to point, queue)和釋出/訂閱(publish/subscribe,topic)。主要差別就是是能否重複消費。
JMS中Queue模式與Topic模式對比
Topic | Queue | |
---|---|---|
概要 | Publish Subscribe messaging 釋出訂閱消息 | Point-to-Point 點對點 |
有無狀态 | topic資料預設不落地,是無狀态的。 | Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOMEdatakr-storedata下面。也可以配置成DB存儲。 |
完整性保障 | 并不保證publisher釋出的每條資料,Subscriber都能接受到。 | Queue保證每條資料都能被receiver接收。 |
消息是否會丢失 | 一般來說publisher釋出消息到某一個topic時,隻有正在監聽該topic位址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丢失了。 | Sender發送消息到目标Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丢失。 |
消息釋出接收政策 | 一對多的消息釋出接收政策,監聽同一個topic位址的多個sub都能收到publisher發送的消息。Sub接收完通知mq伺服器 | 一對一的消息釋出接收政策,一個sender發送的消息,隻能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的消息采取删除或其他操作。 |
1. PTP Queue不可重複消費
消息生産者生産消息發送到queue中,然後消息消費者從queue中取出并且消費消息。
消息被消費以後(消費者ack應答确認/事務模式),queue中不再有存儲,是以消息消費者不可能消費到已經被消費的消息。
Queue支援存在多個消費者,但是對一個消息而言,隻會有一個消費者可以消費、其它的則不能消費此消息了。
當消費者不存在時,消息會一直儲存,直到有消費消費
2. 釋出訂閱模式 Topic 可以重複消費
消息生産者(釋出)将消息釋出到Topic中,同時有多個消息消費者(訂閱該Topic)消費該消息。
和點對點方式不同,釋出到topic的消息會被所有訂閱者消費。
當生産者釋出消息,不管是否有消費者。都不會儲存消息。如果生産者向隊列發送消息時,沒有消費者訂閱該隊列,則消息全部丢失。否則向所有訂閱了該Topic的消費者發送同樣的消息(即:消費者必須線上)
在SpringBoot中使用ActiveMQ
ActiveMQ管理位址: http://localhost:8161/admin/
- PTP模式
- 依賴
//jms-active compile 'org.springframework.boot:spring-boot-starter-activemq' //active連接配接池-1.5.13依賴 compile 'org.apache.activemq:activemq-pool'
- 配置資訊
spring: # activemq activemq: broker-url: failover:(tcp://localhost:61616,tcp://localhost:666)?randomize=false # tcp://localhost:61616/故障轉移,預設情況下如果某個連結失效了,則從清單中随機擷取一個,如果設定了randomize=false則是嚴格按照清單的先後順序的 user: admin # 使用者名 password: admin # 密碼 in-memory: false # 基于記憶體的activemq close-timeout: 15s # 在考慮結束之前等待的時間 pool: enabled: true # 啟動連接配接池(是否用Pooledconnectionfactory代替普通的ConnectionFactory) max-connections: 10 # 最大連結數量 idle-timeout: 60s # 空閑連接配接存活時間 block-if-full: true # 當連接配接請求和池滿時是否阻塞。設定false會抛“JMSException異常” block-if-full-timeout: -1 # 如果池仍然滿,則在抛出異常之前阻塞時間 create-connection-on-startup: true # 是否在啟動時建立連接配接。可以在啟動時用于加熱池 maximum-active-session-per-connection: 500 # 每個連接配接的有效會話的最大數目。 reconnect-on-exception: true # 當發生"JMSException"時嘗試重新連接配接 jms: pub-sub-domain: false # 預設情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
- 定義
模式下的PTP
-Destination
Queue
/** * @author futao * Created on 2019-06-04. */ @AllArgsConstructor @Getter public enum ActiveMqQueueEnum { /** * springboot-test-queue=測試Queue */ TEST_QUEUE("springboot-test-queue", "測試Queue"); private String queueName; private String desc; public static final String testQueue = "springboot-test-queue"; } /** * @author futao * Created on 2019-06-04. */ @Configuration public class ActiveMqConfig { /** * The ActiveMQConnectionFactory creates ActiveMQ Connections. * The PooledConnectionFactory pools Connections. * If you only need to create one Connection and keep it around for a long time you don't need to pool. * If you tend to create many Connection instances over time then Pooling is better as connecting is a heavy operation and can be a performance bottleneck. * <p> * 可以在這裡統一設定JmsTemplate的一些配置,也可以在具體使用到JmsTemplate的時候單獨設定 * JmsMessageTemplate是對JmsTemplate的進一步封裝 * TODO 目前看起來不起作用 * * @param factory * @return */ // @Primary // @Bean public JmsTemplate jmsTemplate(PooledConnectionFactory factory) { JmsTemplate jmsTemplate = new JmsTemplate(); //關閉事物 jmsTemplate.setSessionTransacted(false); //TODO 在此設定無效 // jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); jmsTemplate.setConnectionFactory(factory); return jmsTemplate; } @Bean(name = ActiveMqQueueEnum.testQueue) public ActiveMQQueue activeTestQueue() { return new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName()); } /** * 定義一個消息監聽器連接配接工廠,這裡定義的是點對點模式的監聽器連接配接工廠 * * @param pooledConnectionFactory * @return */ @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(PooledConnectionFactory pooledConnectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(pooledConnectionFactory); factory.setSessionTransacted(false); factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); return factory; } }
-
模式下的生産者PTP
package com.futao.springbootdemo.foundation.mq.active.ptp; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.JMSException; /** * PTP模式生産者 * * @author futao * Created on 2019-06-06. */ @Slf4j @Component public class PtpProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 目的地 */ @Qualifier("springboot-test-queue") @Autowired private ActiveMQQueue springBootTestQueue; public void send(String msg) { jmsMessagingTemplate.convertAndSend(springBootTestQueue, msg); try { log.info("send to ActiveMQ-Queue[{}] success ,msg:[{}]", springBootTestQueue.getQueueName(), msg); } catch (JMSException e) { e.printStackTrace(); } } } /** * @author futao * Created on 2019-06-04. */ @RequestMapping("/activemq") @RestController public class ActiveController { @Resource private PtpProducer ptpProducer; @PostMapping("/ptp/sender") public void ptpSender(@RequestParam String msg) { ptpProducer.send(msg); } }
-
模式下的消費者PTP
package com.futao.springbootdemo.foundation.mq.active.ptp; import com.futao.springbootdemo.foundation.mq.active.ActiveMqQueueEnum; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.junit.Test; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service; import javax.jms.*; /** * @author futao * Created on 2019-06-06. */ @Slf4j @Service public class PtpConsumer { @JmsListener(destination = ActiveMqQueueEnum.testQueue, containerFactory = "jmsQueueListener") public void ptpConsumer(ActiveMQMessage message) throws JMSException { String text = ((TextMessage) message).getText(); if ("節日快樂666".equalsIgnoreCase(text)) { message.acknowledge(); //ack手動确認 } log.info("receive message from activeMQ :[{}]", text); } /** * 手動建立ActiveMQConnectionFactory消費消息,生産消息也類似 */ @Test public void test() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//開啟ack手動确認 MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName())); connection.start(); consumer.setMessageListener(message -> { try { String text = ((TextMessage) message).getText(); System.out.println(("收到消息:{}" + text)); if ("節日快樂666".equalsIgnoreCase(text)) { message.acknowledge(); //ack手動确認 } } catch (JMSException e) { e.printStackTrace(); } }); Thread.sleep(999999999); } }
- 依賴
- 特點
- 一條消息隻會發送給其中某一個單獨的消費者
- 未被确認的消息将再次發送給其他消費
- 釋出訂閱模式
- 釋出訂閱模式需要将
,其他配置不需要修改spring.jms.pub-sub-domain=true
- 定義釋出訂閱模式下的
Destination
Topic
/** * @author futao * Created on 2019-06-04. */ @Configuration public class ActiveMqConfig { /** * ActiveMQ topic的定義 */ public static class TopicDefinition { public static final String activeTestTopic = "active-test-topic"; public static final String activeProdTopic = "active-prod-topic"; } /** * 定義一個名為BeanName為activeTestTopic的Topic:active-test-topic * * @return */ @Bean(name = "activeTestTopic") public ActiveMQTopic activeMQTestTopic() { return new ActiveMQTopic(TopicDefinition.activeTestTopic); } /** * 定義一個名為BeanName為activeProdTopic的Topic:active-prod-topic * * @return */ @Bean(name = "activeProdTopic") public ActiveMQTopic activeMQProdTopic() { return new ActiveMQTopic(TopicDefinition.activeProdTopic); } } @PostMapping("/ps/sender") public void pushTest(@RequestParam String msg) { activeMqProducer.send(msg); }
- 釋出訂閱模式下的消費者定義
package com.futao.springbootdemo.foundation.mq.active.topic; import com.futao.springbootdemo.foundation.mq.active.ActiveMqConfig; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.command.ActiveMQMessage; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service; import javax.jms.JMSException; import javax.jms.TextMessage; /** * 訂閱的隊列是PTP模式還是Topic模式,與這邊的定義無關。取決于配置 * # 開啟topic模式 * spring: * jms: * pub-sub-domain: true * * @author futao * Created on 2019-06-04. */ @Slf4j @Service public class ActiveMqConsumer { /** * 訂閱testTopic -1 * * @param mqMessage * @throws JMSException */ @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic) public void testTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException { String text = ((TextMessage) mqMessage.getMessage()).getText(); log.info("testTopicConsumer1接收到activeMq-activeTestTopic消息:[{}]", text); } /** * 訂閱testTopic -2 * * @param mqMessage * @throws JMSException */ @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic) public void testTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException { String text = ((TextMessage) mqMessage.getMessage()).getText(); log.info("testTopicConsumer2接收到activeMq-activeTestTopic消息:[{}]", text); } /** * 訂閱prodTopic -1 * * @param mqMessage * @throws JMSException */ @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic) public void prodTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException { String text = ((TextMessage) mqMessage.getMessage()).getText(); log.info("prodTopicConsumer1接收到activeMq-activeProdTopic消息:[{}]", text); } /** * 訂閱 prodTopic -2 * * @param mqMessage * @throws JMSException */ @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic) public void prodTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException { String text = ((TextMessage) mqMessage.getMessage()).getText(); log.info("prodTopicConsumer2接收到activeMq-activeProdTopic消息:[{}]", text); } }
- 結果展示
- 釋出訂閱模式需要将
參考資料
SpringBoot與ActiveMQ整合實作手動ACK(事務模式與ack應答模式)TODO:
- 如何保證消費者将消息發送到ActiveMQ的過程中消息不丢失
- ActiveMQ的叢集與主從
- 消息的持久化
- 事務
- PTP模式下消費者多久沒ACK後ActiveMQ會認為該條消息消費失敗呢?(是不是有個消費逾時時間設定)。還是隻能等到該消費者下線。