RabbitMQ有以下幾種工作模式 :
1、Work queues 工作隊列
2、Publish/Subscribe 釋出訂閱
3、Routing 路由
4、Topics 通配符
5、Header Header 轉發器
6、RPC 遠端調用
進入浏覽器,輸入:http://localhost:15672
初始賬号和密碼:guest/guest
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnL0UzM4IzNyETM1ETMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
通配符工作模式:
1、一個交換機可以綁定多個隊列,每個隊列可以設定一個或多個帶通配符的 routingkey.
2、生産者将消息發給交換機,交換機根據routingkey的值來比對隊列,比對時采用 統配符的方式,比對成功的将消息轉發到指定隊列.
生産者代碼:
package com.xuecheng.test.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Administrator
* @version 1.0
* @create 2018-06-17 19:23
**/
public class Producer04_topics {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
private static final String ROUTINGKEY_SMS="inform.#.sms.#";
public static void main(String[] args) {
//通過連接配接工廠建立新的連接配接和mq建立連接配接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//建立新連接配接
connection = connectionFactory.newConnection();
//建立會話通道,生産者和mq服務所有通信都在channel通道中完成
channel = connection.createChannel();
//聲明隊列,如果隊列在mq 中沒有則要建立
//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
* 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
* 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
* 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//聲明一個交換機
//參數:String exchange, String type
/**
* 參數明細:
* 1、交換機的名稱
* 2、交換機的類型
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//進行交換機和隊列綁定
//參數:String queue, String exchange, String routingKey
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值将消息轉發到指定的隊列中,在釋出訂閱模式中調協為空字元串
*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
//發送消息
//參數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細:
* 1、exchange,交換機,如果不指定将使用mq的預設交換機(設定為"")
* 2、routingKey,路由key,交換機根據路由key來将消息轉發到指定的隊列,如果使用預設交換機,routingKey設定為隊列的名稱
* 3、props,消息的屬性
* 4、body,消息内容
*/
for(int i=0;i<5;i++){
//發送消息的時候指定routingKey
String message = "send email inform message to user";
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
System.out.println("send to mq "+message);
}
for(int i=0;i<5;i++){
//發送消息的時候指定routingKey
String message = "send sms inform message to user";
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
System.out.println("send to mq "+message);
}
for(int i=0;i<5;i++){
//發送消息的時候指定routingKey
String message = "send sms and email inform message to user";
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
System.out.println("send to mq "+message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//關閉連接配接
//先關閉通道
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
郵件消費者代碼:
package com.xuecheng.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Administrator
* @version 1.0
* @create 2018-06-17 18:22
**/
public class Consumer04_topics_email {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連接配接工廠建立新的連接配接和mq建立連接配接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連接配接
Connection connection = connectionFactory.newConnection();
//建立會話通道,生産者和mq服務所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
* 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
* 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
* 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
//聲明一個交換機
//參數:String exchange, String type
/**
* 參數明細:
* 1、交換機的名稱
* 2、交換機的類型
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//進行交換機和隊列綁定
//參數:String queue, String exchange, String routingKey
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值将消息轉發到指定的隊列中,在釋出訂閱模式中調協為空字元串
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
//實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到消息後此方法将被調用
* @param consumerTag 消費者标簽,用來辨別消費者的,在監聽隊列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 消息屬性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//消息id,mq在channel中用來辨別消息的id,可用于确認消息已接收
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽隊列
//參數:String queue, boolean autoAck, Consumer callback
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、autoAck 自動回複,當消費者接收到消息後要告訴mq消息已接收,如果将此參數設定為tru表示會自動回複mq,如果設定為false要通過程式設計實作回複
* 3、callback,消費方法,當消費者接收到消息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
}
}
短信消費者代碼:
package com.xuecheng.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Administrator
* @version 1.0
* @create 2018-06-17 18:22
**/
public class Consumer04_topics_sms {
//隊列名稱
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
private static final String ROUTINGKEY_SMS="inform.#.sms.#";
public static void main(String[] args) throws IOException, TimeoutException {
//通過連接配接工廠建立新的連接配接和mq建立連接配接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
connectionFactory.setVirtualHost("/");
//建立新連接配接
Connection connection = connectionFactory.newConnection();
//建立會話通道,生産者和mq服務所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
* 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
* 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
* 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//聲明一個交換機
//參數:String exchange, String type
/**
* 參數明細:
* 1、交換機的名稱
* 2、交換機的類型
* fanout:對應的rabbitmq的工作模式是 publish/subscribe
* direct:對應的Routing 工作模式
* topic:對應的Topics工作模式
* headers: 對應的headers工作模式
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//進行交換機和隊列綁定
//參數:String queue, String exchange, String routingKey
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、exchange 交換機名稱
* 3、routingKey 路由key,作用是交換機根據路由key的值将消息轉發到指定的隊列中,在釋出訂閱模式中調協為空字元串
*/
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
//實作消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 當接收到消息後此方法将被調用
* @param consumerTag 消費者标簽,用來辨別消費者的,在監聽隊列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 消息屬性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//消息id,mq在channel中用來辨別消息的id,可用于确認消息已接收
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String message= new String(body,"utf-8");
System.out.println("receive message:"+message);
}
};
//監聽隊列
//參數:String queue, boolean autoAck, Consumer callback
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、autoAck 自動回複,當消費者接收到消息後要告訴mq消息已接收,如果将此參數設定為tru表示會自動回複mq,如果設定為false要通過程式設計實作回複
* 3、callback,消費方法,當消費者接收到消息要執行的方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
}
}
pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>xc-framework-parent</artifactId>
<groupId>com.xuecheng</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../xc-framework-parent/pom.xml</relativePath>
</parent>
<artifactId>test-rabbitmq-producer</artifactId>
<dependencies>
<!-- <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version><!–此版本與spring boot 1.5.9版本比對–>
</dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
</project>