首先看看
AMQP
協定,對
RabbitMQ
的架構會更了解。
深入了解AMQP協定
建立一個
Maven
項目,根據自己伺服器RabbitMQ的版本導入相應的包。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
扇型交換機
扇型交換機(funout exchange)将消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果 N 個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會将消息的拷貝分别發送給這所有的 N 個隊列。
因為扇型交換機投遞消息的拷貝到所有綁定到它的隊列,是以它的應用案例都極其相似:
- 大規模多使用者線上(MMO)遊戲可以使用它來處理排行榜更新等全局事件。
- 體育新聞網站可以用它來近乎實時地将比分更新分發給移動用戶端。
- 分發系統使用它來廣播各種狀态和配置更新。
- 在群聊的時候,它被用來分發消息給參與群聊的使用者。(AMQP 沒有内置 presence 的概念,是以 XMPP 可能會是個更好的選擇)。
扇型交換機圖例:
上圖所示,生産者(P)生産消息 1 ,将消息 1 推送到 Exchange,由于 Exchange Type=fanout ,這時候會遵循 fanout exchange的路由規則,将消息推送到所有與它綁定的 Queue,也就是圖上的兩個 Queue, 最後由監聽對應Queue的消費者消費 。
生産端
routingKey = ""
,因為fanout exchange的路由規則不關心
routingKey
的值(但是不能為空)。
package com.kaven.rabbitmq.exchange.fanoutExchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutProducer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// fanout exchange ,RabbitMQ提供的fanout exchange
private static String exchangeName = "amq.fanout";
// exchange type
private static String exchangeType= "fanout";
// 交換機路由的routingKey
private static String routingKey = "";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立一個連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立連接配接
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 發送消息
String msg = "RabbitMQ:Fanout Exchange 發送資料";
channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes());
// 5 關閉連接配接
channel.close();
connection.close();
}
}
消費端
routingKey = "test"
,和生産端的
routingKey
不一樣,主要為了看看fanout exchange的路由規則是否如上面解釋的一樣,這裡隻建立一個消費者,大家可以試一試多個消費者的情況,其實是一樣的。
package com.kaven.rabbitmq.exchange.fanoutExchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutConsumer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// fanout exchange ,RabbitMQ提供的fanout exchange
private static String exchangeName = "amq.fanout";
// exchange type
private static String exchangeType= "fanout";
// 隊列名
private static String queueName = "queue";
// 隊列與交換機綁定的routingKey
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 建立一個連接配接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立連接配接
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 定義Queue ,将Queue綁定到direct exchange
channel.queueDeclare(queueName,true , false , false , null);
channel.queueBind(queueName , exchangeName , routingKey);
// 5 建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 6 設定
channel.basicConsume(queueName , true , consumer);
// 7 接收消息
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
測試
因為這裡使用的是RabbitMQ提供給我們的fanout exchange,是以我們無需自己定義。
因為交換機已經定義好了,是以無論先啟動生産端還是消費端,消費端都可以成功收到消息。
消費端輸出如下:
RabbitMQ:Fanout Exchange 發送資料