上篇博文中我們介紹了Azure Messaging的重複消息機制、At most once 和At least once.
Azure Messaging-ServiceBus Messaging消息隊列技術系列5-重複消息:at-least-once at-most-once
本文中我們主要研究并介紹Azure Messaging的消息回執機制:實際應用場景:
同步收發場景下,消息生産者和消費者雙向應答模式,例如:張三寫封信送到郵局中轉站,然後李四從中轉站獲得信,然後在寫一份回執信,放到中轉站,然後張三去取,當然張三寫信的時候就得寫明回信位址。還
有,生成訂單編号場景,發送一個生成訂單編号的消息,消息消費者接收生成訂單編号的消息,并通過消息回執傳回。
Azure Messaging的消息回執機制主要通過:基于帶會話的Queue/Topic、SessionId、ReplyTo屬性來實作
在代碼實作中,我們需要:
1. 兩個工作線程,一個線程用于消息發送和接收回執消息,一個線程用于消息接收和發送消息回執。
2. 一個會話辨別:ReceiptSession
3. 兩個隊列Queue:RequestQueue:發送消息、接收消息,ResponseQueue:發送回執消息,接收回執消息。
直接Show Code:
首先,我們在ServiceBusMQManager增加一個線程安全的建立帶回話的QueueClient方法:
private static object syncObj = new object();
/// <summary>
/// 擷取要求會話帶Session的QueueClient
/// </summary>
/// <param name="queueName">隊列名稱</param>
/// <returns>QueueClient</returns>
public QueueClient GetSessionQueueClient(string queueName)
{
var namespaceClient = NamespaceManager.Create();
if (!namespaceClient.QueueExists(queueName))
{
lock (syncObj)
{
if (!namespaceClient.QueueExists(queueName))
{
var queue = new QueueDescription(queueName) { RequiresSession = true };
namespaceClient.CreateQueue(queue);
}
}
}
return QueueClient.Create(queueName, ReceiveMode.ReceiveAndDelete);
}
然後我們定義一些常量:
private static readonly string ReplyToSessionId = "ReceiptSession";
const double ResponseMessageTimeout = 20.0;
private static readonly string requestQueueName = "RequestQueue";
private static readonly string responseQueueName = "ResponseQueue";
實作發送并接收回執消息的方法:
/// <summary>
/// 發送并接收回執消息
/// </summary>
/// <param name="bills"></param>
public static void SendMessage()
{
var manager = new ServiceBusUtils();
var responseClient = manager.GetSessionQueueClient(responseQueueName);
var requestClient = manager.GetSessionQueueClient(requestQueueName);
var messsageReceiver = responseClient.AcceptMessageSession(ReplyToSessionId);
var order = CreateSalesOrder(1);
//發送消息
var message = new BrokeredMessage(order);
message.Properties.Add("Type", order.GetType().ToString());
message.SessionId = ReplyToSessionId;
message.MessageId = "OrderMessage001";
message.ReplyTo = responseQueueName;
requestClient.Send(message);
Console.WriteLine("Send message: " + message.MessageId + ", SalesOrder ID: " + order.OrderID);
//接收消息回執
var receivedMessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponseMessageTimeout * 2));
var receivedOrder = receivedMessage.GetBody<SalesOrder>();
Console.WriteLine("Receive receipt message: " + receivedMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
messsageReceiver.Close();
}
實作接收消息并發送回執方法:
1 /// <summary>
2 /// 接收消息并回執
3 /// </summary>
4 public static void ReceiveMessage()
5 {
6 var manager = new ServiceBusUtils();
7
8 var requestClient = manager.GetSessionQueueClient(requestQueueName);
9 var session = requestClient.AcceptMessageSession();
10 var requestMessage = session.Receive();
11
12 if (requestMessage != null)
13 {
14 var receivedOrder = requestMessage.GetBody<SalesOrder>();
15 Console.WriteLine("Receive message: " + requestMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
16
17 var responseMessage = new BrokeredMessage(receivedOrder);
18 responseMessage.Properties.Add("Type", receivedOrder.GetType().ToString());
19 responseMessage.ReplyToSessionId = ReplyToSessionId;
20 responseMessage.MessageId = "ResponseOrderMessage001";
21 responseMessage.SessionId = requestMessage.SessionId;
22
23 //發送回執消息
24 var responseClient = manager.GetSessionQueueClient(requestMessage.ReplyTo);
25 responseClient.Send(responseMessage);
26 Console.WriteLine("Send receipt message: " + responseMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
27 }
28 }
Main方法中,啟動兩個工作線程:一個線程用于消息發送和接收回執消息,一個線程用于消息接收和發送消息回執。
因為涉及到Azure Messaging中隊列的第一次建立,Azure Messaging是不支援多個請求同時建立同一個隊列的,是以,我們兩個線程間做一個簡單的Task.Delay(3000).Wait();
1 static void Main(string[] args)
2 {
3 var sendTask = Task.Factory.StartNew(() => { SendMessage(); });
4 Task.Delay(3000).Wait();
5 var receiveTask = Task.Factory.StartNew(() => { ReceiveMessage(); });
6
7 Task.WaitAll(sendTask, receiveTask);
8
9 Console.ReadKey();
10 }
我們看看程式輸出:
Azure 服務總線中的隊列:
可以看出:Azure Messaging-ServiceBus Messaging 基于帶會話的Queue/Topic、SessionId、ReplyTo屬性來實作消息回執機制。
周國慶
2017/3/23