天天看點

SpringBoot 整合ActiveMQ

文章目錄

  • ​​1. 引入依賴​​
  • ​​2. 配置檔案​​
  • ​​3. 生産者​​
  • ​​4. 配置config​​
  • ​​5. queue消費者​​
  • ​​6. topic消費者​​
  • ​​6. ActiveMQ 消息存儲規則​​

1. 引入依賴

pom檔案引入activemq依賴

<!--activeMq配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.3</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.7</version>
        </dependency>      

2. 配置檔案

spring:
  activemq:
    user: admin
    password: admin
    broker-url: failover:(tcp://192.168.43.666:61616)
    #是否信任所有包(如果傳遞的是對象則需要設定為true,預設是傳字元串)
    packages:
      trust-all: true
    #連接配接池
    pool:
      enabled: true
      max-connections: 5
      idle-timeout: 30000
#      expiry-timeout: 0
    jms:
      #預設使用queue模式,使用topic則需要設定為true
      pub-sub-domain: true

      # 是否信任所有包
      #spring.activemq.packages.trust-all=
      # 要信任的特定包的逗号分隔清單(當不信任所有包時)
      #spring.activemq.packages.trusted=
      # 當連接配接請求和池滿時是否阻塞。設定false會抛“JMSException異常”。
      #spring.activemq.pool.block-if-full=true
      # 如果池仍然滿,則在抛出異常前阻塞時間。
      #spring.activemq.pool.block-if-full-timeout=-1ms
      # 是否在啟動時建立連接配接。可以在啟動時用于加熱池。
      #spring.activemq.pool.create-connection-on-startup=true
      # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
      #spring.activemq.pool.enabled=false
      # 連接配接過期逾時。
      #spring.activemq.pool.expiry-timeout=0ms
      # 連接配接空閑逾時
      #spring.activemq.pool.idle-timeout=30s
      # 連接配接池最大連接配接數
      #spring.activemq.pool.max-connections=1
      # 每個連接配接的有效會話的最大數目。
      #spring.activemq.pool.maximum-active-session-per-connection=500
      # 當有"JMSException"時嘗試重新連接配接
      #spring.activemq.pool.reconnect-on-exception=true
      # 在空閑連接配接清除線程之間運作的時間。當為負數時,沒有空閑連接配接驅逐線程運作。
      #spring.activemq.pool.time-between-expiration-check=-1ms
      # 是否隻使用一個MessageProducer
      #spring.activemq.pool.use-anonymous-producers=true      

3. 生産者

package com.gblfy.producer;

import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.*;
import java.io.Serializable;

/**
 * 發送消息
 *
 * @author gblfy
 * @date 2022-11-02
 */
@RestController
@RequestMapping(value = "/active")
public class SendController {
    //也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 發送消息接口
     * 發送queue消息 :http://127.0.0.1:8080/active/send?msg=ceshi1234
     * 發送topic 消息: http://127.0.0.1:8080/active/topic/send?msg=ceshi1234
     * 發送queue消息(延遲time毫秒) :http://127.0.0.1:8080/active/send?msg=ceshi1234&time=5000
     *
     * @param msg  消息
     * @param type url中參數,非必須
     * @param time
     * @return
     */
    @RequestMapping({"/send", "/{type}/send"})
    public String send(@PathVariable(value = "type", required = false) String type, String msg, Long time) {
        Destination destination = null;
        if (type == null) {
            type = "";
        }
        switch (type) {
            case "topic":
                //發送廣播消息
                destination = new ActiveMQTopic("active.topic");
                break;
            default:
                //發送 隊列消息
                destination = new ActiveMQQueue("active.queue");
                break;
        }
        // System.out.println("開始請求發送:"+DateUtil.getStringDate(new Date(),"yyyy-MM-dd HH:mm:ss"));
        if (time != null && time > 0) {
            //延遲隊列,延遲time毫秒
            //延遲隊列需要在 <broker>标簽上增加屬性 schedulerSupport="true"
            delaySend(destination, msg, time);
        } else {
            jmsMessagingTemplate.convertAndSend(destination, msg);//無序
            //jmsMessagingTemplate.convertSendAndReceive();//有序
        }
        return "activemq消息發送成功 隊列消息:" + msg;
    }

    /**
     * 延時發送
     * 說明:延遲隊列需要在 <broker>标簽上增加屬性 schedulerSupport="true"
     *
     * @param destination 發送的隊列
     * @param data        發送的消息
     * @param time        延遲時間 /毫秒
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 擷取連接配接工廠
        ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
        try {
            // 擷取連接配接
            connection = connectionFactory.createConnection();
            connection.start();
            // 擷取session,true開啟事務,false關閉事務
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 建立一個消息隊列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //設定延遲時間
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 發送消息
            producer.send(message);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}      

4. 配置config

package com.gblfy.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 描述:
 * activemq 有兩種模式 queue 和 topic
 * queue 模式是單對單,有多個消費者的情況下則是使用輪詢監聽
 * topic 模式/廣播模式/釋出訂閱模式 是一對多,發送消息所有的消費者都能夠監聽到
 *
 * @author gblfy
 * @date 2022-11-02
 */
@EnableJms
@Configuration
public class ActiveMQConfig {
    //隊列名
    private static final String queueName = "active.queue";
    //主題名
    private static final String topicName = "active.topic";

    @Value("${spring.activemq.user:}")
    private String username;
    @Value("${spring.activemq.password:}")
    private String password;
    @Value("${spring.activemq.broker-url:}")
    private String brokerUrl;

    @Bean
    public Queue acQueue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public Topic acTopic() {
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(username, password, brokerUrl);
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // 關閉Session事務,手動确認與事務沖突
        bean.setSessionTransacted(false);
        // 設定消息的簽收模式(自己簽收)
        /**
         * AUTO_ACKNOWLEDGE = 1 :自動确認
         * CLIENT_ACKNOWLEDGE = 2:用戶端手動确認
         * DUPS_OK_ACKNOWLEDGE = 3: 自動批量确認
         * SESSION_TRANSACTED = 0:事務送出并确認
         * 但是在activemq補充了一個自定義的ACK模式:
         * INDIVIDUAL_ACKNOWLEDGE = 4:單條消息确認
         **/
        bean.setSessionAcknowledgeMode(4);
        //此處設定消息重發規則,redeliveryPolicy() 中定義
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        // 關閉Session事務,手動确認與事務沖突
        bean.setSessionTransacted(false);
        bean.setSessionAcknowledgeMode(4);
        //設定為釋出訂閱方式, 預設情況下使用的生産消費者方式
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * 消息的重發規則配置
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // 是否在每次嘗試重新發送失敗後,增長這個等待時間
        redeliveryPolicy.setUseExponentialBackOff(true);
        // 重發次數五次, 總共六次
        redeliveryPolicy.setMaximumRedeliveries(5);
        // 重發時間間隔,預設為1000ms(1秒)
        redeliveryPolicy.setInitialRedeliveryDelay(1000);
        // 重發時長遞增的時間倍數2
        redeliveryPolicy.setBackOffMultiplier(2);
        // 是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(false);
        // 設定重發最大拖延時間-1表示無延遲限制
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }
}      

5. queue消費者

package com.gblfy.listener;

import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Session;

/**
 * TODO
 *
 * @author gblfy
 * @Date 2022-11-02
 **/
@Component
public class QueueListener {

    /**
     * queue 模式 單對單,兩個消費者監聽同一個隊列則通過輪詢接收消息
     * containerFactory屬性的值關聯config類中的聲明
     *
     * @param msg
     */
    @JmsListener(destination = "active.queue", containerFactory = "jmsListenerContainerQueue")
    public void queueListener(ActiveMQMessage message, Session session, String msg) throws JMSException {
        try {
            System.out.println("active queue 接收到消息 " + msg);
            //手動簽收
            message.acknowledge();
        } catch (Exception e) {
            //重新發送
            session.recover();
        }
    }
}      

6. topic消費者

package com.gblfy.listener;

import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Session;

/**
 * TODO
 *
 * @author gblfy
 * @Date 2022-11-02
 **/
@Component
public class TopicListener {

    /**
     * topic 模式/廣播模式/釋出訂閱模式 一對多,多個消費者可同時接收到消息
     * topic 模式無死信隊列,死信隊列是queue模式
     * containerFactory屬性的值關聯config類中的聲明
     *
     * @param msg
     */
    @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic")
    public void topicListener(ActiveMQMessage message, Session session, String msg) throws JMSException {
        try {
            // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
            System.out.println("active topic 接收到消息 " + msg);
            System.out.println("");
            //手動簽收
            message.acknowledge();
        } catch (Exception e) {
            //重新發送
            session.recover();
        }
    }

    @JmsListener(destination = "active.topic", containerFactory = "jmsListenerContainerTopic")
    public void topicListener2(ActiveMQMessage message, Session session, String msg) throws JMSException {
        try {
            // System.out.println("接收到消息:" + DateUtil.getStringDate(new Date(), "yyyy-MM-dd HH:mm:ss"));
            System.out.println("active topic2 接收到消息 " + msg);
            System.out.println("");
            //手動簽收
            message.acknowledge();
        } catch (Exception e) {
            //重新發送
            session.recover();
        }
    }
}      

6. ActiveMQ 消息存儲規則

QUEUE 點對點:

特點:消息遵循先到先得,消息隻能被一個消費者消費。

消息存儲規則:消費者消費消息成功,MQ服務端消息删除

TOPIC訂閱模式: 消息屬于廣播(訂閱)模式,消息會被所有的topic消費者消費消息。

消息存儲規則:所有消費者消費成功,MQ服務端消息删除,有一個消息沒有沒有消費完成,消息也會存儲在MQ服務端。