.net core 下nuget :RabbitMQ.Client
API:https://www.rabbitmq.com/dotnet-api-guide.html
RabbitMQ建立exchange的方式一共有4中:direct,fanout,headers,topic.
direct:直接建立發送消息
fanout:将不通過路由,發送該exchange上所有綁定的隊列
topic:将通過路由比對,發送相應的隊列
public static class ExchangeType
{
//
// Summary:
// Exchange type used for AMQP direct exchanges.
public const string Direct = "direct";
//
// Summary:
// Exchange type used for AMQP fanout exchanges.
public const string Fanout = "fanout";
//
// Summary:
// Exchange type used for AMQP headers exchanges.
public const string Headers = "headers";
//
// Summary:
// Exchange type used for AMQP topic exchanges.
public const string Topic = "topic";
//
// Summary:
// Retrieve a collection containing all standard exchange types.
public static ICollection<string> All();
}
生産者代碼:
string exchangeName = "TestFanoutChange";
string queueName1 = "hello1";
string queueName2 = "hello2";
string routeKey = "";
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//使用者名
Password = "guest",//密碼
HostName = "localhost"//rabbitmq ip
};
//建立連接配接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
//定義一個Direct類型交換機
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);//第三個參數表示交換機持久化
//定義隊列
channel.QueueDeclare(queueName1, true, false, false, null);//第二個參數表示隊列持久化
//将隊列綁定到交換機
channel.QueueBind(queueName1, exchangeName, routeKey, null);
IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;//2代表隊列中的消息持久化
props.Expiration = "36000000";
Console.WriteLine($"\nRabbitMQ連接配接成功,\n\n請輸入消息,輸入exit退出!");
for (int i = 0; i < 1000; i++)
{
var sendBytes = Encoding.UTF8.GetBytes(i.ToString());
//釋出消息
channel.BasicPublish(exchangeName, routeKey, props, sendBytes);
}
channel.Close();
connection.Close();
消費者代碼:
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//使用者名
Password = "guest",//密碼
HostName = "localhost"//rabbitmq ip
};
//建立連接配接
var connection = factory.CreateConnection();
//建立通道
var channel = connection.CreateModel();
channel.BasicQos(0, 1, false);
//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"收到消息: {message}");
// Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執");
// Thread.Sleep(10000);
//确認該消息已被消費
channel.BasicAck(ea.DeliveryTag, false);
// Console.WriteLine($"已發送回執[{ea.DeliveryTag}]");
};
//啟動消費者 設定為手動應答消息
channel.BasicConsume("hello1", false, consumer);
Console.WriteLine("消費者已啟動");
Console.ReadKey();
channel.Dispose();
connection.Close();
持久化:将記憶體中的資料寫入到磁盤上
如果需要達到持久化的目的,必須要讓隊列持久化,然後再需要消息持久化
隊列持久化: channel.QueueDeclare(queueName1, true, false, false, null);//第二個參數表示隊列持久化
消息持久化: channel.QueueBind(queueName1, exchangeName, routeKey, null);
//channel.QueueBind(queueName2, exchangeName, routeKey, null);
IBasicProperties props = channel.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;//2代表隊列中的消息持久化
props.Expiration = "36000000";
消費确認:一般使用手動方式來進行消費确認,一旦消費确認,該條消息會從隊列中删除,如果出現業務邏輯執行到一半,機器挂了,由于沒有執行到消費确認,該條消息會被分發到其他機器上,這就是手動确認的好處。
分發方式是對于消費者來說,如果不配置,預設循環分發,意思是如果有2台消費者伺服器,會你一個我一個,但是這樣可能會出現一個問題,比如一台機器執行的業務很快,而另外一台因為伺服器記憶體小,執行速度慢,但是循環分發,導緻記憶體小的機器滿負荷,是以可以進行配置,讓代碼沒有執行到消費确認那一步的時候,不允許有新的消息被分發過來,可以這樣設定
var channel = connection.CreateModel();
channel.BasicQos(0, 1, false);//代碼沒有執行到消費确認那一步的時候,不允許有新的消息被分發過來
//事件基本消費者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
代碼:https://github.com/xiaomifengmaidi/RabbmitMQ