RabbitMQ的工作模式
RabbitMQ有一下四種工作模式:
- DIRECT(“direct”),定向
- FANOUT(“fanout”),發送到所有綁定的隊列
- TOPIC(“topic”),通配符的方式
- HEADERS(“headers”);參數比對
下面将逐個講解各個模式的用法
Work queues 工作隊列模式
模式說明

如圖所示,一個隊列對應多個消費者,C1和C2屬于競争關系,同一條消息要麼被C1消費,要麼被C2消費
總結:Work queues多個消費端共同消費同一隊列中的消息
應用場景:對于任務過重或者任務較多情況下使用工作隊列可以提高任務處理速度
生産者
public class MessageProducer {
public static void sendMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.建立隊列
/**
* 參數說明:
* 1.queue:隊列名稱
* 2.durable:是否持久化,當mq重新開機之後,資料依舊存在
* 3.exclusive:
* - 是否獨占,隻能有一個監聽者該隊列
* - 當connection關閉時,是否删除隊列
* 4.autoDelete:是否 自動删除,當沒有Consumer是,自動删除
* 5.arguments:參數
*/
// 如果沒有名為“first_queue”的隊列,則會建立,否則不會建立
channel.queueDeclare("work_queue",true,false,false,null);
// 6.發送消息
/**
* 參數:
* 1.exchange:交換機名稱,簡單模式下交換機預設為""
* 2.routing key:路由mingc
* 3.props:配置資訊
* 4.body:發送的消息資料
*/
for (int i = 1; i < 21; i++) {
String message = "work queues 第" + i + "條消息";
channel.basicPublish("","work_queue",null,message.getBytes());
}
// 7.釋放資源
channel.close();
connection.close();
}
}
為了測試出效果,循環發送20條消息
消費者
public class MessageConsumer {
public static void getMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.建立隊列
/**
* 參數說明:
* 1.queue:隊列名稱
* 2.durable:是否持久化,當mq重新開機之後,資料依舊存在
* 3.exclusive:
* - 是否獨占,隻能有一個監聽者該隊列
* - 當connection關閉時,是否删除隊列
* 4.autoDelete:是否 自動删除,當沒有Consumer是,自動删除
* 5.arguments:參數
*/
// 如果沒有名為“first_queue”的隊列,則會建立,否則不會建立
channel.queueDeclare("work_queue",true,false,false,null);
// 6.接收消息
/**
* handleDelivery()參數:
* 1.consumerTag:辨別
* 2.envelope:擷取一些資訊,交換機,路由key
* 3.properties:配置資訊
* 4.bady:資料
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("用戶端1消費:" + new String(body));
}
};
channel.basicConsume("work_queue",true,consumer);
// 7.釋放資源?不需要 消費者相當于一個監聽者 時刻監聽隊列
// 此處隻是不讓程式結束,等待回調函數執行
while (true){}
}
}
同樣的代碼在複制一份,修改列印輸出内容,作為用戶端2,啟動會看到work_queue有兩個消費者
消費端啟動後,然後運作生産用戶端發送消息
消費用戶端1控制台列印如下:
用戶端1消費:work queues 第2條消息
用戶端1消費:work queues 第4條消息
用戶端1消費:work queues 第6條消息
用戶端1消費:work queues 第8條消息
…
消費用戶端2控制台列印如下:
用戶端2消費:work queues 第1條消息
用戶端2消費:work queues 第3條消息
用戶端2消費:work queues 第5條消息
用戶端2消費:work queues 第7條消息
用戶端2消費:work queues 第9條消息
…
可以看出生産的20條消息,被兩個消費端共同分擔
Pub/Sub訂閱模式
模式說明
該模式中多了Exchange,整個發送過程相較之前 有所變化,生産者生産的消息不再是發送到隊列中去,而是發給交換機
Exchange:交換機(X),一方面接收生産者發送的消息,另一方面知道如何處理消息,例如遞交給某個特别隊列、遞交給所有隊列、或者丢棄消息。到底怎麼操作,取決于Exchange的類型,常見以下三種:
- Fanout:廣播,将消息交給所有綁定的交換機隊列
- Direct:定向,把消息交給符合指定routing key的隊列
- Topic:通配符,把消息交給符合routing pattern(路由模式)的隊列
Exchange隻負責轉發消息,不具備存儲消息的能力,是以如果沒有任何隊列與exchange綁定或者沒有符合路由規則的隊列,那麼消息将會丢失
生産者
public class MessageProducer_PubSub {
public static void sendMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.常見交換機
/**
* 參數說明:
* 1.exchange:交換機類型
* 2.type:交換機類型
* DIRECT("direct"),定向
* FANOUT("fanout"),發送到所有綁定的隊列
* TOPIC("topic"),通配符的方式
* HEADERS("headers");參數比對
* 3.durable:是否持久化
* 4.autoDelete:是否自動删除
* 5.internal:内部使用,一般為false
* 6.arguments:參數
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
// 6.建立隊列
String queue1 = "test_fanout_queue1";
String queue2 = "test_fanout_queue2";
channel.queueDeclare(queue1,true,false,false,null);
channel.queueDeclare(queue2,true,false,false,null);
// 7.綁定隊列和交換機
/**
* 參數說明:
* 1.queue:隊列名稱
* 2.exchange:交換機名稱
* 3.routingKey:路由鍵,綁定規則
* 如果交換機的類型為fanout,routingKey設定為""
*/
channel.queueBind(queue1,exchangeName,"");
channel.queueBind(queue2,exchangeName,"");
// 8.發送消息到交換機
String message = "pubsub模式的消息";
channel.basicPublish(exchangeName,"", null,message.getBytes());
// 9.釋放資源
channel.close();
connection.close();
}
}
運作代碼後觀察rabbit控制台
交換機新增了自定義的交換機名
點選交換機名,檢視詳情,能看到如下圖的綁定關系
隊列新增了和交換機綁定的兩個隊列,并且隊列中各有一條消息待消費
消費者
public class MessageConsumer_PubSub1 {
public static void getMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* handleDelivery()參數:
* 1.consumerTag:辨別
* 2.envelope:擷取一些資訊,交換機,路由key
* 3.properties:配置資訊
* 4.bady:資料
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("用戶端1消費:" + new String(body));
}
};
channel.basicConsume("test_fanout_queue1",true,consumer);
// 6.釋放資源?不需要 消費者相當于一個監聽者 時刻監聽隊列
// 此處隻是不讓程式結束,等待回調函數執行
while (true){}
}
}
相同代碼複制一份,修改監聽隊列為test_fanout_queue2,修改列印資訊作為用戶端2,分别啟動消費用戶端1和消費用戶端2
控制台輸出如下:
用戶端1消費:pubsub模式的消息
用戶端2消費:pubsub模式的消息
可以看出,同一條消息換發送到交換機,由交換機分發到兩個隊列中,兩個用戶端分别在對應的隊列中進行消費
Routing 路由模式
模式說明:
圖中表示不同的日志級别,由交換機分發到不同的隊列中
- 隊列與交換機綁定,不能是任意綁定了,而是指定一個RoutingKey(路由key)
- 消息發送到exchange時,必須指定消息的RoutingKey
- Exchange不在把消息發送到每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,隻有隊列的Routingkey與消息的RoutingKey一緻時,才會收到消息
- 交換機類型為Direct,綁定
生産者
public class MessageProducer_Routing {
public static void sendMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.常見交換機
/**
* 參數說明:
* 1.exchange:交換機類型
* 2.type:交換機類型
* DIRECT("direct"),定向
* FANOUT("fanout"),發送到所有綁定的隊列
* TOPIC("topic"),通配符的方式
* HEADERS("headers");參數比對
* 3.durable:是否持久化
* 4.autoDelete:是否自動删除
* 5.internal:内部使用,一般為false
* 6.arguments:參數
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 6.建立隊列
String queue1 = "test_direct_queue1";
String queue2 = "test_direct_queue2";
channel.queueDeclare(queue1,true,false,false,null);
channel.queueDeclare(queue2,true,false,false,null);
// 7.綁定隊列和交換機
/**
* 參數說明:
* 1.queue:隊列名稱
* 2.exchange:交換機名稱
* 3.routingKey:路由鍵,綁定規則
* 如果交換機的類型為fanout,routingKey設定為""
*/
// error隊列綁定
channel.queueBind(queue1,exchangeName,"error");
// info error warn隊列綁定
channel.queueBind(queue2,exchangeName,"info");
channel.queueBind(queue2,exchangeName,"error");
channel.queueBind(queue2,exchangeName,"warning");
// 8.發送消息到交換機
String messageInfo = "日志資訊:info";
String messageError = "日志資訊:error";
String messageWarn = "日志資訊:warning";
// 發送不同RoutingKey類型的消息,觀察exchange會怎麼處理
channel.basicPublish(exchangeName,"info", null,messageInfo.getBytes());
channel.basicPublish(exchangeName,"error", null,messageError.getBytes());
channel.basicPublish(exchangeName,"warning", null,messageWarn.getBytes());
// 9.釋放資源
channel.close();
connection.close();
}
}
運作生産者代碼,觀察rabbit控制台
Exchange交換機新增名稱為test_direct的交換機
點選名稱進入,可檢視和隊列綁定關系
在觀察queues欄,可以看出消息根據不同的routingkey被分發到不同隊列
隊列1接收RoutingKey為error的消息,隻有1條
隊列2接收RoutingKey為info、error、warning的消息,有三天
因為兩個隊列都綁定了RoutingKey為error的消息,是以error消息會出現在兩個隊列中
消費者
public class MessageConsumer_Routing1 {
public static void getMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* handleDelivery()參數:
* 1.consumerTag:辨別
* 2.envelope:擷取一些資訊,交換機,路由key
* 3.properties:配置資訊
* 4.bady:資料
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("用戶端1消費:" + new String(body));
}
};
channel.basicConsume("test_direct_queue1",true,consumer);
// 6.釋放資源?不需要 消費者相當于一個監聽者 時刻監聽隊列
// 此處隻是不讓程式結束,等待回調函數執行
while (true){}
}
}
複制相同代碼,修改監聽隊列名稱,修改列印資訊作為用戶端2
啟動消費者,控制台列印如下:
用戶端1消費:日志資訊:error
用戶端2消費:日志資訊:info
用戶端2消費:日志資訊:error
用戶端2消費:日志資訊:warning
總結:生産者發送的消息經過exchange處理,根據RoutingKey進行比對,分發到不同的隊列中,由該隊列對應的消費者去消費
Topic通配符模式
模式說明:
- 交換機類型為Topic,通配符模式
- 例RoutingKey為Usa.news 可以比對usa.#的隊列,也可以比對#.news的隊列
- 過程:生産者将消息發送給交換機,由交換機根據隊列RoutingKey設定的比對規則進行分發到不同隊列
生産者:
public class MessageProducer_Topic {
public static void sendMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.常見交換機
/**
* 參數說明:
* 1.exchange:交換機類型
* 2.type:交換機類型
* DIRECT("direct"),定向
* FANOUT("fanout"),發送到所有綁定的隊列
* TOPIC("topic"),通配符的方式
* HEADERS("headers");參數比對
* 3.durable:是否持久化
* 4.autoDelete:是否自動删除
* 5.internal:内部使用,一般為false
* 6.arguments:參數
*/
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);
// 6.建立隊列
String queue1 = "test_topic_queue1";
String queue2 = "test_topic_queue2";
channel.queueDeclare(queue1,true,false,false,null);
channel.queueDeclare(queue2,true,false,false,null);
// 7.綁定隊列和交換機
/**
* 參數說明:
* 1.queue:隊列名稱
* 2.exchange:交換機名稱
* 3.routingKey:路由鍵,綁定規則
* 如果交換機的類型為fanout,routingKey設定為""
*/
// 訂單相關資訊、info資訊存入隊列1
channel.queueBind(queue1,exchangeName,"*.info.*");
channel.queueBind(queue1,exchangeName,"order.#");
// 訂單錯誤資訊、支付資訊存入隊列2
channel.queueBind(queue2,exchangeName,"order.error.*");
channel.queueBind(queue2,exchangeName,"pay.#");
// 8.發送消息到交換機
String message = "Topic模式消息";
// 發送不同RoutingKey類型的消息,觀察exchange會怎麼處理
channel.basicPublish(exchangeName,"test.info.test", null,message.getBytes());
channel.basicPublish(exchangeName,"order.error.select", null,message.getBytes());
channel.basicPublish(exchangeName,"order.error.select.success", null,message.getBytes());
channel.basicPublish(exchangeName,"pay.test", null,message.getBytes());
// 9.釋放資源
channel.close();
connection.close();
}
}
啟動生産者,觀察rabbit控制台
Exchange新增名稱為test_topic的交換機
點選名稱檢視隊列及相關綁定的通配符
隊列情況
隊列1綁定通配符*.info.*,order.#,和它比對的RoutingKey有test.info.test,order.error.select,order.error.select.success,是以它有三條消息
隊列2綁定通配符order.error.*,pay.#,和它比對的RoutingKey有order.error.select,pay.test,是以隻有兩個
為什麼order.error.*不能比對order.error.select.success
因為*表示一個單詞,#表示多個單詞,而單詞界定以.為界,是以不能比對
消費者
public class MessageConsumer_Topic1 {
public static void getMessage() throws IOException, TimeoutException {
// 1.建立連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設定參數
// ip,預設為本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,預設值5672
connectionFactory.setPort(5672);
// 虛拟機,預設值/
connectionFactory.setVirtualHost("my_vhost");
// 賬号,預設值guest
connectionFactory.setUsername("admin");
// 密碼,預設值guest
connectionFactory.setPassword("admin");
// 3.建立連接配接 Connection
Connection connection = connectionFactory.newConnection();
// 4.建立Channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* handleDelivery()參數:
* 1.consumerTag:辨別
* 2.envelope:擷取一些資訊,交換機,路由key
* 3.properties:配置資訊
* 4.bady:資料
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("用戶端1消費:" + new String(body));
}
};
channel.basicConsume("test_topic_queue1",true,consumer);
// 6.釋放資源?不需要 消費者相當于一個監聽者 時刻監聽隊列
// 此處隻是不讓程式結束,等待回調函數執行
while (true){}
}
}
複制代碼修改隊列名,修改列印内容作為用戶端2
運作兩個消費者用戶端,控制台分别列印
用戶端1:
用戶端1消費:Topic模式消息
用戶端1消費:Topic模式消息
用戶端1消費:Topic模式消息
用戶端2:
用戶端1消費:Topic模式消息
用戶端1消費:Topic模式消息
總結:Topic模式和Routing模式很類似,不過Routing模式屬于值綁定,topic屬于通配符綁定,應用範圍更廣