本篇介紹一下RabbitMQ中的消費模式,在前邊的所有栗子中我們采用的消費者都是EventingBasicConsumer,其實RabbitMQ中還有其他兩種消費模式:BasicGet和QueueBaicConsumer,下邊介紹RabiitMQ的消費模式,及使用它們時需要注意的一些問題。
1 RabbitMQ的消費模式
0 準備工作
使用Web管理工具添加exchange、queue并綁定,bindingKey為“mykey”,如下所示:
生産者代碼如下:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
//建立連接配接connection
using (var connection = factory.CreateConnection())
{
//建立通道channel
using (var channel = connection.CreateModel())
{
Console.WriteLine("生産者準備就緒....");
string message = "";
//在控制台輸入消息,按enter鍵發送消息
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: null,
body: body);
Console.WriteLine($"【{message}】發送到Broke成功!");
}
}
}
Console.ReadKey();
}
1 EventingBasicConsumer介紹
EventingBasicConsumer是釋出/訂閱模式的消費者,即隻要訂閱的queue中有了新消息,Broker就會立即把消息推送給消費者,這種模式可以保證消息及時地被消費者接收到。EventingBasicConsumer是長連接配接的:隻需要建立一個Connection,然後在Connection的基礎上建立通道channel,消息的發送都是通過channel來執行的,這樣可以減少Connection的建立,比較節省資源。前邊我們已經使用了很多次EventingBaiscConsumer,這裡簡單展示一下使用的方式,注釋比較詳細,就不多介紹了。
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
#region EventingBasicConsumer
//定義一個EventingBasicConsumer消費者
var consumer = new EventingBasicConsumer(channel);
//接收到消息時觸發的事件
consumer.Received += (model, ea) =>
{
Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
};
Console.WriteLine("消費者準備就緒....");
//調用消費方法 queue指定消費的隊列,autoAck指定是否自動确認,consumer就是消費者對象
channel.BasicConsume(queue: "myqueue",
autoAck: true,
consumer: consumer);
Console.ReadKey();
#endregion
}
}
}
執行程式,結果如下,隻要我們在生産者端發送一條消息到Broker,Broker就會立即推送消息到消費者。
2 BasicGet方法介紹
我們知道使用EventingBasicConsumer可以讓消費者最及時地擷取到消息,使用EventingBasicConsumer模式時消費者在被動的接收消息,即消息是推送過來的,Broker是主動的一方。那麼能不能讓消費者作為主動的一方,消費者什麼時候想要消息了,就自己發送一個請求去找Broker要?答案使用Get方式。Get方式是短連接配接的,消費者每次想要消息的時候,首先建立一個Connection,發送一次請求,Broker接收到請求後,響應一條消息給消費者,然後斷開連接配接。RabbitMQ中Get方式和HTTP的請求響應流程基本一樣,Get方式的實時性比較差,也比較耗費資源。我們看一個Get方式的栗子:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
#region BasicGet
//通過BasicGet擷取消息,開啟自動确認
BasicGetResult result = channel.BasicGet(queue:"myqueue",autoAck:true);
Console.WriteLine($"接收到消息【{Encoding.UTF8.GetString(result.Body)}】");
//列印exchange和routingKey
Console.WriteLine($"exchange:{result.Exchange},routingKey:{result.RoutingKey}");
Console.ReadLine();
#endregion
}
}
}
執行生成者和消費者程式,生産者發送三條消息,而消費者隻擷取了一條消息,這是因為channel.BasicGet()一次隻擷取一條消息,擷取到消息後就把連接配接斷開了。
補充:RabbitMQ還有一種消費者QueueBaicConsumer,用法和Get方式類似,QueueBaicConsumer在官方API中标記已過時,這裡不再介紹,有興趣的小夥伴可以自己研究下。
2 Qos介紹
在介紹Qos(服務品質)前我們先看一下使用EventingBasicConsumer的一個坑,使用代碼示範一下,簡單修改一下上邊栗子的代碼
生産者代碼如下,這裡生産者發送了100條消費到Broker
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
//建立連接配接connection
using (var connection = factory.CreateConnection())
{
//建立通道channel
using (var channel = connection.CreateModel())
{
Console.WriteLine("生産者準備就緒....");
#region 添加100條資料
for (int i = 0; i < 100; i++)
{
channel.BasicPublish(exchange: "myexchange",
routingKey: "mykey",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"第{i}條消息"));
}
#endregion
}
}
Console.ReadKey();
}
消費端代碼如下,消費端采用的是自動确認(autoAck=true),即Broker把消息發送給消費者就會确認成功,不關心消息有沒有處理完成,假設每條消息處理需要5s
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
#region EventingBasicConsumer
//定義消費者
var consumer = new EventingBasicConsumer(channel);
//接收到消息時執行的任務
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000 * 5);
Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成");
};
Console.WriteLine("消費者準備就緒....");
//處理消息
channel.BasicConsume(queue: "myqueue",
autoAck: true,
consumer: consumer);
Console.ReadKey();
#endregion
}
}
}
我們先執行生産者程式,執行完成後發現queue中有了100條ready狀态的消息,表示消息成功發送到了隊列
接着我們執行消費者,消費者執行後,Broker會把消息一股腦發送過去,通過Web管理界面我們看到queue中已經沒有消息了,如下:
我們再看一下消費者的執行情況,發現消費者僅僅處理了4條消息,還有96條消息沒有處理,這就是說消費者沒有處理完消息,但是queue中的消息都已經删除了。如果這時消費者挂掉了,所有未處理的消息都會丢失,在某些場合中,丢失資料的後果是十分嚴重的。
對于上邊的問題,我們可能會想到使用顯示确認來保證消息不會丢失:将BasicConsume方法的autoAck設定為false,然後處理一條消息後手動确認一下,這樣的話已處理的消息在接收到确認回執時被删除,未處理的消息以Unacked狀态存放在queue中。如果消費者挂了,Unacked狀态的消息會自動重新變成Ready狀态,如此一來就不用擔心消息丢失了,修改消費者代碼如下:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
#region EventingBasicConsumer
//定義消費者
var consumer = new EventingBasicConsumer(channel);
//接收到消息時執行的任務
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000 * 5);
//處理完成,手動确認
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成");
};
Console.WriteLine("消費者準備就緒....");
//處理消息
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
Console.ReadKey();
#endregion
}
}
}
重新執行生産者,然後執行消費者,Web管理其中看到結果如下:在執行消費者時,消息會一股腦的發送給消費者,然後狀态都變成Uncacked,消費者執行完一條資料手動确認後,這條消息從queue中删除。當消費者挂了(我們可以直接把消費者關掉來模拟挂掉的情況),沒有處理的消息會自動從Unacked狀态變成Ready狀态,不用擔心消息丢失了!打開Web管理界面看到狀态如下:
通過顯式确認的方式可以解決消息丢失的問題,但這種方式也存在一些問題:①當消息有十萬,百萬條時,一股腦的把消息發送給消費者,可能會造成消費者記憶體爆滿;②當消息處理比較慢的時,單一的消費者處理這些消息可能很長時間,我們自然想到再添加一個消費者加快消息的處理速度,但是這些消息都被原來的消費者接受了,狀态為Unacked,是以這些消息不會再發送給新添加的消費者。針對這些問題怎麼去解決呢?
RabbitMQ提供的Qos(服務品質)可以完美解決上邊的問題,使用Qos時,Broker不會再把消息一股腦的發送給消費者,我們可以設定每次傳輸給消費者的消息條數n,消費者把這n條消息處理完成後,再擷取n條資料進行處理,這樣就不用擔心消息丢失、服務端記憶體爆滿的問題了,因為沒有發送的消息狀态都是Ready,是以當我們新增一個消費者時,消息也可以立即發送給新增的消費者。注意Qos隻有在消費端使用顯示确認時采用效,使用Qos的方式十分簡單,在消費端調用 channel.BasicQos() 方法即可,修改服務端代碼如下:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
//rabbitmq-server所在裝置ip,這裡就是本機
HostName = "127.0.0.1",
UserName = "wyy",//使用者名
Password = "123321"//密碼
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);
#region EventingBasicConsumer
//定義消費者
var consumer = new EventingBasicConsumer(channel);
//接收到消息時執行的任務
consumer.Received += (model, ea) =>
{
Thread.Sleep(1000 * 5);
//處理完成,手動确認
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成");
};
Console.WriteLine("消費者準備就緒....");
//處理消息
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
Console.ReadKey();
#endregion
}
}
}
清空一下queue中的消息,重新啟動生産者,然後啟動消費者,打開Web管理界面,看到狀态如下所示:
channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false) 方法中參數prefetchSize為預取的長度,一般設定為0即可,表示長度不限;prefetchCount表示預取的條數,即發送的最大消息條數;global表示是否在Connection中全局設定,true表示Connetion下的所有channel都設定為這個配置。
3 小結
本節示範了RabbitMQ的兩種消費者:EventingBasicConsumer和BasicGet。EventingBasicConsumer是基于長連接配接,釋出訂閱模式的消費方式,節省資源且實時性好,這是開發中最常用的消費模式。在一些需要消費者主動擷取消息的場合,我們可以使用Get方式,Get方式是基于短連接配接的,請求響應模式的消費方式。
Qos可以設定消費者一次接收消息的最大條數,能夠解決消息擁堵時造成的消費者記憶體爆滿問題。Qos也比較适用于耗時任務隊列,當任務隊列中的任務很多時,使用Qos後我們可以随時添加新的消費者來提高任務的處理效率。