天天看點

SpringBoot 整合ActiveMQ_企業實戰

SpringBoot 整合ActiveMQ_企業實戰

文章目錄

  • ​​1. 建立Springboot工程​​
  • ​​2. 引入maven依賴​​
  • ​​3. ActiveMq配置類​​
  • ​​4. MQ生産者​​
  • ​​5. MQ 點對點消費者​​
  • ​​6. MQ 釋出點閱消費者A​​
  • ​​7. MQ 釋出點閱消費者B​​
  • ​​8. 統一測試類​​

1. 建立Springboot工程

SpringBoot 整合ActiveMQ_企業實戰
SpringBoot 整合ActiveMQ_企業實戰
SpringBoot 整合ActiveMQ_企業實戰
SpringBoot 整合ActiveMQ_企業實戰

2. 引入maven依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.gblfy</groupId>
    <artifactId>springboot-activemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-activemq</name>
    <description>Spring Boot內建Activemq</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <!--全局編碼設定-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--JDK版本-->
        <java.version>1.8</java.version>
        <!--全局版本-->
        <fastjson.version>1.2.58</fastjson.version>
        <lombok.version>1.18.8</lombok.version>
    </properties>

    <dependencies>

        <!--SpringMVC 啟動器-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Activemq Start-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <!-- Activemq End-->

        <!--資料處理-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!--lombok插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

        <!--單元測試-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--maven編譯插件-->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
      

3. ActiveMq配置類

package com.gblfy.activemq.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * @author gblfy
 * @ClassNme ActiveMqConfig
 * @Description Mq配置類
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@Configuration
public class ActiveMqConfig {


    // queue模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }


    // topic模式的ListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

}
      

4. MQ生産者

package com.gblfy.activemq.producer;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * @author gblfy
 * @ClassNme MqProducer
 * @Description Mq 生産者封裝公共類
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@Service
public class MqProducer {


    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;


    /**
     * 發送字元串消息隊列
     *
     * @param queueName 隊列名稱
     * @param message   字元串
     */
    public void sendStringQueue(String queueName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }


    /**
     * 發送字元串集合消息隊列
     *
     * @param queueName 隊列名稱
     * @param list      字元串集合
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);
    }

    /**
     * 發送Map消息隊列
     *
     * @param queueName
     * @param headers
     */
    public void sendMapQueue(String queueName, Map<String, Object> headers) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), headers);
    }

    /**
     * 發送對象消息隊列
     *
     * @param queueName 隊列名稱
     * @param obj       對象
     */
    public void sendObjQueue(String queueName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);
    }


    /**
     * 發送對象集合消息隊列
     *
     * @param queueName 隊列名稱
     * @param objList   對象集合
     */
    public void sendObjListQueue(String queueName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);
    }


    /**
     * 發送字元串消息主題
     *
     * @param topicName 主題名稱
     * @param message   字元串
     */
    public void sendStringTopic(String topicName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);
    }


    /**
     * 發送字元串集合消息主題
     *
     * @param topicName 主題名稱
     * @param list      字元串集合
     */
    public void sendStringListTopic(String topicName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);
    }
    /**
     * 發送Map消息主題
     *
     * @param queueName
     * @param headers
     */
    public void sendMapTopic(String queueName, Map<String, Object> headers) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), headers);
    }

    /**
     * 發送對象消息主題
     *
     * @param topicName 主題名稱
     * @param obj       對象
     */
    public void sendObjTopic(String topicName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);
    }


    /**
     * 發送對象集合消息主題
     *
     * @param topicName 主題名稱
     * @param objList   對象集合
     */
    public void sendObjListTopic(String topicName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);
    }

}
      

5. MQ 點對點消費者

package com.gblfy.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;
import java.util.Map;

/**
 * @author gblfy
 * @ClassNme QueueConsumer
 * @Description Mq點對點消費者
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@Component
public class QueueConsumer {

    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        System.out.println("接收到集合隊列消息...." + list);
    }

    @JmsListener(destination = "mapQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveMapQueue(Map<String, Object> headers) {
        System.out.println("接收到集合隊列消息...." + headers);
    }


    @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到對象隊列消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到的對象隊列消息..." + objectMessage.getObject());
    }
}
      

6. MQ 釋出點閱消費者A

package com.gblfy.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;
/**
 * @author gblfy
 * @ClassNme ATopicConsumer
 * @Description Mq訂閱者A
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@Component
public class ATopicConsumer {

    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("ATopicConsumer接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("ATopicConsumer接收到集合主題消息...." + list);
    }

    @JmsListener(destination = "mapTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveMapTopic(List<String> list) {
        System.out.println("ATopicConsumer接收到Map主題消息...." + list);
    }


    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到對象主題消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到的對象集合主題消息..." + objectMessage.getObject());
    }

}
      

7. MQ 釋出點閱消費者B

package com.gblfy.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

/**
 * @author gblfy
 * @ClassNme BTopicConsumer
 * @Description Mq訂閱者B
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@Component
public class BTopicConsumer {

    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("BTopicConsumer接收到消息...." + msg);
    }


    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("BTopicConsumer接收到集合主題消息...." + list);
    }


    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到對象主題消息...." + objectMessage.getObject());
    }


    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("BTopicConsumer接收到的對象集合主題消息..." + objectMessage.getObject());
    }

}
      

8. 統一測試類

package com.gblfy.activemq.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * @author gblfy
 * @ClassNme User
 * @Description Mq發送接送對象模拟
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

    private String id;
    private String name;
    private Integer age;
}
      
package com.gblfy.activemq;

import com.gblfy.activemq.producer.MqProducer;
import com.gblfy.activemq.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author gblfy
 * @ClassNme ActivemqdemoApplicationTests
 * @Description Mq 測試公共類
 * @Date 2019/9/3 18:05
 * @version1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    @Autowired
    private MqProducer mqProducer;


    /**************************MQ點對點測試場景***************************/

    /**
     * 點對點場景 01
     * 消息類型 String
     */
    @Test
    public void testStringQueue() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次發送字元串隊列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + i);
        }
    }

    /**
     * 點對點場景 02
     * 消息類型 StringList
     */
    @Test
    public void testStringListQueue() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在發送集合隊列消息ing......");
        mqProducer.sendStringListQueue("stringListQueue", idList);
    }

    /**
     * 點對點場景 03
     * 消息類型 Map<String,Object></String,Object>
     */
    @Test
    public void testMapQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("1", "sxh");
        map.put("2", "ljy");
        map.put("3", "sh");
        map.put("4", "qjj");
        map.put("5", "ygf");
        map.put("6", "lxj");
        map.put("7", "gblfy");
        System.out.println("正在發送Map隊列消息ing......");
        mqProducer.sendMapQueue("mapQueue", map);
    }

    /**
     * 點對點場景 04
     * 消息類型 Obj
     */
    @Test
    public void testObjQueue() {

        System.out.println("正在發送對象隊列消息......");
        mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));
    }

    /**
     * 點對點場景 05
     * 消息類型 ObjList
     */
    @Test
    public void testObjListQueue() {

        System.out.println("正在發送對象集合隊列消息......");

        List<Serializable> userList = new ArrayList<>();
        userList.add(new User("1", "雨昕", 01));
        userList.add(new User("2", "劉英", 26));
        userList.add(new User("3", "振振", 12));

        mqProducer.sendObjListQueue("objListQueue", userList);
    }

    /**************************MQ釋出訂閱測試場景***************************/

    /**
     * 釋出訂閱場景 01
     * 消息類型 String
     */
    @Test
    public void testStringTopic() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次發送字元串主題消息");
            mqProducer.sendStringTopic("stringTopic", "消息:" + i);
        }
    }

    /**
     * 釋出訂閱場景 02
     * 消息類型 StringList
     */
    @Test
    public void testStringListTopic() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在發送集合主題消息ing......");
        mqProducer.sendStringListTopic("stringListTopic", idList);
    }

    /**
     * 釋出訂閱場景 03
     * 消息類型 Map
     */
    @Test
    public void testMapTopic() {
        Map<String, Object> map = new HashMap<>();
        map.put("1", "sxh");
        map.put("2", "ljy");
        map.put("3", "sh");
        map.put("4", "qjj");
        map.put("5", "ygf");
        map.put("6", "lxj");
        map.put("7", "gblfy");
        System.out.println("正在發送Map隊列消息ing......");
        mqProducer.sendMapTopic("mapQueue", map);
    }

    /**
     * 釋出訂閱場景 04
     * 消息類型 Obj
     */
    @Test
    public void testObjTopic() {

        System.out.println("正在發送對象主題消息......");
        mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));
    }

    /**
     * 釋出訂閱場景 05
     * 消息類型 ObjList
     */
    @Test
    public void testObjListTopic() {

        System.out.println("正在發送對象集合主題消息......");

        List<Serializable> userList = new ArrayList<>();
        userList.add(new User("1", "雨昕", 01));
        userList.add(new User("2", "劉英", 26));
        userList.add(new User("3", "振振", 12));

        mqProducer.sendObjListTopic("objListTopic", userList);
    }
}
      
git clone [email protected]:gb-heima/springboot-activemq.git