.NET Cloud-Native Architect Training Camp (Module 2: Fundamentals, RabbitMQ Work Queues and Exchanges) -- Study Notes

2.6.4 RabbitMQ -- Work Queues and Exchanges

  • WorkQueue
  • Publish/Subscribe
  • Routing
  • EmitLog

WorkQueue:

https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
  • One message producer, multiple message consumers
  • Automatic recovery of the exchange
  • Message persistence
  • Manual message acknowledgement

var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);      
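
Marking a message as Persistent only helps if the queue it lands in also survives a broker restart, so the queue should be declared as durable. A minimal sketch of a complete sender, following the official tutorial linked above. It assumes the RabbitMQ.Client 6.x API used throughout these notes and a broker on localhost; the class name NewTask is only illustrative.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class NewTask
{
    public static void Main(string[] args)
    {
        // Assumption: a RabbitMQ broker is reachable on localhost with default credentials.
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // durable: true keeps the queue definition across broker restarts.
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var message = args.Length > 0 ? string.Join(" ", args) : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            // Persistent = true asks the broker to write the message to disk.
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // The default exchange ("") routes by queue name.
            channel.BasicPublish(exchange: "",
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}
```

A queue that already exists as non-durable cannot be redeclared with different parameters, so a new queue name (here task_queue) is needed when switching an existing queue such as hello to durable.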

autoAck: false

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);      

Call BasicAck manually

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);      

Change the receiver to acknowledge messages manually

channel.BasicConsume(queue: "hello",
    autoAck: false,
    consumer: consumer);      

BasicAck

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Thread.Sleep(2000); // simulate slow work so that multiple receivers can be demonstrated
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine(" [x] Received {0}", message);
};      
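
With several receivers running, the broker by default hands messages out in turn without considering how many each receiver still has unacknowledged. A sketch of a worker that combines manual acknowledgement with BasicQos, so that a busy worker is not given a second message until it has acked the first. It assumes the RabbitMQ.Client 6.x API, a broker on localhost, and the durable task_queue from the snippet earlier in this section; the class name Worker is illustrative.

```csharp
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Worker
{
    public static void Main()
    {
        // Assumption: a RabbitMQ broker is reachable on localhost.
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // At most one unacknowledged message per consumer on this channel.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(2000); // simulated work

                // Ack only after the work is done; if the worker dies first,
                // the broker requeues the message for another consumer.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: "task_queue",
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
```

Running this program in two or more consoles and sending a handful of messages should show the work being shared between them.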

Start multiple receivers

Publish/Subscribe:

https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

Fanout exchange: every bound queue receives each message

channel.ExchangeDeclare("logs", ExchangeType.Fanout);      

Routing:

https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

Bindings

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");      

Direct exchange

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");      
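
The difference between the two exchange types can be illustrated without a broker. The following is a toy in-memory model, not part of RabbitMQ.Client: the types ToyExchange and ToyExchangeType and the queue names are hypothetical and exist only to show which queues a message would reach. A fanout exchange ignores the routing key, while a direct exchange delivers only to queues bound with exactly that key.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model for illustration; not a RabbitMQ.Client type.
public enum ToyExchangeType { Fanout, Direct }

public class ToyExchange
{
    private readonly ToyExchangeType _type;
    private readonly List<(string Queue, string Key)> _bindings =
        new List<(string Queue, string Key)>();

    public ToyExchange(ToyExchangeType type) { _type = type; }

    // Mirrors the idea of QueueBind: remember which queue listens for which key.
    public void Bind(string queue, string routingKey) =>
        _bindings.Add((queue, routingKey));

    // Returns the names of the queues that would receive a message.
    public List<string> Route(string routingKey) =>
        _bindings
            .Where(b => _type == ToyExchangeType.Fanout || b.Key == routingKey)
            .Select(b => b.Queue)
            .Distinct()
            .ToList();
}

public static class RoutingDemo
{
    public static void Main()
    {
        var logs = new ToyExchange(ToyExchangeType.Fanout);
        logs.Bind("q1", "");
        logs.Bind("q2", "");
        Console.WriteLine(string.Join(",", logs.Route("anything"))); // q1,q2

        var directLogs = new ToyExchange(ToyExchangeType.Direct);
        directLogs.Bind("errors_only", "error");
        directLogs.Bind("all_levels", "info");
        directLogs.Bind("all_levels", "warning");
        directLogs.Bind("all_levels", "error");
        Console.WriteLine(string.Join(",", directLogs.Route("error"))); // errors_only,all_levels
        Console.WriteLine(string.Join(",", directLogs.Route("info")));  // all_levels
    }
}
```

In this model a routing key with no matching binding yields an empty list, which corresponds to a message that reaches no queue.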

Create two console projects: EmitLogDirect and ReceiveLogsDirect

Sender

using System;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace EmitLogDirect
{
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); // declare the exchange
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                    ? string.Join(" ", args.Skip( 1 ).ToArray())
                    : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "direct_logs",
                    routingKey: severity, // the routing key carries the severity level
                    basicProperties: null,
                    body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}      
Messages at the error level are sent to a separate queue

Receiver

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace ReceiveLogsDirect
{
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); // declare the exchange
                var queueName = channel.QueueDeclare().QueueName;
                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                        Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                        exchange: "direct_logs",
                        routingKey: severity); // one binding per severity passed on the command line
                }
                Console.WriteLine(" [*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                        routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                    autoAck: true,
                    consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}      

In both the sender and the receiver, replace localhost with the server address

Start the receiver from the console

dotnet run info warning error

Start the sender from the console

dotnet run info
dotnet run error
dotnet run warning test

Receiver output

 [x] Received 'info':'Hello World!'
 [x] Received 'error':'Hello World!'
 [x] Received 'warning':'test'

GitHub source code link:

https://github.com/MINGSON666/Personal-Learning-Library/tree/main/ArchitectTrainingCamp