上次我們介紹了在單機、叢集下高并發場景可以選擇的一些方案,傳送門: 高并發場景之一般解決方案
但是也發現了一些問題,比如叢集下使用ConcurrentQueue或加鎖都不能解決問題,後來采用Redis隊列也不能完全解決問題,
因為使用Redis要自己實作分布式鎖
這次我們來了解一下一個專門處理隊列的元件:RabbitMQ,這個東西天生支援分布式隊列。
下面我們來用RabbitMQ來實作上一篇的場景
一、建立RabbitMQ.Receive
private static ConnectionFactory factory = new ConnectionFactory
{ HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" };

1 static void Main(string[] args)
2 {
3 using (var connection = factory.CreateConnection())
4 {
5 using (var channel = connection.CreateModel())
6 {
7 var consumer = new EventingBasicConsumer();
8 consumer.Received += (model, ea) =>
9 {
10 var body = ea.Body;
11 var message = Encoding.UTF8.GetString(body);
12 Console.WriteLine(" [x] Received {0}", message);
13
14 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
15 var value = int.Parse(total) + 1;
16
17 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
18 };
19
20 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
21 channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer);
22
23 Console.WriteLine(" Press [enter] to exit.");
24 Console.ReadLine();
25 }
26 }
27 }

二、建立RabbitMQ.Send

1 static void Main(string[] args)
2 {
3 for (int i = 1; i <= 500; i++)
4 {
5 Task.Run(async () =>
6 {
7 await Produce();
8 });
9
10 Console.WriteLine(i);
11 }
12
13 Console.ReadKey();
14 }
15
16 public static Task Produce()
17 {
18 return Task.Factory.StartNew(() =>
19 {
20 using (var connection = factory.CreateConnection())
21 {
22 using (var channel = connection.CreateModel())
23 {
24 var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
25 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
26 channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body);
27 }
28 }
29 });
30 }

這裡是模拟500個使用者請求,正常的話最後Total就等于500
我們來說試試看,運作程式
2.1、打開接收端
2.2 運作用戶端
2.3、可以看到2邊幾乎是實時的,再去看看資料庫
三、我們在叢集裡執行
最後資料是1000
完全沒有沖突,好了,就是這樣 、。