天天看點

Azure Messaging-ServiceBus Messaging消息隊列技術系列5-重複消息:at-least-once at-most-once

上篇部落格中,我們用實際的業務場景和代碼示例了Azure Messaging-ServiceBus Messaging對複雜對象消息的支援和消息的持久化:

Azure Messaging-ServiceBus Messaging消息隊列技術系列4-複雜對象消息是否需要支援序列化和消息持久化

本文中我們主要研究并介紹Azure Messaging對重複消息的支援。

MessageReceiver 對象建立時可以指定消息接收模式: ReceiveAndDelete 和 PeekLock (預設),其中:

1. 使用 ReceiveAndDelete 模式時,接收是單步操作,即當 Service Bus 收到請求時,它将消息标記為“正在使用”,然後将其傳回給應用程式。ReceiveAndDelete 模式是最簡

單的模型,并且最适合在出現故障時應用程式能夠容許不處理消息的場景。了解此模式時,可考慮這種情況:使用者發出了接收請求,但在處理消息之前發生崩潰。由于 Service B

us已将消息标記為“正在使用”,是以當應用程式重新啟動并重新開始使用消息時,它就會錯過在崩潰前已使用的消息。

2. 在 PeekLock 模式下,接收變成兩階段操作,是以可以支援不能容許錯過消息的應用程式。當 Service Bus 收到請求時,它會找到下一條要使用的消息,将其鎖定以防止其他使

用者接收它,然後将其傳回給應用程式。應用程式完成消息處理(或将消息可靠地存儲以便将來處理)後,會對收到的消息調用 Complete 以完成接收過程的第二階段。當Service

Bus 看到 Complete 時,會将該消息标記為“正在使用”。另外兩個結果也是可能的。第一個結果,如果由于某種原因應用程式無法處理該消息,它可以對收到的消息Abandon(而

不是 Complete)。這将導緻 Service Bus 解鎖該消息,并使該消息可以重新被同一使用者或其他競争的使用者接收。第二個結果,即存在與鎖定關聯的逾時,如果應用程式在鎖

定逾時到期前無法處理改消息(例如,應用程式崩潰)則 Service Bus 将解鎖該消息并使其可以重新被接收。如果應用程式在處理該消息後崩潰,但此時尚未發出 Complete 請

求,則在應用程式重新啟動時,該消息将重新傳遞給應用程式。這通常稱為“至少一次”處理。這意味着每條消息都将至少處理一次,但在某些情況下可能會重新傳遞同一消息。如果

方案不能容許重複處理,則需要在應用程式中添加檢測重複項的邏輯。這可以基于消息的 MessageId 屬性來實作。此屬性的值在傳遞嘗試過程中保持不變。這稱為“恰好一次”處、

理。

接下來,我們通過Code show一下消息的重複發送和重複接收。

消息重複發送:同一個消息BrokeredMessage發送兩次

/// <summary>
        /// 發送消息
        /// </summary>
        private static void MessageMultiSendTest()
        {
            var sbUtils = new ServiceBusUtils();

            //建立隊列
            sbUtils.CreateQueue(queueName, false);

            //多次發送消息到OrderQueue
            var queueSendClient = sbUtils.GetQueueClient(queueName);

            var order = CreateSalesOrder(1);
            var message = sbUtils.Create(order);
            queueSendClient.Send(message);
            queueSendClient.Send(message);

            Console.WriteLine("Send Completed!");
        }      

實際執行過程中是出錯的:

Azure Messaging-ServiceBus Messaging消息隊列技術系列5-重複消息:at-least-once at-most-once

由此可以得出:

Azure Messaging 不支援同一個消息發送多次,必須通過new多個BrokeredMessage執行個體實作。

同時,消息的唯一性由消息的MessageID來辨別!

PeekAndLock模式下消息的重複接收:

接收模式PeekAndLock,同一個隊列,第一個Consumer接收消息,但是不Complete;然後第二個Consumer繼續接收消息,此時第一個Consumer未Complete的消息有一個

TTL,在TTL時間區間之内,第二個Consumer可以繼續接收目前隊列未鎖定的消息,當TTL時間到達後,釋放第一個Consumer鎖定的消息,第二個Consumer讀取到了第一個

Consumer未Complete的消息。

The duration of a peek lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes;

the default value is 1 minute.

/// <summary>
        /// 接收消息
        /// </summary>
        private static void MessageReceive()
        {
            int index = 0;            
            var sbUtils = new ServiceBusUtils();
            var queueReveiveClient1 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
            for (int i = 0; i < 10; i++)
            {
                var msg = queueReveiveClient1.Peek();
                Console.WriteLine(string.Format("Received {0} MessageID: {1}", i, msg.MessageId));
            }

            var queueReveiveClient2 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
            for (int i = 0; i < 10; i++)
            {
                var msg = queueReveiveClient2.Receive();
                Console.WriteLine(string.Format("Second received {0} MessageID: {1}", i, msg.MessageId));
                msg.Complete();
            }

            ////删除隊列
            //sbUtils.DeleteQueue(queueName);

            Console.WriteLine("Receive Completed!");
        }      

第一個隊列Consumer使用Peek模式接收到消息,隻是取出消息,不從消息隊列中移出。

第二個隊列Consumer使用Receive模式同樣可以接收消息。

Azure Messaging-ServiceBus Messaging消息隊列技術系列5-重複消息:at-least-once at-most-once

如果兩個隊列Consumer都使用Receive模式接收消息,隻有第一個Consumer可以接收到,第二個Consumer則接收不到,一直在等待消息的入隊!

/// <summary>
        /// 接收消息
        /// </summary>
        private static void MessageReceive()
        {
            int index = 0;            
            var sbUtils = new ServiceBusUtils();
            var queueReveiveClient1 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
            for (int i = 0; i < 10; i++)
            {
                var msg = queueReveiveClient1.Receive();
                Console.WriteLine(string.Format("Received {0} MessageID: {1}", i, msg.MessageId));
            }

            var queueReveiveClient2 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);
            for (int i = 0; i < 10; i++)
            {
                var msg = queueReveiveClient2.Receive();
                Console.WriteLine(string.Format("Second received {0} MessageID: {1}", i, msg.MessageId));
                msg.Complete();
            }

            ////删除隊列
            //sbUtils.DeleteQueue(queueName);

            Console.WriteLine("Receive Completed!");
        }      
Azure Messaging-ServiceBus Messaging消息隊列技術系列5-重複消息:at-least-once at-most-once

因為消息已經被第一個Consumer消費。

通過本篇我們了解了Azure Messaging-ServiceBus Messaging對重複消息的處理機制。

周國慶

2017/3/16

繼續閱讀