文章目錄
- Exchange
- 描述
- 結構圖
- 交換機屬性
- Direct Exchange
- Topic Exchange
- Fanout Exchange
- Binding
- Queue
- 屬性
- Message
- 屬性
- Virtual host - 虛拟主機
- 項目代碼
Exchange
描述
用于接收消息,并根據路由鍵轉發消息到所綁定的隊列
結構圖
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiQDOxEzX3xCZlhXam9VbsUmepNXZy9CXwJWZ3xCdh1mcvZ2Lc1zaHRGcWdUYuVzVa9GczoVdG1mWfVGc5RHLwIzX39GZhh2csATMflHLwEzX4xSZz91ZsAzMfRHLGZkRGZkRfJ3bs92YskmNhVTYykVNQJVMRhXVEF1X0hXZ0xiNx8VZ6l2cssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL4gTMxYWNmZDNmZ2N4U2YyYzX5EzM1YTMzEzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
藍色框表示發送消息,然後消息通過路由關系路由到Queue1或Queue2
綠色框表示接收消息,消費者與隊列建立監聽,去消費消息
黃色框表示路由鍵綁定關系
交換機屬性
- Name:交換機名稱
- Type:交換機類型direct、topic、fanout、headers
- Durablity:是否需要持久化 true|false
- Auto Delete:當最後一個綁定到Exchange上的隊列删除後,自動删除該Exchange
- Internal:目前Exchange是否用于RabbitMQ内部使用,預設為false
- Arguments:擴充參數,用于擴充AMQP協定自制定使用
Direct Exchange
- 所有發送到Direct Exchange的消息被轉發到RoutingKey中指定的Queue
如圖所示,路由到隊列名為Key的隊列中去了
生産者代碼:
public class Producer4DirectExchange {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
//發送
String msg = "Hello, I am Producer4DirectExchange";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
}
}
消費者代碼:
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
//聲明Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//聲明一個隊列
channel.queueDeclare(queueName,false,false,false,null);
//建立一個綁定關系
channel.queueBind(queueName,exchangeName,routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱,是否自動ACK,Consumer
channel.basicConsume(queueName,true,consumer);
while (true) {
//阻塞擷取消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:"+msg);
}
}
公共類:
public class ConnectionUtil {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;
public static Connection getConn() {
//1. 建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost
//2. 通過連接配接工廠建立連接配接
try {
return connectionFactory.newConnection();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
登入控制台,可以看到名為
test_direct_exchange
的交換機通過路由鍵
test.direct
綁定到
test_direct_queue
Topic Exchange
- 所有發送到Topic Exchange的消息都被轉發到所有關心RoutingKey中指定Topic的隊列上
- Exchange将RoutingKey和某Topic進行模糊比對,此時隊列需要綁定一個Topic
可以使用通配符進行模糊比對:
#
比對一個或多個詞(單詞的意思,不是一個字元)
*
比對一個詞
log.#
能比對到"log.info.a"
隻比對"log.error"
log.*
如上圖,比如,user.news和user.weather能路由到第一個隊列
生成者代碼:
public class Producer4TopicExchange {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_topic_exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.find.abc";
//發送
String msg = "Hello, I am Producer4TopicExchange";
channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());
CloseTool.closeElegantly(channel,connection);
}
}
消費者代碼:
public class Consumer4TopicExchange {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.#";
//聲明Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//聲明一個隊列
channel.queueDeclare(queueName,false,false,false,null);
//建立一個綁定關系
channel.queueBind(queueName,exchangeName,routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱,是否自動ACK,Consumer
channel.basicConsume(queueName,true,consumer);
while (true) {
//阻塞擷取消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:"+msg);
}
}
}
先啟動消費者,然後發現下面兩個綁定關系:
隊列
再啟動生産者,從消費者端可以看到如下輸出:
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
說明3條消息都收到了,接下來,我們改一下消費者的路由鍵,改為:
String routingKey = "user.*";
再次啟動
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
奇怪,怎麼還是收到3條,帶着疑問我們去看控制台
可以看到,之前的路由規則綁定還在。是以可以解釋為啥能收到3條。
點選unbind解綁"user.#"
然後繼續操作一把,檢視輸出
收到消息:Hello, I am Producer4TopicExchange
收到消息:Hello, I am Producer4TopicExchange
此時,隻收到兩條了。
Fanout Exchange
- 不處理路由鍵,隻需要簡單的将隊列綁定到交換機上
- 發送到交換機的消息都會被轉發到與該互動機綁定的所有隊列上
- 轉發消息是最快的
消息不走路由鍵,隻要隊列與交換機有綁定關系就能收到。
生産者:
public class Producer4FanoutExchange {
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_fanout_exchange";
String routingKey = "nothing";
//發送
String msg = "Hello, I am Producer4FanoutExchange";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
CloseTool.closeElegantly(channel,connection);
}
}
消費者:
public class Consumer4FanoutExchange {
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = ConnectionUtil.getConn();
Channel channel = connection.createChannel();
//聲明
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "";//不設定路由鍵
//聲明Exchange
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
//聲明一個隊列
channel.queueDeclare(queueName,false,false,false,null);
//建立一個綁定關系
channel.queueBind(queueName,exchangeName,routingKey);
//durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱,是否自動ACK,Consumer
channel.basicConsume(queueName,true,consumer);
while (true) {
//阻塞擷取消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:"+msg);
}
}
}
和
Binding
- Exchange和Exchange、Queue之間的連接配接關系
- Binding中可以包含RoutingKey或參數
Queue
- 消息隊列,實際存儲消息
屬性
- Durability:是否持久化
- Auto delete: 若為yes,代表當最後一個監聽被移除後,該Queue會自動被删除
Message
- 伺服器和應用程式之間傳遞的資料
- 本質就是一段資料,由Properties和Payload(Body)組成
屬性
- delivery mode
- headers(自定義屬性放到這裡面)
- content_type
- content_encoding
- priority
- correlation_id
- reply_to:指令消息失敗傳回的隊列
- expiration:過期時間
-
message_id:消息ID
…
Producer:
public class Producer {
public static final String MQ_HOST = "192.168.222.101";
public static final String MQ_VHOST = "/";
public static final int MQ_PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(MQ_HOST);//配置host
connectionFactory.setPort(MQ_PORT);//配置port
connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost
//2. 通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
Map<String,Object> headers = new HashMap<>();
headers.put("var1","abc");
headers.put("var2","sdd");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //2:持久化投遞;1:非持久化(未消費的消息重新開機後就沒了)
.contentEncoding("UTF-8")
.expiration("5000")//5s
.headers(headers)
.build();
//4. 通過Channel發送資料
for (int i = 0; i < 10; i++) {
String message = "Hello" + i;
//exchange為"",則通過routingKey取尋找隊列
channel.basicPublish("","testQueue",properties,message.getBytes());
}
//5. 關閉連接配接
channel.close();
connection.close();
}
}
public class Consumer {
public static final String QUEUE_NAME = "testQueue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConn();
//3. 通過connection建立一個Channel
Channel channel = connection.createChannel();
//4. 聲明(建立)一個隊列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//5. 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6. 設定Channel
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
int num = 0;
//7. 擷取消息
while (true) {
//nextDelivery 會阻塞直到有消息過來
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("收到:" + message);
Map<String,Object > headers = delivery.getProperties().getHeaders();
System.out.println("headers get va1 :" + headers.get("var1"));
}
}
}
Virtual host - 虛拟主機
- 虛拟位址,用于進行邏輯隔離,最上層的消息路由
- 一個Virtuall Host裡面可以有若幹個Exchange和Queue
- 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue