上一篇:Window Azure ServiceBus Messaging消息隊列技術系列2-程式設計SDK入門 http://www.cnblogs.com/tianqing/p/5944573.html
介紹了Azure Service Bus的程式設計SDK(主要的程式設計接口)
本文中我們以實際的使用場景來說明Azure Messaging是否支援以及如何編碼實作:消息的收發順序保證。
消息的收發在實際業務中往往是有順序的:發送時1-2-3-4-5,接收時也必須是1-2-3-4-5,即FIFO特性。
在本文的Demo中,我們模拟銷售訂單消息隊列異步處理場景,消息體是一條SalesOrder,順序發送,順序接收。
1. 我們還是使用上篇部落格中在Windows Azure的Portal上建立好的NameSpace:servicebustest
銷售訂單隊列名稱:OrderQueue
2.簡單封裝一個Service Bus的工具類:ServiceBusUtils: 用于建立隊列、删除隊列、建立QueueClient、建立BrokerdMessage
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace AzureMessaging.FIFO
{
/// <summary>
/// ServiceBus工具類
/// </summary>
class ServiceBusUtils
{
//Namespace名稱
private static readonly string namespaceName = "servicebustest";
/// <summary>
/// 建立隊列
/// </summary>
/// <param name="queueName">隊列名稱</param>
/// <param name="isSession">是否支援會話</param>
public void CreateQueue(string queueName, bool isSession = true)
{
var namespaceClient = NamespaceManager.Create();
if (namespaceClient.QueueExists(queueName))
{
namespaceClient.DeleteQueue(queueName);
}
var queue = new QueueDescription(queueName) { RequiresSession = isSession };
namespaceClient.CreateQueue(queue);
}
/// <summary>
/// 删除隊列
/// </summary>
/// <param name="queueName">隊列名稱</param>
public void DeleteQueue(string queueName)
{
var namespaceClient = NamespaceManager.Create();
if (namespaceClient.QueueExists(queueName))
{
namespaceClient.DeleteQueue(queueName);
}
}
/// <summary>
/// 建立隊列用戶端
/// </summary>
/// <returns>隊列用戶端</returns>
public QueueClient GetQueueClient(string queueName, bool isSession = false, ReceiveMode mode = ReceiveMode.ReceiveAndDelete)
{
return QueueClient.Create(queueName, mode);
}
/// <summary>
/// 建立隊列用戶端
/// </summary>
/// <returns>隊列用戶端</returns>
public QueueClient GetReceiveQueueClient(string queueName, ReceiveMode mode = ReceiveMode.PeekLock)
{
var namespaceClient = NamespaceManager.Create();
return QueueClient.Create(queueName, mode);
}
/// <summary>
/// 構造消息
/// </summary>
/// <param name="serializableObject">可序列化的對象</param>
/// <returns>消息</returns>
public BrokeredMessage Create(Object serializableObject)
{
var serializer = new DataContractSerializer(serializableObject.GetType(),
new DataContractSerializerSettings() { IgnoreExtensionDataObject = true, PreserveObjectReferences = false });
var message = new BrokeredMessage(serializableObject);
message.Properties.Add("Type", serializableObject.GetType().ToString());
return message;
}
}
}
2. 示例SalesOrder實體類
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AzureMessaging.FIFO
{
/// <summary>
/// 銷售訂單類
/// </summary>
public class SalesOrder
{
/// <summary>
/// 訂單ID
/// </summary>
public string OrderID { get; set; }
/// <summary>
/// 訂單編号
/// </summary>
public string Code { get; set; }
/// <summary>
/// 建立時間
/// </summary>
public DateTime CreateTime { get; set; }
/// <summary>
/// 總價格
/// </summary>
public Decimal TotalPrice { get; set; }
/// <summary>
/// 産品ID
/// </summary>
public int ProductID { get; set; }
}
}
3. 消息順序發送
向OrderQueue發送10條消息訂單消息,輸出每條消息的順序号以及MessageID
private static readonly string queueName = "OrderQueue";
/// <summary>
/// 發送消息
/// </summary>
private static void MessageSend()
{
var sbUtils = new ServiceBusUtils();
//建立隊列
sbUtils.CreateQueue(queueName, false);
//順序發送消息到OrderQueue
var queueSendClient = sbUtils.GetQueueClient(queueName);
for (int i = 0; i < 10; i++)
{
var order = new SalesOrder() { OrderID = i.ToString(), Code = "SalesOrder_" + i, CreateTime = DateTime.Now, ProductID = 17967, TotalPrice = new decimal(19999) };
var message = sbUtils.Create(order);
queueSendClient.Send(message);
Console.WriteLine(string.Format("Send {0} Message: {1}", i, message.MessageId));
}
Console.WriteLine("Send Completed!");
}
程式輸出:
4. 消息順序接收
消費OrderQueue中的消息,驗證消息的接收順序
private static readonly string queueName = "OrderQueue";
/// <summary>
/// 接收消息
/// </summary>
private static void MessageReceive()
{
int index = 0;
BrokeredMessage msg = null;
var sbUtils = new ServiceBusUtils();
var queueReveiveClient = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.ReceiveAndDelete);
while ((msg = queueReveiveClient.Receive(TimeSpan.FromMilliseconds(3))) != null)
{
Console.WriteLine(string.Format("Received {0} Message: {1}", index, msg.MessageId));
index++;
}
////删除隊列
//sbUtils.DeleteQueue(queueName);
Console.WriteLine("Receive Completed!");
}
可以看出,Azure Messaging中ServiceBus對消息的收發是有順序保證的。
下一篇我們繼續其他特性的驗證和介紹。
周國慶
2017/3/9