天天看點

Azure Service Bus 實作定時器以及按序執行任務

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using System.Collections;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace ServiceBusConsole
{
    internal class Program
    {
        // connection string to your Service Bus namespace
        static string connectionString = "Endpoint=sb://";

        // name of your Service Bus topic
        static string topicName = "actieue";

        static Queue<string> a;
        // number of messages to be sent to the topic
        private const int taskNumber = 8;

        static void Main(string[] args)
        {
            Random random = new Random();
            a = new Queue<string>();
            for(int i = 0; i < taskNumber; i++)
            {
                JObject jo = new JObject();
                jo["taskId"] = i;
                jo["excutionTime"] = random.Next(10);
                a.Enqueue(jo.ToString());
            }
            ArrayList b = new ArrayList();
            b.Add(a.Dequeue());
            sendMessage(b).Wait();
            get().Wait();
        }
        public static async Task sendMessage(ArrayList msgs)
        {
            // the client that owns the connection and can be used to create senders and receivers
            ServiceBusClient client;

            // the sender used to publish messages to the topic
            ServiceBusSender sender;

            // The Service Bus client types are safe to cache and use as a singleton for the lifetime
            // of the application, which is best practice when messages are being published or read
            // regularly.
            //
            // Create the clients that we'll use for sending and processing messages.
            client = new ServiceBusClient(connectionString);
            sender = client.CreateSender(topicName);

            // create a batch 
            ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

            foreach (Object o in msgs)
            {
                string msg = (string)o;
                // try adding a message to the batch
                ServiceBusMessage m = new ServiceBusMessage(msg);
                JObject j = JObject.Parse(msg);
                double d = double.Parse(j["excutionTime"].ToString());
                m.ScheduledEnqueueTime = DateTimeOffset.Now.AddSeconds(d);
                if (!messageBatch.TryAddMessage(m))
                {
                    // if it is too large for the batch
                    throw new Exception($"The message msg is too large to fit in the batch.");
                }
            }

            try
            {
                // Use the producer client to send the batch of messages to the Service Bus topic
                await sender.SendMessagesAsync(messageBatch);
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                messageBatch.Dispose();
                await sender.DisposeAsync();
                await client.DisposeAsync();
            }
        }
        public static async Task get()
        {
            ServiceBusClient client;
            ServiceBusProcessor processor;
            client = new ServiceBusClient(connectionString);
            processor = client.CreateProcessor(topicName, new ServiceBusProcessorOptions());

            try
            {
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing 
                await processor.StartProcessingAsync();

                Console.WriteLine("Start Task 0 is runing");
                Console.WriteLine($"It is {DateTimeOffset.Now} now");
                Console.ReadKey();

                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
            finally
            {
                // Calling DisposeAsync on client types is required to ensure that network
                // resources and other unmanaged objects are properly cleaned up.
                await processor.DisposeAsync();
                await client.DisposeAsync();
            }

        }

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            ArrayList c = new ArrayList();
            string body = args.Message.Body.ToString();
            try
            {
                c.Add(a.Dequeue());
                sendMessage(c).Wait();
            }
            catch (Exception ex)
            {

            }

            Console.WriteLine($"At {DateTimeOffset.Now},This task is finished:\nReceived: {body} ");

            // complete the message. messages is deleted from the subscription. 
            await args.CompleteMessageAsync(args.Message);
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    }
}