天天看點

高并發場景之RabbitMQ篇

上次我們介紹了在單機、叢集下高并發場景可以選擇的一些方案,傳送門: 高并發場景之一般解決方案

但是也發現了一些問題,比如叢集下使用ConcurrentQueue或加鎖都不能解決問題,後來采用Redis隊列也不能完全解決問題,

因為使用Redis要自己實作分布式鎖

這次我們來了解一下一個專門處理隊列的元件:RabbitMQ,這個東西天生支援分布式隊列。

下面我們來用RabbitMQ來實作上一篇的場景

一、建立RabbitMQ.Receive

private static ConnectionFactory factory = new ConnectionFactory 
{ HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" };      
高并發場景之RabbitMQ篇
1         static void Main(string[] args)
 2         {
 3             using (var connection = factory.CreateConnection())
 4             {
 5                 using (var channel = connection.CreateModel())
 6                 {
 7                     var consumer = new EventingBasicConsumer();
 8                     consumer.Received += (model, ea) =>
 9                     {
10                         var body = ea.Body;
11                         var message = Encoding.UTF8.GetString(body);
12                         Console.WriteLine(" [x] Received {0}", message);
13 
14                         var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
15                         var value = int.Parse(total) + 1;
16 
17                         DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
18                     };
19 
20                     channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
21                     channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer);
22 
23                     Console.WriteLine(" Press [enter] to exit.");
24                     Console.ReadLine();
25                 }
26             }
27         }      
高并發場景之RabbitMQ篇

二、建立RabbitMQ.Send  

高并發場景之RabbitMQ篇
1         static void Main(string[] args)
 2         {
 3             for (int i = 1; i <= 500; i++)
 4             {
 5                 Task.Run(async () =>
 6                 {
 7                     await Produce();
 8                 });
 9 
10                 Console.WriteLine(i);
11             }
12 
13             Console.ReadKey();
14         }
15 
16         public static Task Produce()
17         {
18             return Task.Factory.StartNew(() =>
19             {
20                 using (var connection = factory.CreateConnection())
21                 {
22                     using (var channel = connection.CreateModel())
23                     {
24                         var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
25                         channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
26                         channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body);
27                     }
28                 }
29             });
30         }      
高并發場景之RabbitMQ篇

這裡是模拟500個使用者請求,正常的話最後Total就等于500

我們來說試試看,運作程式

2.1、打開接收端

高并發場景之RabbitMQ篇

2.2 運作用戶端

高并發場景之RabbitMQ篇

2.3、可以看到2邊幾乎是實時的,再去看看資料庫

高并發場景之RabbitMQ篇

三、我們在叢集裡執行 

高并發場景之RabbitMQ篇
高并發場景之RabbitMQ篇

最後資料是1000

高并發場景之RabbitMQ篇

 完全沒有沖突,好了,就是這樣 、。