前言
在上一篇文章中,我們知道了RabbitMQ的消息流程如下:

但在具體的使用中,我們還需知道exchange的類型,因為不同的類型對應不同的隊列和路由規則。
在rabbitmq中,exchange有4個類型:direct,topic,fanout,header。
direct exchange
此類型的exchange路由規則很簡單:
exchange在和queue進行binding時會設定routingkey
channel.QueueBind(queue: "create_pdf_queue",
exchange: "pdf_events",
routingKey: "pdf_create",
arguments: null);
然後我們在将消息發送到exchange時會設定對應的routingkey:
channel.BasicPublish(exchange: "pdf_events",
routingKey: "pdf_create",
basicProperties: properties,
body: body);
在direct類型的exchange中,隻有這兩個routingkey完全相同,exchange才會選擇對應的binging進行消息路由。
具體的流程如下:
通過代碼可以會了解好一點:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Direct類型的exchange, 名稱 pdf_events
channel.ExchangeDeclare(exchange: "pdf_events",
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
// 建立create_pdf_queue隊列
channel.QueueDeclare(queue: "create_pdf_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//建立 pdf_log_queue隊列
channel.QueueDeclare(queue: "pdf_log_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//綁定 pdf_events --> create_pdf_queue 使用routingkey:pdf_create
channel.QueueBind(queue: "create_pdf_queue",
exchange: "pdf_events",
routingKey: "pdf_create",
arguments: null);
//綁定 pdf_events --> pdf_log_queue 使用routingkey:pdf_log
channel.QueueBind(queue: "pdf_log_queue",
exchange: "pdf_events",
routingKey: "pdf_log",
arguments: null);
var message = "Demo some pdf creating...";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//發送消息到exchange :pdf_events ,使用routingkey: pdf_create
//通過binding routinekey的比較,次消息會路由到隊列 create_pdf_queue
channel.BasicPublish(exchange: "pdf_events",
routingKey: "pdf_create",
basicProperties: properties,
body: body);
message = "pdf loging ...";
body = Encoding.UTF8.GetBytes(message);
properties = channel.CreateBasicProperties();
properties.Persistent = true;
//發送消息到exchange :pdf_events ,使用routingkey: pdf_log
//通過binding routinekey的比較,次消息會路由到隊列 pdf_log_queue
channel.BasicPublish(exchange: "pdf_events",
routingKey: "pdf_log",
basicProperties: properties,
body: body);
}
topic exchange
此類型exchange和上面的direct類型差不多,但direct類型要求routingkey完全相等,這裡的routingkey可以有通配符:'*','#'.
其中'*'表示比對一個單詞, '#'則表示比對沒有或者多個單詞
如上圖第一個binding:
- exchange: agreements
- queue A: berlin_agreements
- binding routingkey: agreements.eu.berlin.#
第二個binding:
- queue B: all_agreements
- binding routingkey: agreements.#
第三個binding:
- queue c: headstore_agreements
- binding routingkey: agreements.eu.*.headstore
是以如果我們消息的routingkey為agreements.eu.berlin那麼符合第一和第二個binding,但最後一個不符合,具體的代碼如下:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Topic類型的exchange, 名稱 agreements
channel.ExchangeDeclare(exchange: "agreements",
type: ExchangeType.Topic,
durable: true,
autoDelete: false,
arguments: null);
// 建立berlin_agreements隊列
channel.QueueDeclare(queue: "berlin_agreements",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//建立 all_agreements 隊列
channel.QueueDeclare(queue: "all_agreements",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//建立 headstore_agreements 隊列
channel.QueueDeclare(queue: "headstore_agreements",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//綁定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#
channel.QueueBind(queue: "berlin_agreements",
exchange: "agreements",
routingKey: "agreements.eu.berlin.#",
arguments: null);
//綁定 agreements --> all_agreements 使用routingkey:agreements.#
channel.QueueBind(queue: "all_agreements",
exchange: "agreements",
routingKey: "agreements.#",
arguments: null);
//綁定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore
channel.QueueBind(queue: "headstore_agreements",
exchange: "agreements",
routingKey: "agreements.eu.*.headstore",
arguments: null);
var message = "hello world";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//發送消息到exchange :agreements ,使用routingkey: agreements.eu.berlin
//agreements.eu.berlin 比對 agreements.eu.berlin.# 和agreements.#
//agreements.eu.berlin 不比對 agreements.eu.*.headstore
//最終次消息會路由到隊裡:berlin_agreements(agreements.eu.berlin.#) 和 all_agreements(agreements.#)
channel.BasicPublish(exchange: "agreements",
routingKey: "agreements.eu.berlin",
basicProperties: properties,
body: body);
}
fanout exchange
此exchange的路由規則很簡單直接将消息路由到所有綁定的隊列中,無須對消息的routingkey進行比對操作。
header exchange
此類型的exchange和以上三個都不一樣,其路由的規則是根據header來判斷,其中的header就是以下方法的arguments參數:
Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",
exchange: "agreements",
routingKey: string.Empty,
arguments: aHeader);
其中的x-match為特殊的header,可以為all則表示要比對所有的header,如果為any則表示隻要比對其中的一個header即可。
在釋出消息的時候就需要傳入header值:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;
具體的規則可以看以下代碼:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Headers類型的exchange, 名稱 agreements
channel.ExchangeDeclare(exchange: "agreements",
type: ExchangeType.Headers,
durable: true,
autoDelete: false,
arguments: null);
// 建立queue.A隊列
channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null);
//建立 queue.B 隊列
channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null);
//建立 queue.C 隊列
channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null);
//綁定 agreements --> queue.A 使用arguments (format=pdf, type=report, x-match=all)
Dictionary<string, object> aHeader = new Dictionary<string, object>();
aHeader.Add("format", "pdf");
aHeader.Add("type", "report");
aHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.A",
exchange: "agreements",
routingKey: string.Empty,
arguments: aHeader);
//綁定 agreements --> queue.B 使用arguments (format=pdf, type=log, x-match=any)
Dictionary<string, object> bHeader = new Dictionary<string, object>();
bHeader.Add("format", "pdf");
bHeader.Add("type", "log");
bHeader.Add("x-match", "any");
channel.QueueBind(queue: "queue.B",
exchange: "agreements",
routingKey: string.Empty,
arguments: bHeader);
//綁定 agreements --> queue.C 使用arguments (format=zip, type=report, x-match=all)
Dictionary<string, object> cHeader = new Dictionary<string, object>();
cHeader.Add("format", "zip");
cHeader.Add("type", "report");
cHeader.Add("x-match", "all");
channel.QueueBind(queue: "queue.C",
exchange: "agreements",
routingKey: string.Empty,
arguments: cHeader);
string message1 = "hello world";
var body = Encoding.UTF8.GetBytes(message1);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
mHeader1.Add("format", "pdf");
mHeader1.Add("type", "report");
properties.Headers = mHeader1;
//此消息路由到 queue.A 和 queue.B
//queue.A 的binding (format=pdf, type=report, x-match=all)
//queue.B 的binding (format = pdf, type = log, x - match = any)
channel.BasicPublish(exchange: "agreements",
routingKey: string.Empty,
basicProperties: properties,
body: body);
string message2 = "hello world";
body = Encoding.UTF8.GetBytes(message2);
properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
mHeader2.Add("type", "log");
properties.Headers = mHeader2;
//x-match 配置queue.B
//queue.B 的binding (format = pdf, type = log, x-match = any)
channel.BasicPublish(exchange: "agreements",
routingKey: string.Empty,
basicProperties: properties,
body: body);
string message3= "hello world";
body = Encoding.UTF8.GetBytes(message3);
properties = channel.CreateBasicProperties();
properties.Persistent = true;
Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
mHeader3.Add("format", "zip");
properties.Headers = mHeader3;
//配置失敗,不會被路由
channel.BasicPublish(exchange: "agreements",
routingKey: string.Empty,
basicProperties: properties,
body: body);
}
總計
以上就是exchange 類型的總結,一般來說direct和topic用來具體的路由消息,如果要用廣播的消息一般用fanout的exchange。
header類型用的比較少,但還是知道一點好。