RabbitMQ工作隊列之Routing模式(四)
上一篇講了關于fanout釋出訂閱模式,允許系統推送消息至所有的訂閱者(消費者),擴充一下問題,如果将生産者生産的消息進行分類,那麼所有的消費者都會收到全部消息,這不是我們樂意見到的。如果我們希望可以對消費者進行分類,每個消費者能接收特定的消息,那該如何改造呢?
答案是肯定哒,标題已經說明了,本篇博文主要講解的是RabbitMQ工作隊列中的Routing路由密鑰模式。
RabbitMQ工作隊列中的Routing路由密鑰模式三種使用方式:
- 基于原生client用戶端連接配接使用
- 基于spring xml配置檔案內建使用
- 基于spring boot內建使用
老規矩,來一張官網模型截圖:

目錄
用
[TOC]
來生成目錄:
文章目錄
- RabbitMQ工作隊列之Routing模式(四)
-
-
- 目錄
- @[toc]
- 1、基于原生client用戶端連接配接使用
-
-
- 1.1、maven建構,pom檔案加入依賴
- 1.2、建立連接配接工廠
- 1.3、建立消息生産者
- 1.4、定義info類型消費者,此消費者可以接收info、warning、error類型
- 1.5、定義warning類型消費者,隻能接收warning類型消息
- 1.6、定義Error類型消費者,隻能接收error類型消息
- 1.7、運作效果
- 1.8、RabbitMQ管控台
- 2、基于spring xml配置檔案內建使用
-
-
- 2.1、maven工廠引入依賴包:
- 2.2、建立spring加載配置檔案context.xml
- 2.3、建立rabbitMQ配置檔案rabbitmq-routing.xml
- 2.4、建立生産者SendRouting
- 2.5、info消費者
- 2.6、error消費者
- 2.7、warning消費者
- 2.8、運作效果圖
- 3、基于spring boot內建使用
-
-
- 3.1、maven加入依賴包
- 3.2、建立application.yml配置檔案
- 3.3、建立擷取靜态參數類
- 3.4、建立rabbitMQ注解配置
- 3.5、建立controller層模拟生産者
- 3.6、info消費者
- 3.7、error消費者
- 3.8、warning消費者
- 3.9、運作效果
- 4、小結
-
-
- 小結
-
- 目錄
- @[toc]
- 1、基于原生client用戶端連接配接使用
-
-
- 1.1、maven建構,pom檔案加入依賴
- 1.2、建立連接配接工廠
- 1.3、建立消息生産者
- 1.4、定義info類型消費者,此消費者可以接收info、warning、error類型
- 1.5、定義warning類型消費者,隻能接收warning類型消息
- 1.6、定義Error類型消費者,隻能接收error類型消息
- 1.7、運作效果
- 1.8、RabbitMQ管控台
-
- 2、基于spring xml配置檔案內建使用
-
-
- 2.1、maven工廠引入依賴包:
- 2.2、建立spring加載配置檔案context.xml
- 2.3、建立rabbitMQ配置檔案rabbitmq-routing.xml
- 2.4、建立生産者SendRouting
- 2.5、info消費者
- 2.6、error消費者
- 2.7、warning消費者
- 2.8、運作效果圖
-
- 3、基于spring boot內建使用
-
-
- 3.1、maven加入依賴包
- 3.2、建立application.yml配置檔案
- 3.3、建立擷取靜态參數類
- 3.4、建立rabbitMQ注解配置
- 3.5、建立controller層模拟生産者
- 3.6、info消費者
- 3.7、error消費者
- 3.8、warning消費者
- 3.9、運作效果
-
- 4、小結
-
-
- 小結
-
1、基于原生client用戶端連接配接使用
1.1、maven建構,pom檔案加入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<!--引入rbiitmq的連接配接工具包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
1.2、建立連接配接工廠
package com.edu.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:01
* @description :擷取連接配接
* @note 注意事項
*/
public class ConnectionUtils {
//擷取接連
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//設定連接配接MQ的IP位址
factory.setHost("192.168.199.128");
//設定連接配接端口号
factory.setPort(5672);
//設定要接連MQ的庫(域)
factory.setVirtualHost("/test_vh");
//連接配接帳号
factory.setUsername("root");
//連接配接密碼
factory.setPassword("123456");
return factory.newConnection();
}
}
1.3、建立消息生産者
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息生産者
* @note 注意事項
*/
public class Sender {
//定義交換器名稱
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//擷取連接配接
Connection connection = ConnectionUtils.getConnection();
//從連接配接中擷取一個通道
Channel channel = connection.createChannel();
/**
*
* 聲明一個交換器
*
* 文檔位址:https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#exchangeDeclare-java.lang.String-com.rabbitmq.client.BuiltinExchangeType-boolean-boolean-java.util.Map-
* 該方法定義了許多的重載,拿出完整參數的方法來講
* AMQP.Exchange.DeclareOk exchangeDeclare(String exchange,
* BuiltinExchangeType type,
* boolean durable,
* boolean autoDelete,
* Map<String,Object> arguments)
* throws IOException
*
* @param exchange 交換器的名字
* @param type 交換器的類型:direct, topic, headers, fanout
* @param durable 是否持久化,true持久化,false不持久化
* @param autoDelete 伺服器不再使用該隊列時,是否自動删除,true删除,false不删除
* @param arguments 其他參數,其實是定義交換器的構造方法
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Integer log_count = 3;
for (int i = 0; i < 50; i++) {
Thread.sleep(500);//模拟耗時操作,别發那麼快
//自定義消息
String msg = "hello word " + i;
Integer qumo =i % log_count;//取餘,模拟分發不同類型的routingKey
switch (qumo){
case 0:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes());
System.out.println("-->info send " + msg);
break;
case 1:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
channel.basicPublish(EXCHANGE_NAME,"error",null,msg.getBytes());
System.out.println("-->error send " + msg);
break;
case 2:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
channel.basicPublish(EXCHANGE_NAME,"warning",null,msg.getBytes());
System.out.println("-->warning send " + msg);
break;
}
}
channel.close();//關閉通道
connection.close();//關閉連接配接
}
}
1.4、定義info類型消費者,此消費者可以接收info、warning、error類型
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息消費者
* @note 注意事項
*/
public class CustomeriInfo {
//隊列名稱
public static String QUEUE_NAME = "test_origin_routing_queue_info";
//定義交換器名稱
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接配接
Connection connection = ConnectionUtils.getConnection();
//擷取通道
Channel channel = connection.createChannel();
//擷取交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//建立消息聲明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 交換器綁定隊列
* 該方法定義了2個重載
* AMQP.Queue.BindOk queueBind(String queue,
* String exchange,
* String routingKey,
* Map<String,Object> arguments)
* throws IOException
*
* @param queue 隊列名稱
* @param exchange 交換器名稱
* @param routingKey 用于綁定的路由密鑰
* @param arguments 其他參數,其實是定義交換器的構造方法
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");//綁定routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");//綁定routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");//綁定routingKey
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//擷取并轉成String
String message = new String(body, "UTF-8");
System.out.println("-->info消費者收到消息,msg:"+message);
}
};
//監聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1.5、定義warning類型消費者,隻能接收warning類型消息
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息消費者
* @note 注意事項
*/
public class CustomerWarning {
//隊列名稱
public static String QUEUE_NAME = "test_origin_routing_queue_warning";
//定義交換器名稱
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接配接
Connection connection = ConnectionUtils.getConnection();
//擷取通道
Channel channel = connection.createChannel();
//擷取交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//建立消息聲明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 交換器綁定隊列
* 該方法定義了2個重載
* AMQP.Queue.BindOk queueBind(String queue,
* String exchange,
* String routingKey,
* Map<String,Object> arguments)
* throws IOException
*
* @param queue 隊列名稱
* @param exchange 交換器名稱
* @param routingKey 用于綁定的路由密鑰
* @param arguments 其他參數,其實是定義交換器的構造方法
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//擷取并轉成String
String message = new String(body, "UTF-8");
System.out.println("-->warning消費者收到消息,msg:"+message);
}
};
//監聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1.6、定義Error類型消費者,隻能接收error類型消息
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息消費者
* @note 注意事項
*/
public class CustomerError {
//隊列名稱
public static String QUEUE_NAME = "test_origin_routing_queue_error";
//定義交換器名稱
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接配接
Connection connection = ConnectionUtils.getConnection();
//擷取通道
Channel channel = connection.createChannel();
//擷取交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//建立消息聲明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 交換器綁定隊列
* 該方法定義了2個重載
* AMQP.Queue.BindOk queueBind(String queue,
* String exchange,
* String routingKey,
* Map<String,Object> arguments)
* throws IOException
*
* @param queue 隊列名稱
* @param exchange 交換器名稱
* @param routingKey 用于綁定的路由密鑰
* @param arguments 其他參數,其實是定義交換器的構造方法
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
//定義消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//擷取并轉成String
String message = new String(body, "UTF-8");
System.out.println("-->error消費者收到消息,msg:"+message);
}
};
//監聽隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1.7、運作效果
生産者生産消息
info
error
warning
1.8、RabbitMQ管控台
檢視exchange綁定的queue
2、基于spring xml配置檔案內建使用
2.1、maven工廠引入依賴包:
<!--引入rbiitmq的連接配接工具包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
<!-- 引入spring內建rabbit的包 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.3.RELEASE</version>
</dependency>
<!-- spring核心庫 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<!-- springbean庫 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<!-- 上下文 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
2.2、建立spring加載配置檔案context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<!--加載routing路由密鑰模式-->
<import resource="classpath:rabbitmq-routing.xml" />
</beans>
2.3、建立rabbitMQ配置檔案rabbitmq-routing.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<!--配置connection-factory,指定連接配接rabbit server參數 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/test_vh" username="root"
password="123456" host="192.168.199.128" port="5672" />
<!--MQ的管理,包括隊列,交換器,聲明等-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定義rabbit模版,指定連接配接工廠以及定義exchange-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchangeName" />
<!--定義隊列,自動聲明(可以用于發消息和監聽使用)-->
<!--定義queue 說明:durable:是否持久化 exclusive: 僅建立者可以使用的私有隊列,斷開後自動删除 auto_delete: 當所有消費用戶端連接配接斷開後,是否自動删除隊列-->
<rabbit:queue id="test_routing_spring_xml1" name="test_routing_spring_xml1" auto-declare="true" auto-delete="true" />
<!--定義隊列-->
<rabbit:queue id="test_routing_spring_xml2" name="test_routing_spring_xml2" auto-declare="true" auto-delete="true" />
<!--定義隊列-->
<rabbit:queue id="test_routing_spring_xml3" name="test_routing_spring_xml3" auto-declare="true" auto-delete="true" />
<!--定義direct交換器,并且隊列綁定交換器,記住生産者發送消息的時候,使用的是這個交換器的name-->
<rabbit:direct-exchange id="directExchangeName" name="test_routing_spring_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_routing_spring_xml1" key="info" />
<rabbit:binding queue="test_routing_spring_xml1" key="error" />
<rabbit:binding queue="test_routing_spring_xml1" key="warning" />
<rabbit:binding queue="test_routing_spring_xml2" key="warning" />
<rabbit:binding queue="test_routing_spring_xml3" key="error" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義消費者-->
<bean id="myCustomer" class="routing.CustomerInfo"/>
<bean id="myCustomer2" class="routing.CustomerWarning"/>
<bean id="myCustomer3" class="routing.CustomerError"/>
<!--隊列監聽 acknowledge應答方式:auto,manual,none -->
<rabbit:listener-container connection-factory="connectionFactory" >
<rabbit:listener ref="myCustomer" method="listen" queue-names="test_routing_spring_xml1" />
<rabbit:listener ref="myCustomer2" method="listen" queue-names="test_routing_spring_xml2" />
<rabbit:listener ref="myCustomer3" method="listen" queue-names="test_routing_spring_xml3" />
</rabbit:listener-container>
</beans>
2.4、建立生産者SendRouting
package com.rabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @author: Alex
* @DateTime: 2018/8/22 19:40
* @Description: 生産者
* @Version: 1.0.0
**/
public class SendRouting {
//交換器名稱,這裡的exchange名稱要和rabbitmq-routing.xml裡面配置對應
private static String EXCHANGE_NAME = "test_routing_spring_exchange";
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:context.xml");
//擷取rabbit模版(等價于@Autowired)
RabbitTemplate bean = context.getBean(RabbitTemplate.class);
Integer log_count = 3;
for (int i = 0; i < 30; i++) {
Thread.sleep(500);//模拟耗時操作,别發那麼快
//自定義消息
String msg = "hello word " + i;
Integer qumo =i % log_count;//取餘,模拟分發不同類型的routingKey
switch (qumo){
case 0:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
bean.convertAndSend(EXCHANGE_NAME,"info","hello word"+i);
System.out.println("-->info send " + msg);
break;
case 1:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
bean.convertAndSend(EXCHANGE_NAME,"error","hello word"+i);
System.out.println("-->error send " + msg);
break;
case 2:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
bean.convertAndSend(EXCHANGE_NAME,"warning","hello word"+i);
System.out.println("-->warning send " + msg);
break;
}
}
Thread.sleep(10000);//休眠2秒後,關閉spring容器
context.close();
}
}
2.5、info消費者
package routing;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 23:39
* @description :我的消費者info
* @note 注意事項
*/
public class CustomerInfo {
public void listen(String foo){
System.out.println("消費者消費info,擷取消息msg:"+foo);
}
}
2.6、error消費者
package routing;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 23:39
* @description :我的消費者error
* @note 注意事項
*/
public class CustomerError {
public void listen(String foo){
System.out.println("消費者消費error,擷取消息msg:"+foo);
}
}
2.7、warning消費者
package routing;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 23:39
* @description :我的消費者warning
* @note 注意事項
*/
public class CustomerWarning {
public void listen(String foo){
System.out.println("消費者消費warning,擷取消息msg:"+foo);
}
}
2.8、運作效果圖
由此圖我們可以看出,我們的
info消費者綁定了info,error,warning三個類型的消息
error消費者綁定error消息
warning消費者綁定warning消息
每次info都會列印消息,
error和warning隻有接收屬于自身的消息才會列印消息
3、基于spring boot內建使用
3.1、maven加入依賴包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2、建立application.yml配置檔案
spring:
rabbitmq:
username: root
password: 123456
host: 192.168.199.128
port: 5672
virtual-host: /test_vh
rabbitMQconfig:
queueName:
info: test_spring_boot_routing_info
error: test_spring_boot_routing_error
warning: test_spring_boot_routing_warning
exchangeName:
directName: test_spring_boot_exchange_routing
3.3、建立擷取靜态參數類
package cn.rabbitmq.edu.rabbitmq_spring_boot.utils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author: Alex
* @DateTime: 2018/8/23 11:19
* @Description: 靜态參數類
* @Version: 1.0.0
**/
@Component
public class ParamUtil {
@Value("${rabbitMQconfig.queueName.info}")
public String queueNameInfo;
@Value("${rabbitMQconfig.queueName.error}")
public String queueNameError;
@Value("${rabbitMQconfig.queueName.warning}")
public String queueNameWarning;
@Value("${rabbitMQconfig.exchangeName.directName}")
public String directName;
}
3.4、建立rabbitMQ注解配置
package cn.rabbitmq.edu.rabbitmq_spring_boot.utils;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Alex
* @DateTime: 2018/8/23 10:52
* @Description: rabbitMQ配置聲明
* @Version: 1.0.0
**/
@Configuration
public class RabbitMQDeclareUtil {
@Autowired
private ParamUtil paramUtil;
@Bean
Queue getQueue1(){
//定義第一個隊列隊列,Ctrl+滑鼠左鍵,點選Queue可以看到定義
return new Queue(paramUtil.queueNameInfo);
}
@Bean
Queue getQueue2(){
//定義第二個隊列
return new Queue(paramUtil.queueNameError);
}
@Bean
Queue getQueue3(){
//定義第三個隊列
return new Queue(paramUtil.queueNameWarning);
}
@Bean
DirectExchange getExchange() {
//定義一個DirectExchange交換器
return new DirectExchange(paramUtil.directName);
}
/**
* info隊列與交換器綁定,指定routingKey為info
* @param getQueue1 定義的第一個隊列
* @param getExchange 定義的交換器
* @return
*/
@Bean
Binding bindingInfo(Queue getQueue1, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue1).to(getExchange).with("info");
}
/**
* info隊列與交換器綁定,指定routingKey為error
* @param getQueue1 定義的第一個隊列
* @param getExchange 定義的交換器
* @return
*/
@Bean
Binding bindingInfo1(Queue getQueue1, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue1).to(getExchange).with("error");
}
/**
* info隊列與交換器綁定,指定routingKey為warning
* @param getQueue1 定義的第一個隊列
* @param getExchange 定義的交換器
* @return
*/
@Bean
Binding bindingInfo2(Queue getQueue1, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue1).to(getExchange).with("warning");
}
/**
* error隊列與交換器綁定,指定routingKey為error
* @param getQueue2
* @param getExchange
* @return
*/
@Bean
Binding bindingError(Queue getQueue2, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue2).to(getExchange).with("error");
}
/**
* warning隊列與交換器綁定,指定routingKey為warning
* @param getQueue3
* @param getExchange
* @return
*/
@Bean
Binding bindingWarning(Queue getQueue3, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue3).to(getExchange).with("warning");
}
}
3.5、建立controller層模拟生産者
package cn.rabbitmq.edu.rabbitmq_spring_boot.controller;
import cn.rabbitmq.edu.rabbitmq_spring_boot.utils.ParamUtil;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:36
* @Description: web通路模拟發送
* @Version: 1.0.0
**/
@RestController
public class IndexController {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private ParamUtil paramUtil;
/**
* 使用AmqpTemplate
* @return
* @throws Exception
*/
@PostMapping("/amqpSend")
public String amqpSend() throws Exception{
Integer log_count = 3;
for (int i = 0; i < 30; i++) {
Thread.sleep(500);//模拟耗時操作,别發那麼快
String msg = "hello amqp " + i;//自定義消息
Integer qumo =i % log_count;//取餘,模拟分發不同類型的routingKey
switch (qumo){
case 0:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
amqpTemplate.convertAndSend(paramUtil.directName,"info",msg);//根據指定的exchange發送資料
System.out.println("-->info send " + msg);
break;
case 1:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
amqpTemplate.convertAndSend(paramUtil.directName,"error",msg);//根據指定的exchange發送資料
System.out.println("-->error send " + msg);
break;
case 2:
//釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
amqpTemplate.convertAndSend(paramUtil.directName,"warning",msg);//根據指定的exchange發送資料
System.out.println("-->warning send " + msg);
break;
}
}
return "success";
}
}
3.6、info消費者
package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:35
* @Description: 模拟消費者1
* @Version: 1.0.0
**/
@Component
//監聽的隊列
@RabbitListener(queues = "test_spring_boot_routing_info")
public class CustomerMsg {
/**
* 進行接收處理
* @param string
*/
@RabbitHandler
public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
System.out.println("消費者info,接收時間:"+System.currentTimeMillis()+",收到消息,消息: " + string);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動确認
//丢棄這條消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
3.7、error消費者
package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:35
* @Description: 模拟消費者2
* @Version: 1.0.0
**/
@Component
//監聽的隊列
@RabbitListener(queues = "test_spring_boot_routing_error")
public class CustomerMsg2 {
/**
* 進行接收處理
* @param string
*/
@RabbitHandler
public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
System.out.println("消費者error,接收時間:"+System.currentTimeMillis()+",收到消息,消息: " + string);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動确認
//丢棄這條消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
3.8、warning消費者
package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:35
* @Description: 模拟消費者2
* @Version: 1.0.0
**/
@Component
//監聽的隊列
@RabbitListener(queues = "test_spring_boot_routing_warning")
public class CustomerMsg3 {
/**
* 進行接收處理
* @param string
*/
@RabbitHandler
public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
System.out.println("消費者warning,接收時間:"+System.currentTimeMillis()+",收到消息,消息: " + string);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動确認
//丢棄這條消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
3.9、運作效果
4、小結
小結
通過本章的學習,學到了routing模式,也就是exchange為direct類型的定義,routing模式為路由密鑰模式,通過給定的routingKey進行與隊列的比對,比對綁定後,發送消息就到達指定的隊列啦。
回頭再看上一章,上一章開始接觸exchange,目前學習了2類exchange的類型,fanout和direct模式。
fanout為扇形廣播消息
direct為比對路由密鑰廣播消息