天天看點

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

文章目錄

  • .net RabbitMQ的幾種工作模式
    • "Hello World"
        • demo
    • "Work queues"
        • demo
    • "Publish/Subscribe"
        • demo
    • "Routing"
        • demo
    • "Topics"
        • demo

.net RabbitMQ的幾種工作模式

檢視官網https://www.rabbitmq.com/getstarted.html可以看到RabbitMQ的幾種工作模式

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

“Hello World”

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

一個生産者一個隊列一個消費者,生産者生産消息放入隊列,消費者從隊列中接收消息消費

最基本的模式

demo

代碼示範見這篇文章

“Work queues”

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

一個生産者一個隊列多個消費者,生産者生産消費放入隊列,消費者們從隊列中接收消息消費,消費者之間存在競争關系,也就是說,一個消息,一旦被A消費者消費,就不能被B消費者消費了。

這樣工作模式的好處是,假設一個生産者1s生産2000條消息,而一個消費者1s隻能消費1000條消息,為了提高消費的速度,可以設定多個消費者來對同一個隊列進行消費

demo

生産者

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 建立連接配接工廠ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      //建立管道
      using (IModel channel = connection.CreateModel())
      {
        //建立隊列
        channel.QueueDeclare(queue: "queue",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數

        //建立消息
        string message = "Hello World!";

        for (int i = 1; i <= 10; i++)
        {
          //釋出消息
          channel.BasicPublish(exchange: "", //指定采用哪個交換機
                               routingKey: "queue", //路由鍵
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}條消息:" + message));
          Console.WriteLine(" [x] Sent {0}", message);
        }
      }
    }
  }
}
           

消費者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

消費者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           
.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

先運作消費者1和消費者2

再運作生産者,發現消費者1接收到了第1、3、5、7、9條消息,消費者2接收到了第2、4、6、8、10條消息

“Publish/Subscribe”

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

一個生産者多個隊列多個消費者,一個隊列對應一個消費者,生産者生産的消息将放進每個隊列

消費者隻關心它的隊列并将其中的消息消費,消費者之間不存在競争關系。

如果不同的消費者收到同一個消息後需要執行不同的操作,例如消費者A接收到X消息後需要在控制台列印,而消費者B接收到X消息後需要将X消息存入資料庫。像這種情形下,"Work queues"模式肯定是行不通的,因為"Work queues"中的消息隻能被一個消費者處理,這意味着消費者A接收到X消息後就會把X消息消費掉了,那麼消費者B就接收不到X消息了

demo

要實作這種模式,需要将交換機模式設定為扇形模式(ExchangeType.Fanout),這樣它就會将消息分發到每一個與之綁定的隊列中

生産者

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 建立連接配接工廠ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      //建立管道
      using (IModel channel = connection.CreateModel())
      {
        //建立交換機
        string exchange = "exchange1";
        //交換機采用扇形模式
        channel.ExchangeDeclare(exchange, ExchangeType.Fanout, false);

        //建立隊列1
        channel.QueueDeclare(queue: "queue1",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數
        //建立隊列2
        channel.QueueDeclare(queue: "queue2",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數

        //如果交換機采用扇形模式,則routeKey置為""
        channel.QueueBind("queue1", exchange, "");
        channel.QueueBind("queue2", exchange, "");

        //建立消息
        string message = "Hello World!";

        for (int i = 1; i <= 10; i++)
        {
          //釋出消息
          channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                               routingKey: "", //路由鍵
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}條消息:" + message));
          Console.WriteLine(" [x] Sent {0}", message);
        }
      }
    }
  }
}
           

消費者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue1",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue1",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

消費者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue2",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue2",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

先運作消費者1和消費者2

再運作生産者,發現消費者1和消費者2都接收到了生産者生産的10條消息

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

“Routing”

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

一個生産者多個隊列多個消費者,一個隊列對應一個消費者,生産者生産的消息不是簡單的放入每個隊列,而是根據不同的條件,由交換機來決定将消息放入哪個消息隊列中。

例如不太重要的消息隻需要在控制台列印即可,不需要存資料庫了,

而重要的消息既需要在控制台列印,還需要存資料庫

demo

要實作這種模式,需要将交換機模式設定為直連模式(ExchangeType.Direct),

并配置路由規則,也就是指定哪個routingKey對應哪個消息隊列

在釋出消息時需要指定routingKey,這樣交換機就能根據路由規則來進行消息的分發了

生産者

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 建立連接配接工廠ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      //建立管道
      using (IModel channel = connection.CreateModel())
      {
        //建立交換機
        string exchange = "exchange1";
        //交換機采用直連模式
        channel.ExchangeDeclare(exchange, ExchangeType.Direct, false);

        //建立隊列1
        channel.QueueDeclare(queue: "queue1",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數
        //建立隊列2
        channel.QueueDeclare(queue: "queue2",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數

        //配置路由
        //routingKey為key1的消息将放入隊列queue1中
        channel.QueueBind("queue1", exchange, "key1");
        //routingKey為key2的消息将放入隊列queue1和queue2中
        channel.QueueBind("queue1", exchange, "key2"); 
        channel.QueueBind("queue2", exchange, "key2"); 

        //建立消息
        string message = "Hello World!";

        for (int i = 1; i <= 10; i++)
        {
          //釋出消息
          channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                               routingKey: "key1", //路由鍵
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}條key為key1的消息:" + message));
          Console.WriteLine(" [x] Sent {0} to queue1", message);
        }

        for (int i = 1; i <= 5; i++)
        {
          //釋出消息
          channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                               routingKey: "key2", //路由鍵
                               basicProperties: null,
                               body: Encoding.Default.GetBytes($"第{i}條key為key2的消息:" + message));
          Console.WriteLine(" [x] Sent {0} to queue2", message);
        }
      }
    }
  }
}
           

消費者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue1",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue1",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

消費者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue2",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue2",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

先運作消費者1和消費者2

再運作生産者,發現

消費者1接收到了key為key1的10條消息以及key為key2的5條消息

消費者2隻接收key為key2的5條消息

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

“Topics”

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

Topic模式和Routing模式的工作模式類似,也是一個生産者多個隊列多個消費者,一個隊列對應一個消費者,根據不同的條件,由交換機來決定将消息放入哪個消息隊列中

差別在于Routing模式是精确比對,也就是說一個routingKey為“key1”的消息,隻能比對到綁定了"key1"的隊列

而Topic模式下可以進行模糊比對

一個綁定了"Word.*“的隊列,既能比對routingKey為"Word.A"的消息,還能比對routingKey為"Word.B”、"Word.C"的消息

*是一種通配符,可以比對一個單詞

#是另一種通配符,可以比對零個或多個單詞

這是RabbitMQ官網的解釋

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式

為了說明這些比對規則,官網還給出了一些例子,那我就不再舉例了

We created three bindings: Q1 is bound with binding key “.orange.” and Q2 with “..rabbit” and “lazy.#”.

These bindings can be summarised as:

Q1 is interested in all the orange animals.

Q2 wants to hear everything about rabbits, and everything about lazy animals.

A message with a routing key set to “quick.orange.rabbit” will be delivered to both queues. Message “lazy.orange.elephant” also will go to both of them. On the other hand “quick.orange.fox” will only go to the first queue, and “lazy.brown.fox” only to the second. “lazy.pink.rabbit” will be delivered to the second queue only once, even though it matches two bindings. “quick.brown.fox” doesn’t match any binding so it will be discarded.

What happens if we break our contract and send a message with one or four words, like “orange” or “quick.orange.male.rabbit”? Well, these messages won’t match any bindings and will be lost.

On the other hand “lazy.orange.male.rabbit”, even though it has four words, will match the last binding and will be delivered to the second queue.

demo

要實作這種模式,需要将交換機模式設定為Topic模式(ExchangeType.Topic),這樣才能使用通配符*或#進行模糊比對

生産者:生産帶有不同routingKey的消息

using System;
using RabbitMQ.Client;
using System.Text;

namespace RabbitMQSender
{
  public class Send
  {
    public static void Main()
    {
      // 建立連接配接工廠ConnectionFactory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      //建立管道
      using (IModel channel = connection.CreateModel())
      {
        //建立交換機
        string exchange = "exchange1";
        //交換機采用Topic模式
        channel.ExchangeDeclare(exchange, ExchangeType.Topic, false);

        //建立隊列1
        channel.QueueDeclare(queue: "queue1",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數
        //建立隊列2
        channel.QueueDeclare(queue: "queue2",
                           durable: false, //是否持久化
                           exclusive: false, //是否獨占
                           autoDelete: false, //是否自動删除
                           arguments: null); //參數

        //配置路由
        channel.QueueBind("queue1", exchange, "key1.*");
        channel.QueueBind("queue1", exchange, "key2");
        channel.QueueBind("queue2", exchange, "key2.#");
        channel.QueueBind("queue2", exchange, "*.key2.#");

        //建立消息
        string message = "Hello World!";

        //釋出消息
        channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                             routingKey: "key1.A", //路由鍵
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key為key1.A的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                             routingKey: "key1.B", //路由鍵
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key為key1.B的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                             routingKey: "key2", //路由鍵
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key為key2的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                             routingKey: "key2.A.B.C", //路由鍵
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key為key2.A.B.C的消息:" + message));
        channel.BasicPublish(exchange: exchange, //指定采用哪個交換機
                             routingKey: "key1.key2.A", //路由鍵
                             basicProperties: null,
                             body: Encoding.Default.GetBytes($"key為key1.key2.A的消息:" + message));
      }
    }
  }
}
           

消費者1

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue1",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue1",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

消費者2

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace RabbitMQReceiver
{
  class Receive2
  {
    public static void Main()
    {
      //建立連接配接工廠
      ConnectionFactory connectionFactory = new ConnectionFactory();
      //建立預設連接配接
      using (IConnection connection = connectionFactory.CreateConnection())
      {

        //建立管道
        using (IModel channel = connection.CreateModel())
        {
          //建立隊列
          channel.QueueDeclare(
            queue: "queue2",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

          //建立消費者
          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
          //定義回調事件,每次接收消息時觸發
          consumer.Received += (model, basicDeliverEventArgs) =>
          {
            ReadOnlyMemory<byte> body = basicDeliverEventArgs.Body;
            string message = Encoding.Default.GetString(body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);
          };
          //消費消息
          channel.BasicConsume(queue: "queue2",
                               autoAck: true,
                               consumer: consumer);
          Console.ReadLine();
        }
      }
    }
  }
}
           

先運作消費者1和消費者2

再運作生産者,發現不同routingKey的消息被分發到了與之比對的隊列中

.net RabbitMQ的幾種工作模式.net RabbitMQ的幾種工作模式