1,路由模式 direct
direct 模式基礎概念請參考 RabbitMQ消息隊列之基礎 (二)
direct類型的路由規則很簡單,它會把消息路由到那些binding key與routing key完全比對的Queue中;
也就是說:隻要隊列名、交換機、路由key完全一緻,就可以比對到該消息。
這裡直接示範代碼:
生産者:
1 //direct類型 路由模式 1對1比對
2 //生産者發送消息時需要指定一個路由鍵(routingKey),交換機隻會把消息轉發給包含該路由鍵的隊列
3 //string exchange = "TestMq_Exchange"; //交換機
4 //string routingKey = "TestMq_RoutingKey"; //路由鍵
5
6 string queueName = "TestMq"; //隊列名
7 for (int i = 0; i < 10; i++)
8 {
9 string message = "Hello World:" + i;
10 RabbitMqConfig rabbitMqConfig = RabbitMqConfig.Init(); //初始化配置檔案
11 ConnectionFactory connFactory = new ConnectionFactory
12 {
13 Uri = new Uri(url),
14 RequestedConnectionTimeout = rabbitMqConfig.RequestedConnectionTimeout,
15 RequestedChannelMax = rabbitMqConfig.RequestedChannelMax,
16 RequestedHeartbeat = rabbitMqConfig.RequestedHeartbeat,
17 AutomaticRecoveryEnabled = false
18 };
19 string exchange = queueName + "_Exchange"; //交換機
20 string routingKey = queueName + "_RoutingKey"; //路由鍵
21 string exchangeType = ExchangeType.Direct; //類型 direct
22
23 IModel model = this._conn.CreateModel();
24 model.ExchangeDeclare(exchange, exchangeType, true, false, null); //聲明交換機
25 model.QueueDeclare(queueName, true, false, false, null); //聲明隊列
26 model.QueueBind(queueName, exchange, routingKey, null); //綁定
27
28 IBasicProperties basicProperties = model.CreateBasicProperties();
29 basicProperties.Persistent = true; //消息持久化
30 basicProperties.DeliveryMode = 2; //消息持久化, 預設為1 非持久化
31 byte[] bytes = Encoding.UTF8.GetBytes(message);
32 model.BasicPublish(exchange, routingKey, basicProperties, bytes);
33
34 Console.WriteLine(DateTime.Now);
35 }
初始化配置檔案,在config當中配置
1 ConfigurationManager.GetSection("RabbitMqConfig"); //讀取配置檔案
2
3 //config配置檔案
4 <configSections>
5 <section name="RabbitMqConfig" type="Rabbit.Common.RabbitMqConfig,Rabbit.Common" />
6 </configSections>
7 <RabbitMqConfig RequestedHeartbeat="60" RequestedConnectionTimeout="300" RequestedChannelMax="500" Uri="amqp://admin:[email protected]:5672//" />
消費者1:
1 //消費者1
2 string queueName = "TestMq"; //隊列名
3 string exchange = queueName + "_Exchange"; //交換器
4 string routingKey = queueName + "_RoutingKey"; //路由關鍵字
5
6 var rabbitMq = RabbitMqConfig.Init(); //擷取Rabbit隊列配置
7 var rm = new ConnectionFactory()
8 {
9 Uri = new Uri(rabbitMq.Uri),
10 RequestedConnectionTimeout = rabbitMq.RequestedConnectionTimeout,
11 RequestedChannelMax = rabbitMq.RequestedChannelMax,
12 RequestedHeartbeat = rabbitMq.RequestedHeartbeat,
13 };
14
15 var connection = rm.CreateConnection(); //建立連接配接
16 var channel = connection.CreateModel(); //建立通道
17 //開啟隊列持久化(durable = true),不自動删除(autoDelete = false),是否專屬(exclusive: false)
18 channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null); //聲明一個交換器
19 channel.QueueDeclare(queueName, true, false, false, null); //聲明一個隊列
20 channel.QueueBind(queueName, exchange, routingKey, null); //綁定交換器和路由
21 channel.BasicQos(0, 1, false); //每次隻接收1個,處理完後再接收下一個
22 var consumer = new EventingBasicConsumer(channel);
23
24 consumer.Received += (model, ea) =>
25 {
26 var message = Encoding.UTF8.GetString(ea.Body); //消息主體
27
28 //處理消息邏輯,可以使用異步處理
29 Console.WriteLine(DateTime.Now + " 收到消息:" + message);
30 Thread.Sleep(3000); //模拟消耗延時
31
32 channel.BasicAck(ea.DeliveryTag, false); //傳回确認狀态 該條消息将會從隊列當中移除
33
34 };
35 //監聽隊列,手動傳回完成 第二個參數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答。
36 channel.BasicConsume(queueName, false, consumer);
消費者2:
//消費者2
string queueName = "TestMq2"; //隊列名2
string exchange = "TestMq_Exchange"; //交換器 TestMq_Exchange
string routingKey = "TestMq_RoutingKey"; //路由關鍵字 TestMq_RoutingKey
。。。
。。。
下圖所示: 消費者1 完全比對, 消費者2的隊列名稱為:TestMq2,與生産者不比對

2,廣播模式 / 分發模式 ( fanout )
這種模式下,消息會被所有消費者消費.也就是說,隻要是"綁定"到某個交換機的隊列,都會收到生産者發送到該交換機的消息.
fanout 類型的發送規則非常簡單,它會把所有發送到該交換機Exchange的消息路由到所有與它綁定的Queue中;
也就是說:在fanout模式下,隻跟 交換機Exchange有關系,跟路由key無關。
1 string queueName = "TestMq";
2 string exchange = "TestMq_Exchange"; //隻需要聲明交換機 即可
3 string routingKey = ""; //路由鍵
4 string exchangeType = ExchangeType.Fanout; //類型 Fanout
5 for (int i = 0; i < 10; i++)
6 {
7 string message = "Hello World:" + i;
8 RabbitMqConfig rabbitMqConfig = RabbitMqConfig.Init(); //初始化配置檔案
9 ConnectionFactory connFactory = new ConnectionFactory
10 {
11 Uri = new Uri(url),
12 RequestedConnectionTimeout = rabbitMqConfig.RequestedConnectionTimeout,
13 RequestedChannelMax = rabbitMqConfig.RequestedChannelMax,
14 RequestedHeartbeat = rabbitMqConfig.RequestedHeartbeat,
15 AutomaticRecoveryEnabled = false
16 };
17
18 IModel model = this._conn.CreateModel();
19 model.ExchangeDeclare(exchange, exchangeType, true, false, null); //聲明交換機
20 model.QueueDeclare(queueName, true, false, false, null); //聲明隊列
21 model.QueueBind(queueName, exchange, routingKey, null); //綁定
22
23 IBasicProperties basicProperties = model.CreateBasicProperties();
24 basicProperties.Persistent = true; //消息持久化
25 basicProperties.DeliveryMode = 2; //消息持久化, 預設為1 非持久化
26 byte[] bytes = Encoding.UTF8.GetBytes(message);
27 model.BasicPublish(exchange, routingKey, basicProperties, bytes);
28
29 Console.WriteLine(DateTime.Now + ":" + message);
30 }
消費者1: 這裡設定: queueName = TestMq
string queueName = "TestMq"; //隊列名
string exchange = "TestMq_Exchange"; //交換機
string routingKey = ""; //路由鍵
....
....
channel.ExchangeDeclare(exchange, ExchangeType.Fanout, true, false, null); //聲明一個交換器
消費者2: 這裡設定: queueName = TestMq
1 string queueName = "TestMq"; //隊列名
2 string exchange = "TestMq_Exchange"; //交換機
3 string routingKey = ""; //路由鍵
4
5 ....
6 ....
7
8 channel.ExchangeDeclare(exchange, ExchangeType.Fanout, true, false, null); //聲明一個交換器
上邊示例當中使用相同的 queueName = "TestMq"; 會出現如下圖所示的情況,各消費一半的情況,因為系統認為是一個隊列
下邊我們修改一下隊列名稱:
1 //消費者1
2 string queueName = "TestMq1"; //隊列名1
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = ""; //路由鍵
1 //消費者2
2 string queueName = "TestMq2"; //隊列名2
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = ""; //路由鍵
運作結果如下:分别由 隊列1和隊列2 接收
3,主題模式/模糊比對 ( fanout )
對于topic類型,
生産者:的消息的路由鍵routing key 一般不會任意給定。它一般是一些單詞的集合,中間用點号.分割。這些單詞可以任意,但通常要展現出消息的特征。
一些有效的路由鍵示例:stock.usd.nyse,nyse.vmw,quick.orange.rabbit。這些路由鍵可以包含很多單詞,但路由鍵總長度不能超過255個位元組;
消費者:binding key也一般是這種形式。topic類型的邏輯與direct類似:以特定路由鍵發送的消息将會發送到所有綁定鍵與之比對的隊列中。
但綁定鍵有兩種特殊的情況:
*(星号)僅代表一個單詞
#(井号)代表任意個單詞
也就是說:
當消費者隊列以綁定鍵“#”綁定,它将會接收到所有的消息,而無視路由鍵(實際是綁定鍵
#
比對了任意的路由鍵)。----這和
fanout模式
一樣了。
當消費者隊列中,
*
和
#
這兩個特殊的字元不出現在綁定鍵中,
Topic類型
就會和
direct類型一樣
了。
特别重要的提示:比對的最小機關是 “單詞”,且中間用點号.分割
下圖很好的說明了 topic模式的比對規則:
1 //生産者
2 string queueName = "TestMq";
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = "test.key"; //路由鍵
5 string exchangeType = ExchangeType.Topic; //類型 Topic
6
7 。。。
8 。。。
1 //消費者1
2 string queueName = "TestMq1"; //隊列名1
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = "test"; //路由鍵為test
5
6 。。。
7 。。。
8 //開啟隊列持久化(durable = true),不自動删除(autoDelete = false),是否專屬(exclusive: false)
9 channel.ExchangeDeclare(exchange, ExchangeType.Topic, true, false, null); //聲明一個交換器
1 //消費者2
2 string queueName = "TestMq2"; //隊列名2
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = "#"; //路由鍵
5
6 。。。
下圖所示:消費者1 的路由key沒有比對到消息,而 消費者2 使用 # 作為路由key,比對到消息
接下來,我們修改一下消費者的路由key
1 //消費者1
2 string queueName = "TestMq1"; //隊列名1
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = "test.*"; //路由鍵 修改為 test.* 效果同:*.key
1 //消費者2
2 string queueName = "TestMq2"; //隊列名2
3 string exchange = "TestMq_Exchange"; //交換機
4 string routingKey = "#.key"; //路由鍵 修改為 #.key 效果同:test.#
下圖表示:上邊兩個消費者均比對到了生産者釋出的消息
參考文檔: https://www.cnblogs.com/zhangbLearn/p/9559336.html
作者:PeterZhang
出處:https://www.cnblogs.com/peterzhang123
本文版權歸作者和部落格園共有,歡迎轉載,但必須給出原文連結,并保留此段聲明,否則保留追究法律責任的權利。