using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using System.Collections;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;
namespace ServiceBusConsole
{
/// <summary>
/// Console sample that chains scheduled Service Bus messages: it generates a fixed number
/// of JSON task payloads, sends the first one, and then sends the next pending payload
/// each time a message is received.
/// </summary>
internal class Program
{
    // Connection string to your Service Bus namespace.
    static string connectionString = "Endpoint=sb://";

    // Name of the Service Bus entity that messages are sent to and received from.
    // NOTE(review): get() passes only this name to CreateProcessor, which appears to be the
    // queue overload (a topic would also need a subscription name) - confirm that this
    // entity is a queue despite the field name.
    static string topicName = "actieue";

    // Task payloads (JSON strings) that have been generated but not sent yet.
    // NOTE(review): Queue<string> is not thread-safe; this relies on the processor invoking
    // MessageHandler one call at a time - revisit if concurrency options are raised.
    static Queue<string> a;

    // Number of task payloads generated at start-up.
    private const int taskNumber = 8;

    /// <summary>
    /// Generates <c>taskNumber</c> task payloads, sends the first one and then runs the
    /// receiver until a key is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Random random = new Random();
        a = new Queue<string>();
        for (int i = 0; i < taskNumber; i++)
        {
            JObject jo = new JObject();
            jo["taskId"] = i;
            // Delay in whole seconds (0-9) that sendMessage uses to schedule the message.
            jo["excutionTime"] = random.Next(10);
            a.Enqueue(jo.ToString());
        }

        ArrayList b = new ArrayList();
        b.Add(a.Dequeue());

        // GetAwaiter().GetResult() rethrows the original exception, whereas Wait() would
        // wrap it in an AggregateException.
        sendMessage(b).GetAwaiter().GetResult();
        get().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Sends the given JSON payloads as a single batch. Each message is scheduled to be
    /// enqueued "excutionTime" seconds from now; a payload without that property is
    /// scheduled with no delay.
    /// </summary>
    /// <param name="msgs">JSON strings to send; every element must be a <see cref="string"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="msgs"/> is null.</exception>
    /// <exception cref="InvalidOperationException">A message does not fit in the batch.</exception>
    public static async Task sendMessage(ArrayList msgs)
    {
        if (msgs == null)
        {
            throw new ArgumentNullException(nameof(msgs));
        }

        if (msgs.Count == 0)
        {
            // Nothing to send, so do not open a connection at all.
            return;
        }

        // The client owns the connection; it is created inside nested try/finally blocks so
        // that client, sender and batch are released even when building the batch fails.
        ServiceBusClient client = new ServiceBusClient(connectionString);
        try
        {
            ServiceBusSender sender = client.CreateSender(topicName);
            try
            {
                using (ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync())
                {
                    foreach (object o in msgs)
                    {
                        string msg = (string)o;
                        ServiceBusMessage m = new ServiceBusMessage(msg);

                        // Read the delay directly as a number instead of round-tripping it
                        // through a string; a missing property means "no delay".
                        JObject j = JObject.Parse(msg);
                        double d = j.Value<double?>("excutionTime") ?? 0;
                        m.ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddSeconds(d);

                        if (!messageBatch.TryAddMessage(m))
                        {
                            // The message is too large for the batch.
                            throw new InvalidOperationException(
                                $"The message {msg} is too large to fit in the batch.");
                        }
                    }

                    await sender.SendMessagesAsync(messageBatch);
                }
            }
            finally
            {
                await sender.DisposeAsync();
            }
        }
        finally
        {
            await client.DisposeAsync();
        }
    }

    /// <summary>
    /// Starts a processor on the configured entity, handles incoming messages with
    /// <see cref="MessageHandler"/> until a key is pressed, then stops the processor and
    /// disposes it together with its client.
    /// </summary>
    public static async Task get()
    {
        ServiceBusClient client = new ServiceBusClient(connectionString);
        try
        {
            ServiceBusProcessor processor =
                client.CreateProcessor(topicName, new ServiceBusProcessorOptions());
            try
            {
                // Register the handlers for messages and for errors before starting.
                processor.ProcessMessageAsync += MessageHandler;
                processor.ProcessErrorAsync += ErrorHandler;

                await processor.StartProcessingAsync();
                Console.WriteLine("Start Task 0 is runing");
                Console.WriteLine($"It is {DateTimeOffset.Now} now");

                // Keep receiving until the user presses a key.
                Console.ReadKey();

                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
            finally
            {
                await processor.DisposeAsync();
            }
        }
        finally
        {
            await client.DisposeAsync();
        }
    }

    /// <summary>
    /// Handles one received message: sends the next pending task payload (if any), prints
    /// the received body and completes the message.
    /// </summary>
    /// <param name="args">Event data carrying the received message.</param>
    static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();

        // Only chain another task while payloads remain; checking the count avoids using
        // the exception thrown by Dequeue() on an empty queue as control flow.
        if (a != null && a.Count > 0)
        {
            ArrayList c = new ArrayList();
            c.Add(a.Dequeue());
            try
            {
                // Await instead of blocking the handler's thread with Wait().
                await sendMessage(c);
            }
            catch (Exception ex)
            {
                // Best effort: a failed send must not prevent completing the received
                // message, but the failure is reported instead of being swallowed.
                Console.WriteLine($"Failed to send the next task: {ex}");
            }
        }

        Console.WriteLine($"At {DateTimeOffset.Now},This task is finished:\nReceived: {body} ");

        // Complete the message so that it is not delivered again.
        await args.CompleteMessageAsync(args.Message);
    }

    /// <summary>
    /// Handles errors raised by the processor by writing the exception to the console.
    /// </summary>
    /// <param name="args">Event data carrying the exception.</param>
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
}
}