SpringBoot 整合ActiveMQ_企業實戰 文章目錄
- 1. 建立Springboot工程
- 2. 引入maven依賴
- 3. ActiveMq配置類
- 4. MQ生產者
- 5. MQ 點對點消費者
- 6. MQ 發佈訂閱消費者A
- 7. MQ 發佈訂閱消費者B
- 8. 統一測試類
1. 建立Springboot工程
SpringBoot 整合ActiveMQ_企業實戰
SpringBoot 整合ActiveMQ_企業實戰
SpringBoot 整合ActiveMQ_企業實戰
SpringBoot 整合ActiveMQ_企業實戰
2. 引入maven依賴
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the springboot-activemq demo: a jar-packaged project that inherits from spring-boot-starter-parent 2.1.7.RELEASE. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gblfy</groupId>
<artifactId>springboot-activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-activemq</name>
<description>Spring Boot內建Activemq</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<!-- Global encoding settings -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- JDK version -->
<java.version>1.8</java.version>
<!-- Versions of dependencies that are pinned explicitly below -->
<fastjson.version>1.2.58</fastjson.version>
<lombok.version>1.18.8</lombok.version>
</properties>
<dependencies>
<!-- Spring MVC starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- ActiveMQ: start (no versions declared here; they are left to the inherited parent) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- NOTE(review): Spring Boot 2.1 appears to have moved JMS connection pooling to org.messaginghub:pooled-jms; confirm that activemq-pool is still required -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- ActiveMQ: end -->
<!-- Data handling (fastjson) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!-- Unit testing (test scope only) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3. ActiveMq配置類
package com.gblfy.activemq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
 * JMS listener container configuration for ActiveMQ.
 *
 * <p>Registers two listener container factories so that {@code @JmsListener} methods can pick,
 * through their {@code containerFactory} attribute, between queue and topic delivery.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@Configuration
public class ActiveMqConfig {

    /**
     * Listener container factory for queue (point-to-point) listeners.
     *
     * @param activeMQConnectionFactory connection factory supplied by the Spring context
     * @return a factory bound to the connection factory, with the pub/sub flag left untouched
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        return newContainerFactory(activeMQConnectionFactory, false);
    }

    /**
     * Listener container factory for topic (publish/subscribe) listeners.
     *
     * @param activeMQConnectionFactory connection factory supplied by the Spring context
     * @return a factory bound to the connection factory, with the pub/sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        return newContainerFactory(activeMQConnectionFactory, true);
    }

    /**
     * Builds a container factory; the pub/sub flag is only set when a topic factory is requested.
     */
    private static DefaultJmsListenerContainerFactory newContainerFactory(
            ConnectionFactory connectionFactory, boolean topicMode) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        if (topicMode) {
            factory.setPubSubDomain(true);
        }
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}
4. MQ生產者
package com.gblfy.activemq.producer;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
 * Shared producer facade for sending messages to ActiveMQ queues and topics.
 *
 * <p>Every public method passes its payload to {@code JmsMessagingTemplate.convertAndSend}
 * together with an explicit {@code ActiveMQQueue} or {@code ActiveMQTopic} destination.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@Service
public class MqProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Sends a string message to a queue.
     *
     * @param queueName name of the queue
     * @param message   string payload
     */
    public void sendStringQueue(String queueName, String message) {
        sendToQueue(queueName, message);
    }

    /**
     * Sends a list of strings to a queue.
     *
     * @param queueName name of the queue
     * @param list      list of strings to send as one message
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        sendToQueue(queueName, list);
    }

    /**
     * Sends a map to a queue.
     *
     * @param queueName name of the queue
     * @param headers   map sent as the message payload
     */
    public void sendMapQueue(String queueName, Map<String, Object> headers) {
        sendToQueue(queueName, headers);
    }

    /**
     * Sends a serializable object to a queue.
     *
     * @param queueName name of the queue
     * @param obj       object payload
     */
    public void sendObjQueue(String queueName, Serializable obj) {
        sendToQueue(queueName, obj);
    }

    /**
     * Sends a list of serializable objects to a queue.
     *
     * @param queueName name of the queue
     * @param objList   list of objects to send as one message; the wildcard lets callers pass
     *                  a list of any serializable element type, e.g. {@code List<User>}
     */
    public void sendObjListQueue(String queueName, List<? extends Serializable> objList) {
        sendToQueue(queueName, objList);
    }

    /**
     * Sends a string message to a topic.
     *
     * @param topicName name of the topic
     * @param message   string payload
     */
    public void sendStringTopic(String topicName, String message) {
        sendToTopic(topicName, message);
    }

    /**
     * Sends a list of strings to a topic.
     *
     * @param topicName name of the topic
     * @param list      list of strings to send as one message
     */
    public void sendStringListTopic(String topicName, List<String> list) {
        sendToTopic(topicName, list);
    }

    /**
     * Sends a map to a topic.
     *
     * @param topicName name of the topic
     * @param headers   map sent as the message payload
     */
    public void sendMapTopic(String topicName, Map<String, Object> headers) {
        // Must target a topic: this method previously built an ActiveMQQueue, so the message
        // was delivered to a queue and never reached topic subscribers.
        sendToTopic(topicName, headers);
    }

    /**
     * Sends a serializable object to a topic.
     *
     * @param topicName name of the topic
     * @param obj       object payload
     */
    public void sendObjTopic(String topicName, Serializable obj) {
        sendToTopic(topicName, obj);
    }

    /**
     * Sends a list of serializable objects to a topic.
     *
     * @param topicName name of the topic
     * @param objList   list of objects to send as one message; the wildcard lets callers pass
     *                  a list of any serializable element type, e.g. {@code List<User>}
     */
    public void sendObjListTopic(String topicName, List<? extends Serializable> objList) {
        sendToTopic(topicName, objList);
    }

    /** Converts and sends the payload to the queue with the given name. */
    private void sendToQueue(String queueName, Object payload) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), payload);
    }

    /** Converts and sends the payload to the topic with the given name. */
    private void sendToTopic(String topicName, Object payload) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), payload);
    }
}
5. MQ 點對點消費者
package com.gblfy.activemq.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage;
import java.util.List;
import java.util.Map;
/**
 * Point-to-point (queue) consumer.
 *
 * <p>Each listener is bound to one queue through the {@code jmsListenerContainerQueue}
 * container factory and prints the received payload to standard output.
 *
 * <p>NOTE(review): the object listeners call {@code ObjectMessage.getObject()}; ActiveMQ is
 * understood to restrict which classes may be deserialized, so confirm that the payload
 * packages are configured as trusted.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@Component
public class QueueConsumer {

    /**
     * Receives string messages from {@code stringQueue}.
     *
     * @param msg string payload
     */
    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }

    /**
     * Receives string-list messages from {@code stringListQueue}.
     *
     * @param list list payload
     */
    @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        System.out.println("接收到集合隊列消息...." + list);
    }

    /**
     * Receives map messages from {@code mapQueue}.
     *
     * @param headers map payload
     */
    @JmsListener(destination = "mapQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveMapQueue(Map<String, Object> headers) {
        // The text used to be the copy-pasted "collection" wording; it now names the Map payload.
        System.out.println("接收到Map隊列消息...." + headers);
    }

    /**
     * Receives object messages from {@code objQueue}.
     *
     * @param objectMessage JMS message carrying the object
     * @throws Exception if the object cannot be read from the message
     */
    @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到對象隊列消息...." + objectMessage.getObject());
    }

    /**
     * Receives object-list messages from {@code objListQueue}.
     *
     * @param objectMessage JMS message carrying the object list
     * @throws Exception if the object cannot be read from the message
     */
    @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        // The text now says "object collection", matching the topic consumers' wording.
        System.out.println("接收到的對象集合隊列消息..." + objectMessage.getObject());
    }
}
6. MQ 發佈訂閱消費者A
package com.gblfy.activemq.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage;
import java.util.List;
import java.util.Map;
/**
 * Publish/subscribe (topic) subscriber A.
 *
 * <p>Each listener is bound to one topic through the {@code jmsListenerContainerTopic}
 * container factory and prints the received payload to standard output.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@Component
public class ATopicConsumer {

    /**
     * Receives string messages from {@code stringTopic}.
     *
     * @param msg string payload
     */
    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("ATopicConsumer接收到消息...." + msg);
    }

    /**
     * Receives string-list messages from {@code stringListTopic}.
     *
     * @param list list payload
     */
    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("ATopicConsumer接收到集合主題消息...." + list);
    }

    /**
     * Receives map messages from {@code mapTopic}.
     *
     * @param map map payload
     */
    @JmsListener(destination = "mapTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveMapTopic(Map<String, Object> map) {
        // Declared as a Map to match what the producer sends; the earlier List<String>
        // parameter could not accept a map payload.
        System.out.println("ATopicConsumer接收到Map主題消息...." + map);
    }

    /**
     * Receives object messages from {@code objTopic}.
     *
     * @param objectMessage JMS message carrying the object
     * @throws Exception if the object cannot be read from the message
     */
    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到對象主題消息...." + objectMessage.getObject());
    }

    /**
     * Receives object-list messages from {@code objListTopic}.
     *
     * @param objectMessage JMS message carrying the object list
     * @throws Exception if the object cannot be read from the message
     */
    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到的對象集合主題消息..." + objectMessage.getObject());
    }
}
7. MQ 發佈訂閱消費者B
package com.gblfy.activemq.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.ObjectMessage;
import java.util.List;
/**
 * Publish/subscribe (topic) subscriber B.
 *
 * <p>Listens on the string, string-list, object and object-list topics through the
 * {@code jmsListenerContainerTopic} container factory and prints whatever arrives.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@Component
public class BTopicConsumer {

    /**
     * Handles string messages published to {@code stringTopic}.
     *
     * @param msg string payload
     */
    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        final String line = "BTopicConsumer接收到消息...." + msg;
        System.out.println(line);
    }

    /**
     * Handles string-list messages published to {@code stringListTopic}.
     *
     * @param list list payload
     */
    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        final String line = "BTopicConsumer接收到集合主題消息...." + list;
        System.out.println(line);
    }

    /**
     * Handles object messages published to {@code objTopic}.
     *
     * @param objectMessage JMS message carrying the object
     * @throws Exception if the object cannot be read from the message
     */
    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = objectMessage.getObject();
        System.out.println("BTopicConsumer接收到對象主題消息...." + payload);
    }

    /**
     * Handles object-list messages published to {@code objListTopic}.
     *
     * @param objectMessage JMS message carrying the object list
     * @throws Exception if the object cannot be read from the message
     */
    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = objectMessage.getObject();
        System.out.println("BTopicConsumer接收到的對象集合主題消息..." + payload);
    }
}
8. 統一測試類
package com.gblfy.activemq.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Sample payload used to exercise sending and receiving objects through the broker.
 *
 * <p>Accessors, constructors and the builder are generated by the Lombok annotations.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

    /**
     * Explicit serial version so that producer and consumer agree on the serialized form
     * even when they are compiled separately.
     */
    private static final long serialVersionUID = 1L;

    /** User identifier. */
    private String id;

    /** Display name. */
    private String name;

    /** Age in years. */
    private Integer age;
}
package com.gblfy.activemq;
import com.gblfy.activemq.producer.MqProducer;
import com.gblfy.activemq.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Integration tests that push one message of every supported payload type through
 * {@link MqProducer}, first to queues and then to topics.
 *
 * <p>The tests only send; received messages are printed by the consumer components.
 *
 * @author gblfy
 * @version 1.0
 * @since 2019/9/3
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    @Autowired
    private MqProducer mqProducer;

    /************************** Point-to-point (queue) scenarios ***************************/

    /**
     * Queue scenario 01: String payload, sent 100 times.
     */
    @Test
    public void testStringQueue() {
        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次發送字元串隊列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + i);
        }
    }

    /**
     * Queue scenario 02: {@code List<String>} payload.
     */
    @Test
    public void testStringListQueue() {
        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");
        System.out.println("正在發送集合隊列消息ing......");
        mqProducer.sendStringListQueue("stringListQueue", idList);
    }

    /**
     * Queue scenario 03: {@code Map<String, Object>} payload.
     */
    @Test
    public void testMapQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("1", "sxh");
        map.put("2", "ljy");
        map.put("3", "sh");
        map.put("4", "qjj");
        map.put("5", "ygf");
        map.put("6", "lxj");
        map.put("7", "gblfy");
        System.out.println("正在發送Map隊列消息ing......");
        mqProducer.sendMapQueue("mapQueue", map);
    }

    /**
     * Queue scenario 04: single object payload.
     */
    @Test
    public void testObjQueue() {
        System.out.println("正在發送對象隊列消息......");
        mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));
    }

    /**
     * Queue scenario 05: object-list payload.
     */
    @Test
    public void testObjListQueue() {
        System.out.println("正在發送對象集合隊列消息......");
        List<Serializable> userList = new ArrayList<>();
        // Plain decimal 1: the former literal 01 was an octal literal.
        userList.add(new User("1", "雨昕", 1));
        userList.add(new User("2", "劉英", 26));
        userList.add(new User("3", "振振", 12));
        mqProducer.sendObjListQueue("objListQueue", userList);
    }

    /************************** Publish/subscribe (topic) scenarios ***************************/

    /**
     * Topic scenario 01: String payload, sent 100 times.
     */
    @Test
    public void testStringTopic() {
        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次發送字元串主題消息");
            mqProducer.sendStringTopic("stringTopic", "消息:" + i);
        }
    }

    /**
     * Topic scenario 02: {@code List<String>} payload.
     */
    @Test
    public void testStringListTopic() {
        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");
        System.out.println("正在發送集合主題消息ing......");
        mqProducer.sendStringListTopic("stringListTopic", idList);
    }

    /**
     * Topic scenario 03: {@code Map<String, Object>} payload.
     */
    @Test
    public void testMapTopic() {
        Map<String, Object> map = new HashMap<>();
        map.put("1", "sxh");
        map.put("2", "ljy");
        map.put("3", "sh");
        map.put("4", "qjj");
        map.put("5", "ygf");
        map.put("6", "lxj");
        map.put("7", "gblfy");
        System.out.println("正在發送Map主題消息ing......");
        // Target "mapTopic" (not the queue-scenario name "mapQueue"), which is the destination
        // used by the map topic listener.
        mqProducer.sendMapTopic("mapTopic", map);
    }

    /**
     * Topic scenario 04: single object payload.
     */
    @Test
    public void testObjTopic() {
        System.out.println("正在發送對象主題消息......");
        mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));
    }

    /**
     * Topic scenario 05: object-list payload.
     */
    @Test
    public void testObjListTopic() {
        System.out.println("正在發送對象集合主題消息......");
        List<Serializable> userList = new ArrayList<>();
        // Plain decimal 1: the former literal 01 was an octal literal.
        userList.add(new User("1", "雨昕", 1));
        userList.add(new User("2", "劉英", 26));
        userList.add(new User("3", "振振", 12));
        mqProducer.sendObjListTopic("objListTopic", userList);
    }
}
git clone [email protected]:gb-heima/springboot-activemq.git