首先本文是學習過程中的一個小demo,不涉及實際的發送短信、郵件的發送邏輯,同時,在文中 RabbitMQ 是基于釋出訂閱模式。
是以如下會使用郵件、短信發送的例子,生産者對外釋出發送消息的接口,根據調用的參數發送到相應的隊列中。
其實這裡面還會存在一些問題,比如事務問題、重複簽收問題等等,由于是練手Demo,其他問題留在後面的文章補充。
文章目錄
1. 生産者1.1 maven依賴1.2 application.yml配置類1.3 交換機綁定隊列1.4 生産者投遞消息1.5 控制層調用代碼2. 消費者2.1 maven依賴2.2 application.yml配置2.3 郵件消費者2.4 短信消費者3. 運作測試
1. 生産者
1.1 maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web元件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot對amqp的支援 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
1.2 application.yml配置類
spring:
rabbitmq:
####連接配接位址
host: 127.0.0.1
####端口号
port: 5672
####賬号
username: guest
####密碼
password: guest
### 位址
virtual-host: /
1.3 交換機綁定隊列
@Component
public class FanoutConfig {
/**
* 郵件隊列
*/
private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
/**
* 短信隊列
*/
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
/**
* 交換機名稱
*/
private String EXCHANGE_NAME = "fanoutExchange";
/**
* 1.定義郵件隊列
* @return
*/
@Bean
public Queue fanOutEamilQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
/**
* 1.定義短信隊列
* @return
*/
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
/**
* 2.定義交換機
* @return
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
/**
* 3.隊列與交換機綁定郵件隊列
* @param fanOutEamilQueue
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
/**
* 4.隊列與交換機綁定短信隊列
* @param fanOutSmsQueue
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
}
FanoutConfig 中執行如下幾步任務:
- 定義短信隊列
- 定義交換機
- 隊列與交換機綁定郵件|短信隊列
1.4 生産者投遞消息
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 發送消息
* @param queueName 隊列名稱
*/
public void send(String queueName) {
String msg = "my_fanout_msg:" + new Date();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, msg);
}
}
1.5 控制層調用代碼
@RestController
public class ProducerController {
@Autowired
private FanoutProducer fanoutProducer;
@RequestMapping("/sendFanout")
public String sendFanout(String queueName) {
fanoutProducer.send(queueName);
return "success";
}
}
2. 消費者
2.1 maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web元件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot對amqp的支援 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
2.2 application.yml配置
spring:
rabbitmq:
####連接配接位址
host: 127.0.0.1
####端口号
port: 5672
####賬号
username: guest
####密碼
password: guest
### 位址
virtual-host: /
server:
port: 8081
2.3 郵件消費者
@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("郵件消費者擷取生産者消息msg:" + msg);
}
}
2.4 短信消費者
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("短信消費者擷取生産者消息msg:" + msg);
}
}
3. 運作測試
通路時才會建立交換機以及隊列,并非項目啟動就會建立
生産者通路連結(位址+發送的隊列名稱):
http://localhost:8080/sendFanout?queueName=fanout_sms_queue

消費者啟動:
案例代碼: https://www.lanzoux.com/i5zkf7c
我建立了一個java相關的公衆号,用來記錄自己的學習之路,感興趣的小夥伴可以關注一下微信公衆号哈:程式員小源