天天看点

ActiveMQ基础与SpringBoot整合

ActiveMQ

实现了

JMS

规范。

ActiveMQ中相关概念术语

  1. Destination

    目的地

    消息将要发送的地方,包括:

    Queue

    Topic

    ,它们都对

    Destination

    接口进行了实现
    1. PTP

      模式 - Queue
    1. 发布订阅

      模式 - Topic

      MessageProvider

      需要指定

      Destination

      才能发送消息,

      MessageConsumer

      Destination

      才能接收和消费消息。
  2. Producer

    消息生产者

    消息生产者,负责将消息发送到目的地

    Destination

  3. Consumer

    消息消费者

    消息消费者,负责从目的地

    Destination

    消费消息。
  4. Message

    消息本体
  5. ConnectionFactory

    连接工厂

    用于创建连接的工厂

  6. Connection

    连接

    用户访问ActiveMQ

  7. Session

    会话

    一次持久有效有状态的访问,由

    Connection

    创建,是具体操作消息的基础支撑。

JMS

中定义了两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。主要区别就是是能否重复消费。

JMS中Queue模式与Topic模式对比

Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点
有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOMEdatakr-storedata下面。也可以配置成DB存储。
完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

1. PTP Queue不可重复消费

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。

消息被消费以后(消费者ack应答确认/事务模式),queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。

当消费者不存在时,消息会一直保存,直到有消费消费

2. 发布订阅模式 Topic 可以重复消费

消息生产者(发布)将消息发布到Topic中,同时有多个消息消费者(订阅该Topic)消费该消息。

和点对点方式不同,发布到topic的消息会被所有订阅者消费。

当生产者发布消息,不管是否有消费者。都不会保存消息。如果生产者向队列发送消息时,没有消费者订阅该队列,则消息全部丢失。否则向所有订阅了该Topic的消费者发送同样的消息(即:消费者必须在线)

在SpringBoot中使用ActiveMQ

ActiveMQ管理地址: http://localhost:8161/admin/
  1. PTP模式
    • 依赖
      //jms-active
          compile 'org.springframework.boot:spring-boot-starter-activemq'
          //active连接池-1.5.13依赖
          compile 'org.apache.activemq:activemq-pool'           
    • 配置信息
      spring:
        # activemq
        activemq:
          broker-url: failover:(tcp://localhost:61616,tcp://localhost:666)?randomize=false      # tcp://localhost:61616/故障转移,默认情况下如果某个链接失效了,则从列表中随机获取一个,如果设置了randomize=false则是严格按照列表的先后顺序的
          user: admin           # 用户名
          password: admin       # 密码
          in-memory: false      # 基于内存的activemq
          close-timeout: 15s     # 在考虑结束之前等待的时间
          pool:
            enabled: true                               # 启动连接池(是否用Pooledconnectionfactory代替普通的ConnectionFactory)
            max-connections: 10                         # 最大链接数量
            idle-timeout: 60s                           # 空闲连接存活时间
            block-if-full: true                         # 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”
            block-if-full-timeout: -1                   # 如果池仍然满,则在抛出异常之前阻塞时间
            create-connection-on-startup: true          # 是否在启动时创建连接。可以在启动时用于加热池
            maximum-active-session-per-connection: 500  # 每个连接的有效会话的最大数目。
            reconnect-on-exception: true                # 当发生"JMSException"时尝试重新连接
        jms:
          pub-sub-domain: false                  # 默认情况下activemq提供的是queue模式,若要使用topic模式需要配置下面配置           
    • 定义

      PTP

      模式下的

      Destination

      -

      Queue

      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @AllArgsConstructor
      @Getter
      public enum ActiveMqQueueEnum {
          /**
           * springboot-test-queue=测试Queue
           */
          TEST_QUEUE("springboot-test-queue", "测试Queue");
        
          private String queueName;
          private String desc;
      
          public static final String testQueue = "springboot-test-queue";
      }
      
      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @Configuration
      public class ActiveMqConfig {
      
         /**
           * The ActiveMQConnectionFactory creates ActiveMQ Connections.
           * The PooledConnectionFactory pools Connections.
           * If you only need to create one Connection and keep it around for a long time you don't need to pool.
           * If you tend to create many Connection instances over time then Pooling is better as connecting is a heavy operation and can be a performance bottleneck.
           * <p>
           * 可以在这里统一设置JmsTemplate的一些配置,也可以在具体使用到JmsTemplate的时候单独设置
           * JmsMessageTemplate是对JmsTemplate的进一步封装
           * TODO 目前看起来不起作用
           *
           * @param factory
           * @return
           */
          //    @Primary
      //    @Bean
          public JmsTemplate jmsTemplate(PooledConnectionFactory factory) {
              JmsTemplate jmsTemplate = new JmsTemplate();
              //关闭事物
              jmsTemplate.setSessionTransacted(false);
              //TODO 在此设置无效
      //        jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
              jmsTemplate.setConnectionFactory(factory);
              return jmsTemplate;
          }
        
          @Bean(name = ActiveMqQueueEnum.testQueue)
          public ActiveMQQueue activeTestQueue() {
              return new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName());
          }
        /**
           * 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
           *
           * @param pooledConnectionFactory
           * @return
           */
          @Bean(name = "jmsQueueListener")
          public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(PooledConnectionFactory pooledConnectionFactory) {
              DefaultJmsListenerContainerFactory factory =
                      new DefaultJmsListenerContainerFactory();
              factory.setConnectionFactory(pooledConnectionFactory);
              factory.setSessionTransacted(false);
              factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
              return factory;
          }
      }             
    • PTP

      模式下的生产者
      package com.futao.springbootdemo.foundation.mq.active.ptp;
      
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.jms.core.JmsMessagingTemplate;
      import org.springframework.stereotype.Component;
      
      import javax.jms.JMSException;
      
      /**
       * PTP模式生产者
       *
       * @author futao
       * Created on 2019-06-06.
       */
      @Slf4j
      @Component
      public class PtpProducer {
        
          @Autowired
          private JmsMessagingTemplate jmsMessagingTemplate;
      
          /**
           * 目的地
           */
          @Qualifier("springboot-test-queue")
          @Autowired
          private ActiveMQQueue springBootTestQueue;
      
          public void send(String msg) {
              jmsMessagingTemplate.convertAndSend(springBootTestQueue, msg);
              try {
                  log.info("send to ActiveMQ-Queue[{}] success ,msg:[{}]", springBootTestQueue.getQueueName(), msg);
              } catch (JMSException e) {
                  e.printStackTrace();
              }
          }
      }
       
      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @RequestMapping("/activemq")
      @RestController
      public class ActiveController {
        @Resource
          private PtpProducer ptpProducer;
      
          @PostMapping("/ptp/sender")
          public void ptpSender(@RequestParam String msg) {
              ptpProducer.send(msg);
          }
      }           
    • PTP

      模式下的消费者
      package com.futao.springbootdemo.foundation.mq.active.ptp;
      
      import com.futao.springbootdemo.foundation.mq.active.ActiveMqQueueEnum;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.command.ActiveMQMessage;
      import org.apache.activemq.command.ActiveMQQueue;
      import org.junit.Test;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.stereotype.Service;
      
      import javax.jms.*;
      
      /**
       * @author futao
       * Created on 2019-06-06.
       */
      @Slf4j
      @Service
      public class PtpConsumer {
      
          @JmsListener(destination = ActiveMqQueueEnum.testQueue, containerFactory = "jmsQueueListener")
          public void ptpConsumer(ActiveMQMessage message) throws JMSException {
              String text = ((TextMessage) message).getText();
              if ("节日快乐666".equalsIgnoreCase(text)) {
                  message.acknowledge();    //ack手动确认
              }
              log.info("receive message from activeMQ :[{}]", text);
          }
        /**
         * 手动创建ActiveMQConnectionFactory消费消息,生产消息也类似
         */
          @Test
          public void test() throws Exception {
              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
              Connection connection = connectionFactory.createConnection();
              Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//开启ack手动确认
              MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(ActiveMqQueueEnum.TEST_QUEUE.getQueueName()));
              connection.start();
              consumer.setMessageListener(message -> {
                  try {
                      String text = ((TextMessage) message).getText();
                      System.out.println(("收到消息:{}" + text));
                      if ("节日快乐666".equalsIgnoreCase(text)) {
                          message.acknowledge();    //ack手动确认
                      }
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              });
              Thread.sleep(999999999);
          }
      }
                 
  • 特点
    • 一条消息只会发送给其中某一个单独的消费者
    • 未被确认的消息将再次发送给其他消费
  1. 发布订阅模式
    • 发布订阅模式需要将

      spring.jms.pub-sub-domain=true

      ,其他配置不需要修改
    • 定义发布订阅模式下的

      Destination

      Topic

      /**
       * @author futao
       * Created on 2019-06-04.
       */
      @Configuration
      public class ActiveMqConfig {
        /**
           * ActiveMQ topic的定义
           */
          public static class TopicDefinition {
              public static final String activeTestTopic = "active-test-topic";
              public static final String activeProdTopic = "active-prod-topic";
          }
      
          /**
           * 定义一个名为BeanName为activeTestTopic的Topic:active-test-topic
           *
           * @return
           */
          @Bean(name = "activeTestTopic")
          public ActiveMQTopic activeMQTestTopic() {
              return new ActiveMQTopic(TopicDefinition.activeTestTopic);
          }
      
          /**
           * 定义一个名为BeanName为activeProdTopic的Topic:active-prod-topic
           *
           * @return
           */
          @Bean(name = "activeProdTopic")
          public ActiveMQTopic activeMQProdTopic() {
              return new ActiveMQTopic(TopicDefinition.activeProdTopic);
          }
      }
          @PostMapping("/ps/sender")
          public void pushTest(@RequestParam String msg) {
              activeMqProducer.send(msg);
          }           
    • 发布订阅模式下的消费者定义
      package com.futao.springbootdemo.foundation.mq.active.topic;
      
      import com.futao.springbootdemo.foundation.mq.active.ActiveMqConfig;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.activemq.command.ActiveMQMessage;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.stereotype.Service;
      
      import javax.jms.JMSException;
      import javax.jms.TextMessage;
      
      /**
       * 订阅的队列是PTP模式还是Topic模式,与这边的定义无关。取决于配置
       * # 开启topic模式
       * spring:
       * jms:
       * pub-sub-domain: true
       *
       * @author futao
       * Created on 2019-06-04.
       */
      @Slf4j
      @Service
      public class ActiveMqConsumer {
      
          /**
           * 订阅testTopic  -1
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
          public void testTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("testTopicConsumer1接收到activeMq-activeTestTopic消息:[{}]", text);
          }
      
          /**
           * 订阅testTopic  -2
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeTestTopic)
          public void testTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("testTopicConsumer2接收到activeMq-activeTestTopic消息:[{}]", text);
          }
      
          /**
           * 订阅prodTopic  -1
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
          public void prodTopicConsumer1(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("prodTopicConsumer1接收到activeMq-activeProdTopic消息:[{}]", text);
          }
      
          /**
           * 订阅 prodTopic  -2
           *
           * @param mqMessage
           * @throws JMSException
           */
          @JmsListener(destination = ActiveMqConfig.TopicDefinition.activeProdTopic)
          public void prodTopicConsumer2(ActiveMQMessage mqMessage) throws JMSException {
              String text = ((TextMessage) mqMessage.getMessage()).getText();
              log.info("prodTopicConsumer2接收到activeMq-activeProdTopic消息:[{}]", text);
          }
      }           
    • 结果展示
    **发送到Topic的消息被所有订阅了该Topic的消费者接收

参考资料

SpringBoot与ActiveMQ整合实现手动ACK(事务模式与ack应答模式)

TODO:

  • 如何保证消费者将消息发送到ActiveMQ的过程中消息不丢失
  • ActiveMQ的集群与主从
  • 消息的持久化
  • 事务
  • PTP模式下消费者多久没ACK后ActiveMQ会认为该条消息消费失败呢?(是不是有个消费超时时间设置)。还是只能等到该消费者下线。

继续阅读