天天看點

Azure Messaging-ServiceBus Messaging消息隊列技術系列6-消息回執

上篇博文中我們介紹了Azure Messaging的重複消息機制、At most once 和At least once.

 Azure Messaging-ServiceBus Messaging消息隊列技術系列5-重複消息:at-least-once at-most-once

本文中我們主要研究并介紹Azure Messaging的消息回執機制:實際應用場景:

同步收發場景下,消息生産者和消費者雙向應答模式,例如:張三寫封信送到郵局中轉站,然後李四從中轉站獲得信,然後在寫一份回執信,放到中轉站,然後張三去取,當然張三寫信的時候就得寫明回信位址。還

有,生成訂單編号場景,發送一個生成訂單編号的消息,消息消費者接收生成訂單編号的消息,并通過消息回執傳回。

Azure Messaging的消息回執機制主要通過:基于帶會話的Queue/Topic、SessionId、ReplyTo屬性來實作

在代碼實作中,我們需要:

1. 兩個工作線程,一個線程用于消息發送和接收回執消息,一個線程用于消息接收和發送消息回執。

2. 一個會話辨別:ReceiptSession  

3. 兩個隊列Queue:RequestQueue:發送消息、接收消息,ResponseQueue:發送回執消息,接收回執消息。

直接Show Code:

首先,我們在ServiceBusMQManager增加一個線程安全的建立帶回話的QueueClient方法:

private static object syncObj = new object();
        /// <summary>
        /// 擷取要求會話帶Session的QueueClient
        /// </summary>
        /// <param name="queueName">隊列名稱</param>
        /// <returns>QueueClient</returns>
        public QueueClient GetSessionQueueClient(string queueName)
        {
            var namespaceClient = NamespaceManager.Create();
            if (!namespaceClient.QueueExists(queueName))
            {
                lock (syncObj)
                {
                    if (!namespaceClient.QueueExists(queueName))
                    {
                        var queue = new QueueDescription(queueName) { RequiresSession = true };
                        namespaceClient.CreateQueue(queue);
                    }
                }
            }

            return QueueClient.Create(queueName, ReceiveMode.ReceiveAndDelete);
        }      

然後我們定義一些常量:

private static readonly string ReplyToSessionId = "ReceiptSession";

        const double ResponseMessageTimeout = 20.0;

        private static readonly string requestQueueName = "RequestQueue";

        private static readonly string responseQueueName = "ResponseQueue";      

實作發送并接收回執消息的方法:

/// <summary>
        /// 發送并接收回執消息
        /// </summary>
        /// <param name="bills"></param>
        public static void SendMessage()
        {
            var manager = new ServiceBusUtils();
            var responseClient = manager.GetSessionQueueClient(responseQueueName);
            var requestClient = manager.GetSessionQueueClient(requestQueueName);

            var messsageReceiver = responseClient.AcceptMessageSession(ReplyToSessionId);
            var order = CreateSalesOrder(1);

            //發送消息
            var message = new BrokeredMessage(order);
            message.Properties.Add("Type", order.GetType().ToString());
            message.SessionId = ReplyToSessionId;
            message.MessageId = "OrderMessage001";
            message.ReplyTo = responseQueueName;
            requestClient.Send(message);
            Console.WriteLine("Send message: " + message.MessageId + ", SalesOrder ID: " + order.OrderID);

            //接收消息回執
            var receivedMessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponseMessageTimeout * 2));

            var receivedOrder = receivedMessage.GetBody<SalesOrder>();
            Console.WriteLine("Receive receipt message: " + receivedMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
            messsageReceiver.Close();
        }      

實作接收消息并發送回執方法:

1         /// <summary>
 2         /// 接收消息并回執
 3         /// </summary>
 4         public static void ReceiveMessage()
 5         {
 6             var manager = new ServiceBusUtils();
 7 
 8             var requestClient = manager.GetSessionQueueClient(requestQueueName);
 9             var session = requestClient.AcceptMessageSession();
10             var requestMessage = session.Receive();
11            
12             if (requestMessage != null)
13             {
14                 var receivedOrder = requestMessage.GetBody<SalesOrder>();
15                 Console.WriteLine("Receive message: " + requestMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
16 
17                 var responseMessage = new BrokeredMessage(receivedOrder);
18                 responseMessage.Properties.Add("Type", receivedOrder.GetType().ToString());
19                 responseMessage.ReplyToSessionId = ReplyToSessionId;
20                 responseMessage.MessageId = "ResponseOrderMessage001";
21                 responseMessage.SessionId = requestMessage.SessionId;
22                
23                 //發送回執消息
24                 var responseClient = manager.GetSessionQueueClient(requestMessage.ReplyTo);
25                 responseClient.Send(responseMessage);
26                 Console.WriteLine("Send receipt message: " + responseMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);               
27             }
28         }      

Main方法中,啟動兩個工作線程:一個線程用于消息發送和接收回執消息,一個線程用于消息接收和發送消息回執。

因為涉及到Azure Messaging中隊列的第一次建立,Azure Messaging是不支援多個請求同時建立同一個隊列的,是以,我們兩個線程間做一個簡單的Task.Delay(3000).Wait();

1         static void Main(string[] args)
 2         {
 3             var sendTask = Task.Factory.StartNew(() => { SendMessage(); });
 4             Task.Delay(3000).Wait();
 5             var receiveTask = Task.Factory.StartNew(() => { ReceiveMessage(); });
 6 
 7             Task.WaitAll(sendTask, receiveTask);
 8 
 9             Console.ReadKey();           
10         }      

我們看看程式輸出:

Azure Messaging-ServiceBus Messaging消息隊列技術系列6-消息回執

Azure 服務總線中的隊列:

Azure Messaging-ServiceBus Messaging消息隊列技術系列6-消息回執

可以看出:Azure Messaging-ServiceBus Messaging 基于帶會話的Queue/Topic、SessionId、ReplyTo屬性來實作消息回執機制。

周國慶

2017/3/23

繼續閱讀