十年河東,十年河西,莫欺少年窮
學無止境,精益求精
上篇部落格介紹了RabbitMQ的六種工作模式 RabbitMQ的六種工作模式
RabbitMQ的簡單模式和Work工作模式請參考:NetCore RabbitMQ 簡介及兔子生産者、消費者 【簡單模式,work工作模式,競争消費】
本篇部落格使用NetCore完成RabbitMQ釋出訂閱模式中的廣播模式
何為廣播模式?
publish/subscribe釋出訂閱(共享資源)
- X代表交換機rabbitMQ内部元件,erlang 消息産生者是代碼完成,代碼的執行效率不高,消息産生者将消息放入交換機,交換機釋出訂閱把消息發送到所有消息隊列中,對應消息隊列的消費者拿到消息進行消費
- 相關場景:郵件群發,群聊天,廣播(廣告)
上圖中X代表交換機,RabbitMQ中交換機的類型分為四種,分别為廣播模式,定向模式,通配符模式,參數比對模式
ExchangeType.Fanout【廣播模式】
ExchangeType.Direct【定向模式】
ExchangeType.Topic【通配符模式】
ExchangeType.Headers 【參數比對模式】
廣播模式生産者
廣播模式建立生産者,分為如下步驟,
1、聲明一個交換機
2、聲明廣播的隊列
3、交換機和隊列進行綁定
4、生産消息
以上步驟用NetCore 實作如下:
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMqProducer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主機名
factory.UserName = "guest";//使用的使用者
factory.Password = "guest";//使用者密碼
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虛拟主機
factory.MaxMessageSize = 1024; //消息最大位元組數
using (var connection = factory.CreateConnection())
{
//rabbitMQ 基于信道進行通信,是以,我們需要執行個體化信道Channel
using (var channel = connection.CreateModel())
{
//exchange 交換機名稱
//type 交換機類型 ExchangeType.Direct【定向模式】 ExchangeType.Fanout【廣播模式】 ExchangeType.Topic【通配符模式】 ExchangeType.Headers 【參數比對模式】
//durable 是否持久化
//autoDelete 隊列是否為臨時隊列
//arguments 其他配置 詳見部落格:javascript:void(0)
//void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
//聲明一個交換機
string Ename = "ExRabbitMQ";
channel.ExchangeDeclare(Ename, ExchangeType.Fanout, false, false, null);
//聲明廣播的隊列
string Qname_1 = "RabbitMQ_Queue_1";
string Qname_2 = "RabbitMQ_Queue_2";
channel.QueueDeclare(Qname_1, false, false, false, null);
channel.QueueDeclare(Qname_2, false, false, false, null);
//交換機 隊列 綁定
//queue 隊列名稱
//exchange 交換機名稱
//routingKey 路由規則
//void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
string routingKey = "EQ"; //路由比對規則 交換機通過routingKey廣播消息到綁定的隊列
channel.QueueBind(Qname_1, Ename, routingKey);
channel.QueueBind(Qname_2, Ename, routingKey);
//發送消息
for(int i = 0; i < 100; i++)
{
var messages = "I am RabbitMQ"; //傳遞的消息内容
//exchange 交換機,如果使用預設的交換機,那麼routingKey要和隊列的名稱一緻
//routingKey:路由
//basicProperties : 用于基礎屬性設定
///BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body);
channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages+"_"+i)); //生産消息
}
}
}
Console.Read();
}
}
}
這裡需要說明的是,當釋出訂閱模式為廣播時,需要定義路由規則routingKey,綁定時,routingKey必須保持一緻,RabbitMQ廣播是通過routingKey進行隊列比對的。
消費者
廣播模式生産者代碼中,我們建立了兩個隊列,是以,我們需要兩個消費者分别消費不同隊列的消息,如下:
消費者1
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace RabbitMqConsumer
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主機名
factory.UserName = "guest";//使用的使用者
factory.Password = "guest";//使用者密碼
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虛拟主機
factory.MaxMessageSize = 1024; //消息最大位元組數
//建立連接配接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"RabbitMQConsumer_1 收到消息: {message}");
//确認該消息已被消費
channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100);
};
//啟動消費者
string Qname = "RabbitMQ_Queue_1";
channel.BasicConsume(Qname, false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
}
消費者2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace RabbitMQConsumer_2
{
class Program
{
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "127.0.0.1"; //主機名
factory.UserName = "guest";//使用的使用者
factory.Password = "guest";//使用者密碼
factory.Port = 5672;//端口号
factory.VirtualHost = "/"; //虛拟主機
factory.MaxMessageSize = 1024; //消息最大位元組數
//建立連接配接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"RabbitMQConsumer_2 收到消息: {message}");
//确認該消息已被消費
channel.BasicAck(ea.DeliveryTag, false);
Thread.Sleep(100);
};
//啟動消費者
string Qname = "RabbitMQ_Queue_2";
channel.BasicConsume(Qname, false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
}
效果如下: