The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
1. creater a publisher to create a queuevar factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
for (int i = 0; i < 20; i++)
{
var body = Encoding.UTF8.GetBytes(message + i.ToString());
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message + i.ToString());
}
}
2. create two consumers
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" Consumer1 Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(1000);
Console.WriteLine(" [x] Done");
// Note: it is possible to access the channel via
// ((EventingBasicConsumer)sender).Model here
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
3. attentions
4. Testing (1th sleep 2 seconds, 2th sleep 1 second )