天天看點

RabbitMQ 生産者,消費者,持久化,分發方式

.net core 下nuget :RabbitMQ.Client

API:https://www.rabbitmq.com/dotnet-api-guide.html

RabbitMQ建立exchange的方式一共有4中:direct,fanout,headers,topic.

direct:直接建立發送消息

fanout:将不通過路由,發送該exchange上所有綁定的隊列

topic:将通過路由比對,發送相應的隊列

public static class ExchangeType
    {
        //
        // Summary:
        //     Exchange type used for AMQP direct exchanges.
        public const string Direct = "direct";
        //
        // Summary:
        //     Exchange type used for AMQP fanout exchanges.
        public const string Fanout = "fanout";
        //
        // Summary:
        //     Exchange type used for AMQP headers exchanges.
        public const string Headers = "headers";
        //
        // Summary:
        //     Exchange type used for AMQP topic exchanges.
        public const string Topic = "topic";

        //
        // Summary:
        //     Retrieve a collection containing all standard exchange types.
        public static ICollection<string> All();
    }
           

生産者代碼:

string exchangeName = "TestFanoutChange";
            string queueName1 = "hello1";
            string queueName2 = "hello2";
            string routeKey = "";

            //建立連接配接工廠
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "guest",//使用者名
                Password = "guest",//密碼
                HostName = "localhost"//rabbitmq ip
            };

            //建立連接配接
            var connection = factory.CreateConnection();
            //建立通道
            var channel = connection.CreateModel();

            //定義一個Direct類型交換機
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);//第三個參數表示交換機持久化

            //定義隊列
            channel.QueueDeclare(queueName1, true, false, false, null);//第二個參數表示隊列持久化
           

            //将隊列綁定到交換機
            channel.QueueBind(queueName1, exchangeName, routeKey, null);
           
            IBasicProperties props = channel.CreateBasicProperties();
            props.ContentType = "text/plain";
            props.DeliveryMode = 2;//2代表隊列中的消息持久化
            props.Expiration = "36000000";
           

            Console.WriteLine($"\nRabbitMQ連接配接成功,\n\n請輸入消息,輸入exit退出!");

           
            for (int i = 0; i < 1000; i++)
            {
                var sendBytes = Encoding.UTF8.GetBytes(i.ToString());
                //釋出消息

                channel.BasicPublish(exchangeName, routeKey, props, sendBytes);
            }
            channel.Close();
            connection.Close();
           

 消費者代碼:

//建立連接配接工廠
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "guest",//使用者名
                Password = "guest",//密碼
                HostName = "localhost"//rabbitmq ip
            };

            //建立連接配接
            var connection = factory.CreateConnection();
            //建立通道
            var channel = connection.CreateModel();
            channel.BasicQos(0, 1, false);
            //事件基本消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //接收到消息事件
            consumer.Received += (ch, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);

                Console.WriteLine($"收到消息: {message}");

                //				Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執");
                //				Thread.Sleep(10000);
                //确認該消息已被消費
                channel.BasicAck(ea.DeliveryTag, false);
                //		        Console.WriteLine($"已發送回執[{ea.DeliveryTag}]");
            };
            //啟動消費者 設定為手動應答消息
            channel.BasicConsume("hello1", false, consumer);
            Console.WriteLine("消費者已啟動");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();
           

持久化:将記憶體中的資料寫入到磁盤上

如果需要達到持久化的目的,必須要讓隊列持久化,然後再需要消息持久化

隊列持久化: channel.QueueDeclare(queueName1, true, false, false, null);//第二個參數表示隊列持久化

消息持久化:  channel.QueueBind(queueName1, exchangeName, routeKey, null);

            //channel.QueueBind(queueName2, exchangeName, routeKey, null);

            IBasicProperties props = channel.CreateBasicProperties();

            props.ContentType = "text/plain";

            props.DeliveryMode = 2;//2代表隊列中的消息持久化

            props.Expiration = "36000000";

消費确認:一般使用手動方式來進行消費确認,一旦消費确認,該條消息會從隊列中删除,如果出現業務邏輯執行到一半,機器挂了,由于沒有執行到消費确認,該條消息會被分發到其他機器上,這就是手動确認的好處。

分發方式是對于消費者來說,如果不配置,預設循環分發,意思是如果有2台消費者伺服器,會你一個我一個,但是這樣可能會出現一個問題,比如一台機器執行的業務很快,而另外一台因為伺服器記憶體小,執行速度慢,但是循環分發,導緻記憶體小的機器滿負荷,是以可以進行配置,讓代碼沒有執行到消費确認那一步的時候,不允許有新的消息被分發過來,可以這樣設定

  var channel = connection.CreateModel();

            channel.BasicQos(0, 1, false);//代碼沒有執行到消費确認那一步的時候,不允許有新的消息被分發過來

            //事件基本消費者

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

 代碼:https://github.com/xiaomifengmaidi/RabbmitMQ