天天看點

RabbitMQ與Spring的架構整合之Spring AMQP實戰

1、SpringAMQP使用者管理元件RabbitAdmin。

  RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可。注意,autoStartup必須設定為true,否則Spring容器不會加載RabbitAdmin類。RabbitAdmin底層實作就是從Spring容器中擷取Exchange交換機、Binding綁定、RoutingKey路由鍵以及Queue隊列的@Bean聲明。

  然後使用RabbitTemplate的execute方法執行對應的聲明、修改、删除等一系列RabbitMQ基礎功能操作。例如,添加一個交換機、删除一個綁定、清空一個隊列裡面的消息等待操作。

2、由于使用的maven工程配合了Springboot整合Spring與RabbitMQ的知識。是以先引入依賴包,如下所示:

1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
 6     <modelVersion>4.0.0</modelVersion>
 7     <parent>
 8         <groupId>org.springframework.boot</groupId>
 9         <artifactId>spring-boot-starter-parent</artifactId>
10         <version>2.2.1.RELEASE</version>
11         <relativePath /> <!-- lookup parent from repository -->
12     </parent>
13     <groupId>com.bie</groupId>
14     <artifactId>rabbitmq-spring</artifactId>
15     <version>0.0.1-SNAPSHOT</version>
16     <name>rabbitmq-spring</name>
17     <description>Demo project for Spring Boot</description>
18 
19     <properties>
20         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
21         <project.reporting.outputEncoding>UTF-8
22         </project.reporting.outputEncoding>
23         <java.version>1.8</java.version>
24         <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
25     </properties>
26 
27     <dependencies>
28         <!-- Spring與RabbitMQ整合的包 -->
29         <dependency>
30             <groupId>org.springframework.boot</groupId>
31             <artifactId>spring-boot-starter-amqp</artifactId>
32         </dependency>
33         <dependency>
34             <groupId>org.springframework.boot</groupId>
35             <artifactId>spring-boot-starter-web</artifactId>
36         </dependency>
37 
38         <dependency>
39             <groupId>org.springframework.boot</groupId>
40             <artifactId>spring-boot-starter-test</artifactId>
41             <scope>test</scope>
42             <exclusions>
43                 <exclusion>
44                     <groupId>org.junit.vintage</groupId>
45                     <artifactId>junit-vintage-engine</artifactId>
46                 </exclusion>
47             </exclusions>
48         </dependency>
49         <dependency>
50             <groupId>org.springframework.amqp</groupId>
51             <artifactId>spring-rabbit-test</artifactId>
52             <scope>test</scope>
53         </dependency>
54         <!-- RabbitMQ基礎核心包 -->
55         <dependency>
56             <groupId>com.rabbitmq</groupId>
57             <artifactId>amqp-client</artifactId>
58         </dependency>
59     </dependencies>
60 
61     <build>
62         <plugins>
63             <plugin>
64                 <groupId>org.springframework.boot</groupId>
65                 <artifactId>spring-boot-maven-plugin</artifactId>
66             </plugin>
67         </plugins>
68     </build>
69 
70 </project>      

由于使用的是Springboot項目配合RabbitMQ來做的,是以配置檔案這裡使用了注解來替換,是以啟動的時候,加載如下所示配置類,如下所示:

1 package com.bie;
 2 
 3 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 4 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 5 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.ComponentScan;
 8 import org.springframework.context.annotation.Configuration;
 9 
10 /**
11  * 
12  * @author biehl
13  *
14  */
15 @Configuration
16 @ComponentScan(basePackages = "com.bie")
17 public class RabbitMQConfig {
18 
19     /**
20      * 将ConnectionFactory注入到bean容器中
21      * 
22      * @return
23      */
24     @Bean
25     public ConnectionFactory connectionFactory() {
26         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
27         connectionFactory.setAddresses("192.168.110.133:5672");
28         connectionFactory.setUsername("guest");
29         connectionFactory.setPassword("guest");
30         connectionFactory.setVirtualHost("/");
31         return connectionFactory;
32     }
33 
34     /**
35      * 參數依賴上面注入的ConnectionFactory類,是以保持參數名稱和注入的ConnectionFactory一緻
36      * 
37      * @param connectionFactory
38      * @return
39      */
40     @Bean
41     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
42         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
43         // 注意:autoStartup必須設定為true,否則Spring容器不會加載RabbitAdmin類。
44         rabbitAdmin.setAutoStartup(true);
45         return rabbitAdmin;
46     }
47 
48 }      

項目主啟動類,如下所示:

1 package com.bie;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqSpringApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqSpringApplication.class, args);
11     }
12 
13 }      

下面示範一下,Spring整合RabbitMQ,建立交換機,建立隊列,将交換機和隊列綁定的示例代碼,如下所示:

1 package com.bie;
 2 
 3 import java.util.HashMap;
 4 
 5 import org.junit.Test;
 6 import org.junit.runner.RunWith;
 7 import org.springframework.amqp.core.Binding;
 8 import org.springframework.amqp.core.BindingBuilder;
 9 import org.springframework.amqp.core.DirectExchange;
10 import org.springframework.amqp.core.FanoutExchange;
11 import org.springframework.amqp.core.Queue;
12 import org.springframework.amqp.core.TopicExchange;
13 import org.springframework.amqp.rabbit.core.RabbitAdmin;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.boot.test.context.SpringBootTest;
16 import org.springframework.test.context.junit4.SpringRunner;
17 
18 /**
19  * 
20  * @author biehl
21  *
22  */
23 @SpringBootTest
24 @RunWith(SpringRunner.class)
25 public class RabbitmqSpringApplicationTests {
26 
27     @Autowired
28     private RabbitAdmin rabbitAdmin;
29 
30     @Test
31     public void rabbitmqAdmin() {
32         // 參數1交換機名稱, 參數1是否持久化durable, 參數3是否自動删除 autoDelete
33         // 建立direct類型的交換機
34         DirectExchange directExchange = new DirectExchange("test.directExchange", false, false);
35         rabbitAdmin.declareExchange(directExchange);
36 
37         // 建立topic類型的交換機
38         TopicExchange topicExchange = new TopicExchange("test.topicExchange", false, false);
39         rabbitAdmin.declareExchange(topicExchange);
40 
41         // 建立fanout類型的交換機
42         FanoutExchange fanoutExchange = new FanoutExchange("test.fanoutExchange", false, false);
43         rabbitAdmin.declareExchange(fanoutExchange);
44 
45         // 建立direct類型的隊列
46         Queue directQueue = new Queue("test.direct.queue", false);
47         rabbitAdmin.declareQueue(directQueue);
48 
49         // 建立topic類型的交隊列
50         Queue topicQueue = new Queue("test.topic.queue", false);
51         rabbitAdmin.declareQueue(topicQueue);
52 
53         // 建立fanout類型的隊列
54         Queue fanoutQueue = new Queue("test.fanout.queue", false);
55         rabbitAdmin.declareQueue(fanoutQueue);
56 
57         // 聲明綁定
58         // 參數1 String destination,可以認為是具體的隊列。
59         // 參數2 DestinationType destinationType,綁定的類型。
60         // 參數3 String exchange,交換機的名稱。
61         // 參數4 String routingKey,路由鍵的名稱。
62         // 參數5 Map<String, Object> arguments可以傳入的參數。
63         // 将test.directExchange交換機和test.direct.queue隊列進行綁定
64         rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
65                 "test.directExchange", "direct", new HashMap<>()));
66 
67         // 将test.topicExchange交換機和test.topic.queue隊列進行綁定
68         // rabbitAdmin.declareBinding(new Binding("test.topic.queue",
69         // Binding.DestinationType.QUEUE, "test.topicExchange",
70         // "topic", new HashMap<>()));
71         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接建立隊列
72                 .to(new TopicExchange("test.topicExchange", false, false)) // 直接建立交換機 建立關聯關系
73                 .with("user.#")); // 指定路由Key
74 
75         // 将test.fanoutExchange交換機和test.fanout.queue隊列進行綁定
76         // rabbitAdmin.declareBinding(new Binding("test.fanout.queue",
77         // Binding.DestinationType.QUEUE,
78         // "test.fanoutExchange", "", new HashMap<>()));
79 
80         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接建立隊列
81                 .to(new FanoutExchange("test.fanoutExchange", false, false))); // 直接建立交換機 建立關聯關系
82 
83         // 清空隊列資料
84         rabbitAdmin.purgeQueue("test.direct.queue", false);
85         rabbitAdmin.purgeQueue("test.topic.queue", false);
86         rabbitAdmin.purgeQueue("test.fanout.queue", false);
87     }
88 
89 }      

執行代碼,完畢,可以在RabbitMQ的管控台查詢效果,效果如下所示:

RabbitMQ與Spring的架構整合之Spring AMQP實戰

3、使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。SpringAMQP-RabbitMQ聲明式配置使用。可以在初始化加載配置檔案中建立好交換機,隊列,以及交換機和隊列的綁定關系,啟動項目即可将交換機,隊列,以及交換機和隊列的綁定建立,如下所示:

1 package com.bie;
  2 
  3 import org.springframework.amqp.core.Binding;
  4 import org.springframework.amqp.core.BindingBuilder;
  5 import org.springframework.amqp.core.Queue;
  6 import org.springframework.amqp.core.TopicExchange;
  7 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  9 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 10 import org.springframework.context.annotation.Bean;
 11 import org.springframework.context.annotation.ComponentScan;
 12 import org.springframework.context.annotation.Configuration;
 13 
 14 /**
 15  * 
 16  * @author biehl
 17  *
 18  */
 19 @Configuration
 20 @ComponentScan(basePackages = "com.bie")
 21 public class RabbitMQConfig {
 22 
 23     /**
 24      * 将ConnectionFactory注入到bean容器中
 25      * 
 26      * @return
 27      */
 28     @Bean
 29     public ConnectionFactory connectionFactory() {
 30         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 31         connectionFactory.setAddresses("192.168.110.133:5672");
 32         connectionFactory.setUsername("guest");
 33         connectionFactory.setPassword("guest");
 34         connectionFactory.setVirtualHost("/");
 35         return connectionFactory;
 36     }
 37 
 38     /**
 39      * 參數依賴上面注入的ConnectionFactory類,是以保持參數名稱和注入的ConnectionFactory一緻
 40      * 
 41      * @param connectionFactory
 42      * @return
 43      */
 44     @Bean
 45     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
 46         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
 47         // 注意:autoStartup必須設定為true,否則Spring容器不會加載RabbitAdmin類。
 48         rabbitAdmin.setAutoStartup(true);
 49         return rabbitAdmin;
 50     }
 51 
 52     // 使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。
 53     // SpringAMQP-RabbitMQ聲明式配置使用
 54 
 55     /**
 56      * 針對消費者配置:
 57      * 
 58      * 1. 設定交換機類型。
 59      * 
 60      * 2. 将隊列綁定到交換機。
 61      * 
 62      * FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念。
 63      * 
 64      * HeadersExchange :通過添加屬性key-value比對。
 65      * 
 66      * DirectExchange:按照routingkey分發到指定隊列。
 67      * 
 68      * TopicExchange:多關鍵字比對。
 69      */
 70     @Bean
 71     public TopicExchange topicExchange001() {
 72         return new TopicExchange("topic001", true, false);
 73     }
 74 
 75     @Bean
 76     public Queue queue001() {
 77         return new Queue("queue001", true);// 隊列持久化
 78     }
 79 
 80     @Bean
 81     public Binding bingding001() {
 82         return BindingBuilder.bind(queue001()).to(topicExchange001()).with("spring.*");
 83     }
 84 
 85     // 第二個交換機通過路由鍵綁定到隊列上面。
 86     @Bean
 87     public TopicExchange topicExchange002() {
 88         return new TopicExchange("topic002", true, false);
 89     }
 90 
 91     @Bean
 92     public Queue queue002() {
 93         return new Queue("queue002", true);// 隊列持久化
 94     }
 95 
 96     @Bean
 97     public Binding bingding002() {
 98         return BindingBuilder.bind(queue002()).to(topicExchange002()).with("rabbit.*");
 99     }
100 
101     // 第三個,隊列通過路由鍵綁定到第一個隊列上面,即第一個交換機綁定了兩個隊列。
102     @Bean
103     public Queue queue003() {
104         return new Queue("queue003", true);// 隊列持久化
105     }
106 
107     @Bean
108     public Binding bingding003() {
109         return BindingBuilder.bind(queue003()).to(topicExchange001()).with("mq.*");
110     }
111 
112     @Bean
113     public Queue queue_image() {
114         return new Queue("image_queue", true); // 隊列持久
115     }
116 
117     @Bean
118     public Queue queue_pdf() {
119         return new Queue("pdf_queue", true); // 隊列持久
120     }
121 
122 }      

4、RabbitTemplate,即消息模闆,我們在與SpringAMQP整合的時候進行發送消息的關鍵類。該類提供了豐富的發送消息方法,包括可靠性投遞消息方法,回調監聽消息接口ConfirmCallback,傳回值确認ReturnCallBack等等。同樣我們需要進行注入到Spring容器中,然後直接使用。在與Spring整合的時候需要執行個體化,但是在與SpringBoot整合的時候,在配置檔案裡面添加配置即可。

1 package com.bie;
  2 
  3 import org.springframework.amqp.core.Binding;
  4 import org.springframework.amqp.core.BindingBuilder;
  5 import org.springframework.amqp.core.Queue;
  6 import org.springframework.amqp.core.TopicExchange;
  7 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  9 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 10 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 11 import org.springframework.context.annotation.Bean;
 12 import org.springframework.context.annotation.ComponentScan;
 13 import org.springframework.context.annotation.Configuration;
 14 
 15 /**
 16  * 
 17  * @author biehl
 18  *
 19  */
 20 @Configuration
 21 @ComponentScan(basePackages = "com.bie")
 22 public class RabbitMQConfig {
 23 
 24     /**
 25      * 将ConnectionFactory注入到bean容器中
 26      * 
 27      * @return
 28      */
 29     @Bean
 30     public ConnectionFactory connectionFactory() {
 31         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 32         connectionFactory.setAddresses("192.168.110.133:5672");
 33         connectionFactory.setUsername("guest");
 34         connectionFactory.setPassword("guest");
 35         connectionFactory.setVirtualHost("/");
 36         return connectionFactory;
 37     }
 38 
 39     /**
 40      * 參數依賴上面注入的ConnectionFactory類,是以保持參數名稱和注入的ConnectionFactory一緻
 41      * 
 42      * @param connectionFactory
 43      * @return
 44      */
 45     @Bean
 46     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
 47         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
 48         // 注意:autoStartup必須設定為true,否則Spring容器不會加載RabbitAdmin類。
 49         rabbitAdmin.setAutoStartup(true);
 50         return rabbitAdmin;
 51     }
 52 
 53     // 使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。
 54     // SpringAMQP-RabbitMQ聲明式配置使用
 55 
 56     /**
 57      * 針對消費者配置:
 58      * 
 59      * 1. 設定交換機類型。
 60      * 
 61      * 2. 将隊列綁定到交換機。
 62      * 
 63      * FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念。
 64      * 
 65      * HeadersExchange :通過添加屬性key-value比對。
 66      * 
 67      * DirectExchange:按照routingkey分發到指定隊列。
 68      * 
 69      * TopicExchange:多關鍵字比對。
 70      */
 71     @Bean
 72     public TopicExchange topicExchange001() {
 73         return new TopicExchange("topic001", true, false);
 74     }
 75 
 76     @Bean
 77     public Queue queue001() {
 78         return new Queue("queue001", true);// 隊列持久化
 79     }
 80 
 81     @Bean
 82     public Binding bingding001() {
 83         return BindingBuilder.bind(queue001()).to(topicExchange001()).with("spring.*");
 84     }
 85 
 86     // 第二個交換機通過路由鍵綁定到隊列上面。
 87     @Bean
 88     public TopicExchange topicExchange002() {
 89         return new TopicExchange("topic002", true, false);
 90     }
 91 
 92     @Bean
 93     public Queue queue002() {
 94         return new Queue("queue002", true);// 隊列持久化
 95     }
 96 
 97     @Bean
 98     public Binding bingding002() {
 99         return BindingBuilder.bind(queue002()).to(topicExchange002()).with("rabbit.*");
100     }
101 
102     // 第三個,隊列通過路由鍵綁定到第一個隊列上面,即第一個交換機綁定了兩個隊列。
103     @Bean
104     public Queue queue003() {
105         return new Queue("queue003", true);// 隊列持久化
106     }
107 
108     @Bean
109     public Binding bingding003() {
110         return BindingBuilder.bind(queue003()).to(topicExchange001()).with("mq.*");
111     }
112 
113     @Bean
114     public Queue queue_image() {
115         return new Queue("image_queue", true); // 隊列持久
116     }
117 
118     @Bean
119     public Queue queue_pdf() {
120         return new Queue("pdf_queue", true); // 隊列持久
121     }
122 
123     // RabbitTemplate,即消息模闆,我們在與SpringAMQP整合的時候進行發送消息的關鍵類。
124     // 該類提供了豐富的發送消息方法,包括可靠性投遞消息方法,回調監聽消息接口ConfirmCallback,
125     // 傳回值确認ReturnCallBack等等。同樣我們需要進行注入到Spring容器中,然後直接使用。
126     // 将RabbitTemplate加入到Spring容器中
127     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
128         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
129         return rabbitTemplate;
130     }
131 
132 }      

使用RabbittEmplate發送消息的案例,由于結合初始化配置檔案建立的交換機,隊列以及交換機和隊列的綁定,将消息發送到自己建立的交換機,隊列上面,是以效果請自己仔細檢視,如下所示:

1 package com.bie;
 2 
 3 import java.util.HashMap;
 4 
 5 import org.junit.Test;
 6 import org.junit.runner.RunWith;
 7 import org.springframework.amqp.AmqpException;
 8 import org.springframework.amqp.core.Binding;
 9 import org.springframework.amqp.core.BindingBuilder;
10 import org.springframework.amqp.core.DirectExchange;
11 import org.springframework.amqp.core.FanoutExchange;
12 import org.springframework.amqp.core.Message;
13 import org.springframework.amqp.core.MessagePostProcessor;
14 import org.springframework.amqp.core.MessageProperties;
15 import org.springframework.amqp.core.Queue;
16 import org.springframework.amqp.core.TopicExchange;
17 import org.springframework.amqp.rabbit.core.RabbitAdmin;
18 import org.springframework.amqp.rabbit.core.RabbitTemplate;
19 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.boot.test.context.SpringBootTest;
21 import org.springframework.test.context.junit4.SpringRunner;
22 
23 /**
24  * 
25  * @author biehl
26  *
27  */
28 @SpringBootTest
29 @RunWith(SpringRunner.class)
30 public class RabbitmqSpringApplicationTests {
31 
32     // 發送消息
33     @Autowired
34     private RabbitTemplate rabbitTemplate;
35 
36     @Test
37     public void sendMessage() {
38         // 1、建立消息
39         MessageProperties messageProperties = new MessageProperties();
40         messageProperties.getHeaders().put("desc", "消息描述");
41         messageProperties.getHeaders().put("type", "消息類型");
42         Message message = new Message("hello RabbitMQ".getBytes(), messageProperties);
43 
44         // 2、發送消息
45         String exchange = "topic001";
46         String routingKey = "spring.amqp";
47         rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
48 
49             @Override
50             public Message postProcessMessage(Message message) throws AmqpException {
51                 System.out.println("======添加額外的設定======");
52                 message.getMessageProperties().getHeaders().put("desc", "額外的消息描述");
53                 message.getMessageProperties().getHeaders().put("attr", "額外的屬性");
54                 return message;
55             }
56         });
57     }
58 
59     @Test
60     public void sendMessage2() throws Exception {
61         // 1 建立消息
62         MessageProperties messageProperties = new MessageProperties();
63         messageProperties.setContentType("text/plain");
64         Message message = new Message("RabbitMQ的消息.......".getBytes(), messageProperties);
65 
66         // 2、發送消息
67         rabbitTemplate.send("topic001", "spring.abc", message);
68 
69         rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
70         rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
71     }
72 
73 }      

運作效果如下所示:

RabbitMQ與Spring的架構整合之Spring AMQP實戰

5、SpringAMQP消息容器SimpleMessageListenerContainer。

  1)、簡單消息監聽器,這個類非常的強大,我們可以對他進行很多設定,對于消費者的設定項,這個類都可以滿足。可以監聽隊列(多個隊列),自動啟動,自動聲明功能。可以設定事務特性、事務管理器、事務屬性、事務容量(并發)、是否開啟事務、復原消息等等。可以設定消費者數量、最小最大數量、批量消費等等。可以設定消息确認和自動确認模式,是否重回隊列、異常捕獲handler函數。可以設定消費者标簽生成政策,是否獨占模式,消費者屬性等等。可以設定具體的監聽器、消息轉換器等等。

  2)、注意,SpringAMQP消息容器SimpleMessageListenerContainer可以進行動态設定,比如在運作中的應用可以動态的修改其消費者數量的大小,接收消息的模式等等。很多基于RabbitMQ的自制定化後端控制台在進行動态設定的時候,也是根據這一特性實作的。

1 package com.bie;
  2 
  3 import java.util.UUID;
  4 
  5 import org.springframework.amqp.core.AcknowledgeMode;
  6 import org.springframework.amqp.core.Binding;
  7 import org.springframework.amqp.core.BindingBuilder;
  8 import org.springframework.amqp.core.Message;
  9 import org.springframework.amqp.core.Queue;
 10 import org.springframework.amqp.core.TopicExchange;
 11 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 12 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 13 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 14 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 15 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
 16 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
 17 import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
 18 import org.springframework.amqp.support.ConsumerTagStrategy;
 19 import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
 20 import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
 21 import org.springframework.context.annotation.Bean;
 22 import org.springframework.context.annotation.ComponentScan;
 23 import org.springframework.context.annotation.Configuration;
 24 
 25 import com.bie.adapter.MessageDelegate;
 26 import com.bie.convert.ImageMessageConverter;
 27 import com.bie.convert.PDFMessageConverter;
 28 import com.bie.convert.TextMessageConverter;
 29 import com.rabbitmq.client.Channel;
 30 
 31 /**
 32  * 
 33  * @author biehl
 34  *
 35  */
 36 @Configuration
 37 @ComponentScan(basePackages = "com.bie")
 38 public class RabbitMQConfig {
 39 
 40     /**
 41      * 将ConnectionFactory注入到bean容器中
 42      * 
 43      * @return
 44      */
 45     @Bean
 46     public ConnectionFactory connectionFactory() {
 47         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 48         connectionFactory.setAddresses("192.168.110.133:5672");
 49         connectionFactory.setUsername("guest");
 50         connectionFactory.setPassword("guest");
 51         connectionFactory.setVirtualHost("/");
 52         return connectionFactory;
 53     }
 54 
 55     /**
 56      * 參數依賴上面注入的ConnectionFactory類,是以保持參數名稱和注入的ConnectionFactory一緻
 57      * 
 58      * @param connectionFactory
 59      * @return
 60      */
 61     @Bean
 62     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
 63         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
 64         // 注意:autoStartup必須設定為true,否則Spring容器不會加載RabbitAdmin類。
 65         rabbitAdmin.setAutoStartup(true);
 66         return rabbitAdmin;
 67     }
 68 
 69     // 使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式。
 70     // SpringAMQP-RabbitMQ聲明式配置使用
 71 
 72     /**
 73      * 針對消費者配置:
 74      * 
 75      * 1. 設定交換機類型。
 76      * 
 77      * 2. 将隊列綁定到交換機。
 78      * 
 79      * FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念。
 80      * 
 81      * HeadersExchange :通過添加屬性key-value比對。
 82      * 
 83      * DirectExchange:按照routingkey分發到指定隊列。
 84      * 
 85      * TopicExchange:多關鍵字比對。
 86      */
 87     @Bean
 88     public TopicExchange topicExchange001() {
 89         return new TopicExchange("topic001", true, false);
 90     }
 91 
 92     @Bean
 93     public Queue queue001() {
 94         return new Queue("queue001", true);// 隊列持久化
 95     }
 96 
 97     @Bean
 98     public Binding bingding001() {
 99         return BindingBuilder.bind(queue001()).to(topicExchange001()).with("spring.*");
100     }
101 
102     // 第二個交換機通過路由鍵綁定到隊列上面。
103     @Bean
104     public TopicExchange topicExchange002() {
105         return new TopicExchange("topic002", true, false);
106     }
107 
108     @Bean
109     public Queue queue002() {
110         return new Queue("queue002", true);// 隊列持久化
111     }
112 
113     @Bean
114     public Binding bingding002() {
115         return BindingBuilder.bind(queue002()).to(topicExchange002()).with("rabbit.*");
116     }
117 
118     // 第三個,隊列通過路由鍵綁定到第一個隊列上面,即第一個交換機綁定了兩個隊列。
119     @Bean
120     public Queue queue003() {
121         return new Queue("queue003", true);// 隊列持久化
122     }
123 
124     @Bean
125     public Binding bingding003() {
126         return BindingBuilder.bind(queue003()).to(topicExchange001()).with("mq.*");
127     }
128 
129     @Bean
130     public Queue queue_image() {
131         return new Queue("image_queue", true); // 隊列持久
132     }
133 
134     @Bean
135     public Queue queue_pdf() {
136         return new Queue("pdf_queue", true); // 隊列持久
137     }
138 
139     // RabbitTemplate,即消息模闆,我們在與SpringAMQP整合的時候進行發送消息的關鍵類。
140     // 該類提供了豐富的發送消息方法,包括可靠性投遞消息方法,回調監聽消息接口ConfirmCallback,
141     // 傳回值确認ReturnCallBack等等。同樣我們需要進行注入到Spring容器中,然後直接使用。
142     // 将RabbitTemplate加入到Spring容器中
143     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
144         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
145         return rabbitTemplate;
146     }
147 
148     /**
149      * 
150      * @param connectionFactory
151      * @return
152      */
153     @Bean
154     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
155         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
156         // 監聽的隊列,可變參數,可以添加多個隊列。
157         container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
158         // 設定消費者個數。
159         container.setConcurrentConsumers(1);
160         // 設定最大消費者個數。
161         container.setMaxConcurrentConsumers(5);
162         // 設定預設是否重回隊列。
163         container.setDefaultRequeueRejected(false);
164         // 設定簽收模式,自動簽收。
165         container.setAcknowledgeMode(AcknowledgeMode.AUTO);
166         // 設定是否外露
167         container.setExposeListenerChannel(true);
168         // 設定标簽政策。
169         container.setConsumerTagStrategy(new ConsumerTagStrategy() {
170             @Override
171             public String createConsumerTag(String queue) {
172                 return queue + "_" + UUID.randomUUID().toString();
173             }
174         });
175 
176         // 監聽消息
177         container.setMessageListener(new ChannelAwareMessageListener() {
178 
179             @Override
180             public void onMessage(Message message, Channel channel) throws Exception {
181                 String msg = new String(message.getBody());
182                 System.out.println("===========================消費者消息msg : " + msg);
183             }
184         });
185 
186         return container;
187     }
188 
189 }      

可以直接在測試類裡面進行啟動,直接啟動測試方法sendMessage2,如下所示:

1 package com.bie;
  2 
  3 import java.util.HashMap;
  4 
  5 import org.junit.Test;
  6 import org.junit.runner.RunWith;
  7 import org.springframework.amqp.AmqpException;
  8 import org.springframework.amqp.core.Binding;
  9 import org.springframework.amqp.core.BindingBuilder;
 10 import org.springframework.amqp.core.DirectExchange;
 11 import org.springframework.amqp.core.FanoutExchange;
 12 import org.springframework.amqp.core.Message;
 13 import org.springframework.amqp.core.MessagePostProcessor;
 14 import org.springframework.amqp.core.MessageProperties;
 15 import org.springframework.amqp.core.Queue;
 16 import org.springframework.amqp.core.TopicExchange;
 17 import org.springframework.amqp.rabbit.core.RabbitAdmin;
 18 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 19 import org.springframework.beans.factory.annotation.Autowired;
 20 import org.springframework.boot.test.context.SpringBootTest;
 21 import org.springframework.test.context.junit4.SpringRunner;
 22 
 23 /**
 24  * 
 25  * @author biehl
 26  *
 27  */
 28 @SpringBootTest
 29 @RunWith(SpringRunner.class)
 30 public class RabbitmqSpringApplicationTests {
 31 
 32     @Autowired
 33     private RabbitAdmin rabbitAdmin;
 34 
 35     @Test
 36     public void rabbitmqAdmin() {
 37         // 參數1交換機名稱, 參數1是否持久化durable, 參數3是否自動删除 autoDelete
 38         // 建立direct類型的交換機
 39         DirectExchange directExchange = new DirectExchange("test.directExchange", false, false);
 40         rabbitAdmin.declareExchange(directExchange);
 41 
 42         // 建立topic類型的交換機
 43         TopicExchange topicExchange = new TopicExchange("test.topicExchange", false, false);
 44         rabbitAdmin.declareExchange(topicExchange);
 45 
 46         // 建立fanout類型的交換機
 47         FanoutExchange fanoutExchange = new FanoutExchange("test.fanoutExchange", false, false);
 48         rabbitAdmin.declareExchange(fanoutExchange);
 49 
 50         // 建立direct類型的隊列
 51         Queue directQueue = new Queue("test.direct.queue", false);
 52         rabbitAdmin.declareQueue(directQueue);
 53 
 54         // 建立topic類型的交隊列
 55         Queue topicQueue = new Queue("test.topic.queue", false);
 56         rabbitAdmin.declareQueue(topicQueue);
 57 
 58         // 建立fanout類型的隊列
 59         Queue fanoutQueue = new Queue("test.fanout.queue", false);
 60         rabbitAdmin.declareQueue(fanoutQueue);
 61 
 62         // 聲明綁定
 63         // 參數1 String destination,可以認為是具體的隊列。
 64         // 參數2 DestinationType destinationType,綁定的類型。
 65         // 參數3 String exchange,交換機的名稱。
 66         // 參數4 String routingKey,路由鍵的名稱。
 67         // 參數5 Map<String, Object> arguments可以傳入的參數。
 68         // 将test.directExchange交換機和test.direct.queue隊列進行綁定
 69         rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
 70                 "test.directExchange", "direct", new HashMap<>()));
 71 
 72         // 将test.topicExchange交換機和test.topic.queue隊列進行綁定
 73         // rabbitAdmin.declareBinding(new Binding("test.topic.queue",
 74         // Binding.DestinationType.QUEUE, "test.topicExchange",
 75         // "topic", new HashMap<>()));
 76         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接建立隊列
 77                 .to(new TopicExchange("test.topicExchange", false, false)) // 直接建立交換機 建立關聯關系
 78                 .with("user.#")); // 指定路由Key
 79 
 80         // 将test.fanoutExchange交換機和test.fanout.queue隊列進行綁定
 81         // rabbitAdmin.declareBinding(new Binding("test.fanout.queue",
 82         // Binding.DestinationType.QUEUE,
 83         // "test.fanoutExchange", "", new HashMap<>()));
 84 
 85         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接建立隊列
 86                 .to(new FanoutExchange("test.fanoutExchange", false, false))); // 直接建立交換機 建立關聯關系
 87 
 88         // 清空隊列資料
 89         rabbitAdmin.purgeQueue("test.direct.queue", false);
 90         rabbitAdmin.purgeQueue("test.topic.queue", false);
 91         rabbitAdmin.purgeQueue("test.fanout.queue", false);
 92     }
 93 
 94     // 發送消息
 95     @Autowired
 96     private RabbitTemplate rabbitTemplate;
 97 
 98     @Test
 99     public void sendMessage() {
100         // 1、建立消息
101         MessageProperties messageProperties = new MessageProperties();
102         messageProperties.getHeaders().put("desc", "消息描述");
103         messageProperties.getHeaders().put("type", "消息類型");
104         Message message = new Message("hello RabbitMQ".getBytes(), messageProperties);
105 
106         // 2、發送消息
107         String exchange = "topic001";
108         String routingKey = "spring.amqp";
109         rabbitTemplate.convertAndSend(exchange, routingKey, message, new MessagePostProcessor() {
110 
111             @Override
112             public Message postProcessMessage(Message message) throws AmqpException {
113                 System.out.println("======添加額外的設定======");
114                 message.getMessageProperties().getHeaders().put("desc", "額外的消息描述");
115                 message.getMessageProperties().getHeaders().put("attr", "額外的屬性");
116                 return message;
117             }
118         });
119     }
120 
121     @Test
122     public void sendMessage2() throws Exception {
123         // 1 建立消息
124         MessageProperties messageProperties = new MessageProperties();
125         messageProperties.setContentType("text/plain");
126         Message message = new Message("RabbitMQ的消息.......".getBytes(), messageProperties);
127 
128         // 2、發送消息
129         rabbitTemplate.send("topic001", "spring.abc", message);
130 
131         rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
132         rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
133     }
134 
135 }      

SpringAMQP消息容器SimpleMessageListenerContainer,簡單消息監聽器,效果如下所示:

RabbitMQ與Spring的架構整合之Spring AMQP實戰
RabbitMQ與Spring的架構整合之Spring AMQP實戰
作者:别先生

繼續閱讀