天天看點

ActiveMQ基礎與SpringBoot整合

ActiveMQ

實作了

JMS

規範。

ActiveMQ中相關概念術語

  1. Destination

    目的地

    消息将要發送的地方,包括:

    Queue

    Topic

    ,它們都對

    Destination

    接口進行了實作
    1. PTP

      模式 - Queue
    1. 釋出訂閱

      模式 - Topic

      MessageProvider

      需要指定

      Destination

      才能發送消息,

      MessageConsumer

      Destination

      才能接收和消費消息。
  2. Producer

    消息生産者

    消息生産者,負責将消息發送到目的地

    Destination

  3. Consumer

    消息消費者

    消息消費者,負責從目的地

    Destination

    消費消息。
  4. Message

    消息本體
  5. ConnectionFactory

    連接配接工廠

    用于建立連接配接的工廠

  6. Connection

    連接配接

    使用者通路ActiveMQ

  7. Session

    會話

    一次持久有效有狀态的通路,由

    Connection

    建立,是具體操作消息的基礎支撐。

JMS

中定義了兩種消息模型:點對點(point to point, queue)和釋出/訂閱(publish/subscribe,topic)。主要差別就是是能否重複消費。

JMS中Queue模式與Topic模式對比

Topic Queue
概要 Publish Subscribe messaging 釋出訂閱消息 Point-to-Point 點對點
有無狀态 topic資料預設不落地,是無狀态的。 Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOMEdatakr-storedata下面。也可以配置成DB存儲。
完整性保障 并不保證publisher釋出的每條資料,Subscriber都能接受到。 Queue保證每條資料都能被receiver接收。
消息是否會丢失 一般來說publisher釋出消息到某一個topic時,隻有正在監聽該topic位址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丢失了。 Sender發送消息到目标Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丢失。
消息釋出接收政策 一對多的消息釋出接收政策,監聽同一個topic位址的多個sub都能收到publisher發送的消息。Sub接收完通知mq伺服器 一對一的消息釋出接收政策,一個sender發送的消息,隻能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的消息采取删除或其他操作。

1. PTP Queue不可重複消費

消息生産者生産消息發送到queue中,然後消息消費者從queue中取出并且消費消息。

消息被消費以後(消費者ack應答确認/事務模式),queue中不再有存儲,是以消息消費者不可能消費到已經被消費的消息。

Queue支援存在多個消費者,但是對一個消息而言,隻會有一個消費者可以消費、其它的則不能消費此消息了。

當消費者不存在時,消息會一直儲存,直到有消費消費

2. 釋出訂閱模式 Topic 可以重複消費

消息生産者(釋出)将消息釋出到Topic中,同時有多個消息消費者(訂閱該Topic)消費該消息。

和點對點方式不同,釋出到topic的消息會被所有訂閱者消費。

當生産者釋出消息,不管是否有消費者。都不會儲存消息。如果生産者向隊列發送消息時,沒有消費者訂閱該隊列,則消息全部丢失。否則向所有訂閱了該Topic的消費者發送同樣的消息(即:消費者必須線上)

在SpringBoot中使用ActiveMQ

ActiveMQ管理位址: http://localhost:8161/admin/
  1. PTP模式
    • 依賴
      //jms-active
          compile 'org.springframework.boot:spring-boot-starter-activemq'
          //active連接配接池-1.5.13依賴
          compile 'org.apache.activemq:activemq-pool'           
    • 配置資訊
      spring:
        # activemq
        activemq:
          broker-url: failover:(tcp://localhost:61616,tcp://localhost:666)?randomize=false      # tcp://localhost:61616/故障轉移,預設情況下如果某個連結失效了,則從清單中随機擷取一個,如果設定了randomize=false則是嚴格按照清單的先後順序的
          user: admin           # 使用者名
          password: admin       # 密碼
          in-memory: false      # 基于記憶體的activemq
          close-timeout: 15s     # 在考慮結束之前等待的時間
          pool:
            enabled: true                               # 啟動連接配接池(是否用Pooledconnectionfactory代替普通的ConnectionFactory)
            max-connections: 10                         # 最大連結數量
            idle-timeout: 60s                           # 空閑連接配接存活時間
            block-if-full: true                         # 當連接配接請求和池滿時是否阻塞。設定false會抛“JMSException異常”
            block-if-full-timeout: -1                   # 如果池仍然滿,則在抛出異常之前阻塞時間
            create-connection-on-startup: true          # 是否在啟動時建立連接配接。可以在啟動時用于加熱池
            maximum-active-session-per-connection: 500  # 每個連接配接的有效會話的最大數目。
            reconnect-on-exception: true                # 當發生"JMSException"時嘗試重新連接配接
        jms:
          pub-sub-domain: false                  # 預設情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置           
    • 定義

      PTP

      模式下的

      Destination

      -

      Queue

      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @AllArgsConstructor
      @Getter
      public enum ActiveMqQueueEnum {
          /**
           * springboot-test-queue=測試Queue
           */
          TEST_QUEUE("springboot-test-queue", "測試Queue");
        
          private String queueName;
          private String desc;
      
          public static final String testQueue = "springboot-test-queue";
      }
      
      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @Configuration
      public class ActiveMqConfig {
      
         /**
           * The ActiveMQConnectionFactory creates ActiveMQ Connections.
           * The PooledConnectionFactory pools Connections.
           * If you only need to create one Connection and keep it around for a long time you don't need to pool.
           * If you tend to create many Connection instances over time then Pooling is better as connecting is a heavy operation and can be a performance bottleneck.
           * <p>
           * 可以在這裡統一設定JmsTemplate的一些配置,也可以在具體使用到JmsTemplate的時候單獨設定
           * JmsMessageTemplate是對JmsTemplate的進一步封裝
           * TODO 目前看起來不起作用
           *
           * @param factory
           * @return
           */
          //    @Primary
      //    @Bean
          public JmsTemplate jmsTemplate(PooledConnectionFactory factory) {
              JmsTemplate jmsTemplate = new JmsTemplate();
              //關閉事物
              jmsTemplate.setSessionTransacted(false);
              //TODO 在此設定無效
      //        jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
              jmsTemplate.setConnectionFactory(factory);
              return jmsTemplate;
          }
        
          @Bean(name = ActiveMqQueueEnum.testQueue)
          public ActiveMQQueue activeTestQueue() {
              return new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName());
          }
        /**
           * 定義一個消息監聽器連接配接工廠,這裡定義的是點對點模式的監聽器連接配接工廠
           *
           * @param pooledConnectionFactory
           * @return
           */
          @Bean(name = "jmsQueueListener")
          public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(PooledConnectionFactory pooledConnectionFactory) {
              DefaultJmsListenerContainerFactory factory =
                      new DefaultJmsListenerContainerFactory();
              factory.setConnectionFactory(pooledConnectionFactory);
              factory.setSessionTransacted(false);
              factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
              return factory;
          }
      }             
    • PTP

      模式下的生産者
      package com.futao.springbootdemo.foundation.mq.active.ptp;
      
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.jms.core.JmsMessagingTemplate;
      import org.springframework.stereotype.Component;
      
      import javax.jms.JMSException;
      
      /**
       * PTP模式生産者
       *
       * @author futao
       * Created on 2019-06-06.
       */
      @Slf4j
      @Component
      public class PtpProducer {
        
          @Autowired
          private JmsMessagingTemplate jmsMessagingTemplate;
      
          /**
           * 目的地
           */
          @Qualifier("springboot-test-queue")
          @Autowired
          private ActiveMQQueue springBootTestQueue;
      
          public void send(String msg) {
              jmsMessagingTemplate.convertAndSend(springBootTestQueue, msg);
              try {
                  log.info("send to ActiveMQ-Queue[{}] success ,msg:[{}]", springBootTestQueue.getQueueName(), msg);
              } catch (JMSException e) {
                  e.printStackTrace();
              }
          }
      }
       
      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @RequestMapping("/activemq")
      @RestController
      public class ActiveController {
        @Resource
          private PtpProducer ptpProducer;
      
          @PostMapping("/ptp/sender")
          public void ptpSender(@RequestParam String msg) {
              ptpProducer.send(msg);
          }
      }           
    • PTP

      模式下的消費者
      package com.futao.springbootdemo.foundation.mq.active.ptp;
      
      import com.futao.springbootdemo.foundation.mq.active.ActiveMqQueueEnum;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.command.ActiveMQMessage;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.junit.Test;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.stereotype.Service;
      
      import javax.jms.*;
      
      /**
       * @author futao
       * Created on 2019-06-06.
       */
      @Slf4j
      @Service
      public class PtpConsumer {
      
          @JmsListener(destination = ActiveMqQueueEnum.testQueue, containerFactory = "jmsQueueListener")
          public void ptpConsumer(ActiveMQMessage message) throws JMSException {
              String text = ((TextMessage) message).getText();
              if ("節日快樂666".equalsIgnoreCase(text)) {
                  message.acknowledge();    //ack手動确認
              }
              log.info("receive message from activeMQ :[{}]", text);
          }
        /**
         * 手動建立ActiveMQConnectionFactory消費消息,生産消息也類似
         */
          @Test
          public void test() throws Exception {
              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
              Connection connection = connectionFactory.createConnection();
              Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//開啟ack手動确認
              MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName()));
              connection.start();
              consumer.setMessageListener(message -> {
                  try {
                      String text = ((TextMessage) message).getText();
                      System.out.println(("收到消息:{}" + text));
                      if ("節日快樂666".equalsIgnoreCase(text)) {
                          message.acknowledge();    //ack手動确認
                      }
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              });
              Thread.sleep(999999999);
          }
      }
                 
  • 特點
    • 一條消息隻會發送給其中某一個單獨的消費者
    • 未被确認的消息将再次發送給其他消費
  1. 釋出訂閱模式
    • 釋出訂閱模式需要将

      spring.jms.pub-sub-domain=true

      ,其他配置不需要修改
    • 定義釋出訂閱模式下的

      Destination

      Topic

      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @Configuration
      public class ActiveMqConfig {
        /**
           * ActiveMQ topic的定義
           */
          public static class TopicDefinition {
              public static final String activeTestTopic = "active-test-topic";
              public static final String activeProdTopic = "active-prod-topic";
          }
      
          /**
           * 定義一個名為BeanName為activeTestTopic的Topic:active-test-topic
           *
           * @return
           */
          @Bean(name = "activeTestTopic")
          public ActiveMQTopic activeMQTestTopic() {
              return new ActiveMQTopic(TopicDefinition.activeTestTopic);
          }
      
          /**
           * 定義一個名為BeanName為activeProdTopic的Topic:active-prod-topic
           *
           * @return
           */
          @Bean(name = "activeProdTopic")
          public ActiveMQTopic activeMQProdTopic() {
              return new ActiveMQTopic(TopicDefinition.activeProdTopic);
          }
      }
          @PostMapping("/ps/sender")
          public void pushTest(@RequestParam String msg) {
              activeMqProducer.send(msg);
          }           
    • 釋出訂閱模式下的消費者定義
      package com.futao.springbootdemo.foundation.mq.active.topic;
      
      import com.futao.springbootdemo.foundation.mq.active.ActiveMqConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.command.ActiveMQMessage;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.stereotype.Service;
      
      import javax.jms.JMSException;
      import javax.jms.TextMessage;
      
      /**
       * 訂閱的隊列是PTP模式還是Topic模式,與這邊的定義無關。取決于配置
       * # 開啟topic模式
       * spring:
       * jms:
       * pub-sub-domain: true
       *
       * @author futao
       * Created on 2019-06-04.
       */
      @Slf4j
      @Service
      public class ActiveMqConsumer {
      
          /**
           * 訂閱testTopic  -1
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
          public void testTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("testTopicConsumer1接收到activeMq-activeTestTopic消息:[{}]", text);
          }
      
          /**
           * 訂閱testTopic  -2
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
          public void testTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("testTopicConsumer2接收到activeMq-activeTestTopic消息:[{}]", text);
          }
      
          /**
           * 訂閱prodTopic  -1
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
          public void prodTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("prodTopicConsumer1接收到activeMq-activeProdTopic消息:[{}]", text);
          }
      
          /**
           * 訂閱 prodTopic  -2
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
          public void prodTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("prodTopicConsumer2接收到activeMq-activeProdTopic消息:[{}]", text);
          }
      }           
    • 結果展示
    **發送到Topic的消息被所有訂閱了該Topic的消費者接收

參考資料

SpringBoot與ActiveMQ整合實作手動ACK(事務模式與ack應答模式)

TODO:

  • 如何保證消費者将消息發送到ActiveMQ的過程中消息不丢失
  • ActiveMQ的叢集與主從
  • 消息的持久化
  • 事務
  • PTP模式下消費者多久沒ACK後ActiveMQ會認為該條消息消費失敗呢?(是不是有個消費逾時時間設定)。還是隻能等到該消費者下線。

繼續閱讀