相信ActiveMQ大家一定很熟悉吧,它是JAVA平台下的十分強大的消息總線工具,你可以在JAVA平台下很輕松地使用它的SDK進行消息開發。它同時也提供了.NET版本,這對我們這些微軟的擁趸來說無疑是個好消息。隻要到官網上去下載下傳兩個動态連結庫:Apache.NMS.dll和Apache.NMS.ActiveMQ.dll,即可開始你的開發之旅了。
在進行開發之前,先了解下ActiveMQ中的兩個概念:生産者(Producer)和消費者(Consumer)。我的解釋算不上專業,隻能是通俗易懂。顧名思義,生産者就是生産消息的,也就是發送消息;而消費者是消費消息的,也就是接收消息。好啦,這就是我的了解,要更詳細更準确的解釋請找度娘和谷哥。廢話不多說,上幹貨!
/// <summary>
/// ActiveMq消息總線
/// </summary>
public class ActiveMqHelper:IDisposable
{
#region 成員變量
/// <summary>
/// ActiveMQ消息類型
/// </summary>
public enum MqMessageTypeEnum
{
/// <summary>
/// 主題。生産者釋出主題,所有訂閱者都将收到該主題。
/// </summary>
Topic,
/// <summary>
/// 消息隊列。點對點模式。
/// </summary>
Queue
}
private bool _isInitSuccess = false; // 隊列消息消費者是否成功初始化
private IConnection _connection;
private ISession _session;
private string _uriProvider;
/// <summary>
/// 生産者消息總線位址
/// </summary>
public string UriProvider
{
get { return _uriProvider; }
set { _uriProvider = value; }
}
private string _messageName;
/// <summary>
/// 主題或消息隊列
/// </summary>
public string MessageName
{
get { return _messageName; }
set { _messageName = value; }
}
private string _userName;
/// <summary>
/// 使用者名
/// </summary>
public string UserName
{
get { return _userName; }
set { _userName = value; }
}
private string _password;
/// <summary>
/// 密碼
/// </summary>
public string Password
{
get { return _password; }
set { _password = value; }
}
#endregion
#region 構造函數
public ActiveMqHelper()
{
}
public ActiveMqHelper(string messageName, string uriProvider)
: this()
{
_uriProvider = uriProvider;
_messageName = messageName;
}
public ActiveMqHelper(string messageName, string uriProvider, string userName, string password)
: this(messageName, uriProvider)
{
_userName = userName;
_password = password;
}
~ActiveMqHelper()
{
Stop();
}
#endregion
#region 公共方法
/// <summary>
/// 初始化隊列消息消費者
/// </summary>
public void Start()
{
try
{
if (string.IsNullOrEmpty(_uriProvider))
{
throw new ArgumentNullException("生産者消息總線位址為空!");
}
if (_isInitSuccess)
{
Stop();
}
// 得到連接配接工廠
ConnectionFactory factory = new ConnectionFactory(_uriProvider);
// 建立一個連接配接
if (!string.IsNullOrEmpty(_userName))
{
_connection = factory.CreateConnection(_userName, _password);
}
else
{
_connection = factory.CreateConnection();
}
// 打開連接配接
_connection.Start();
// 得到會話
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
// 初始化完成
_isInitSuccess = true;
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 停止隊列消息消費者,釋放資源
/// </summary>
public void Stop()
{
try
{
if (!_isInitSuccess)
return;
if (_session != null)
{
_session.Close();
_session = null;
}
if (_connection != null)
{
_connection.Stop();
_connection.Close();
_connection = null;
}
}
catch
{
throw;
}
}
/// <summary>
/// 建立生産者
/// </summary>
/// <param name="messageType">消息類型</param>
/// <param name="messageName">主題或消息隊列</param>
public IMessageProducer CreateProducer(MqMessageTypeEnum messageType, string messageName)
{
try
{
if (!_isInitSuccess)
{
Start();
}
if (messageType == MqMessageTypeEnum.Topic)
{
return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
}
else
{
return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
}
//IDestination destNation = SessionUtil.GetDestination(_session, _messageName, DestinationType.Queue);
//return _session.CreateProducer(destNation);
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 建立生産者
/// </summary>
/// <param name="messageType">消息類型</param>
public IMessageProducer CreateProducer(MqMessageTypeEnum messageType)
{
return CreateProducer(messageType, _messageName);
}
/// <summary>
/// 建立消費者
/// </summary>
/// <param name="messageType">消息類型</param>
/// <param name="messageName">主題或消息隊列</param>
/// <param name="selector">篩選器</param>
public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName, string selector)
{
try
{
if (!_isInitSuccess)
{
Start();
}
if (string.IsNullOrEmpty(selector))
{
if (messageType == MqMessageTypeEnum.Topic)
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
}
else
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
}
}
else
{
if (messageType == MqMessageTypeEnum.Topic)
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName), selector, false);
}
else
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName), selector, false);
}
}
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 建立消費者
/// </summary>
/// <param name="messageType">消息類型</param>
/// <param name="messageName">主題或消息隊列</param>
public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName)
{
return CreateConsumer(messageType, messageName, null);
}
/// <summary>
/// 建立消費者
/// </summary>
/// <param name="messageType">消息類型</param>
public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType)
{
return CreateConsumer(messageType, _messageName, null);
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="messageBody">消息消息主體param>
/// <param name="producer">生産者</param>
public void SendMapMessage(IDictionary<string, object> messageBody, IMessageProducer producer)
{
try
{
if (!_isInitSuccess)
{
Start();
}
IMapMessage mapMsg = _session.CreateMapMessage();
if (mapMsg == null)
{
return;
}
foreach (KeyValuePair<string, object> item in messageBody)
{
if (item.Value is int)
{
mapMsg.Body.SetInt(item.Key, (int)item.Value);
}
else if (item.Value is string)
{
mapMsg.Body.SetString(item.Key, (string)item.Value);
}
else if (item.Value is bool)
{
mapMsg.Body.SetBool(item.Key, (bool)item.Value);
}
else if (item.Value is byte)
{
mapMsg.Body.SetByte(item.Key, (byte)item.Value);
}
else if (item.Value is char)
{
mapMsg.Body.SetChar(item.Key, (char)item.Value);
}
else if (item.Value is IDictionary)
{
mapMsg.Body.SetDictionary(item.Key, (IDictionary)item.Value);
}
else if (item.Value is double)
{
mapMsg.Body.SetDouble(item.Key, (double)item.Value);
}
else if (item.Value is float)
{
mapMsg.Body.SetFloat(item.Key, (float)item.Value);
}
else if (item.Value is IList)
{
mapMsg.Body.SetList(item.Key, (IList)item.Value);
}
else if (item.Value is short)
{
mapMsg.Body.SetShort(item.Key, (short)item.Value);
}
}
//發送消息
producer.Send(mapMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="messageBody">消息主體</param>
/// <param name="messageType">消息類型</param>
public void SendMapMessage(IDictionary<string, object> messageBody, MqMessageTypeEnum messageType)
{
try
{
if (!_isInitSuccess)
{
Start();
}
using (IMessageProducer producer = CreateProducer(messageType))
{
SendMapMessage(messageBody, producer);
}
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="messageText">消息文本</param>
/// <param name="messageProperties">消息屬性</param>
/// <param name="producer">生産者</param>
public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, IMessageProducer producer)
{
try
{
if (!_isInitSuccess)
{
Start();
}
ITextMessage textMsg = _session.CreateTextMessage();
if (textMsg == null)
{
return;
}
textMsg.Text = messageText;
if (messageProperties != null && messageProperties.Count > 0)
{
foreach (KeyValuePair<string, object> item in messageProperties)
{
if (item.Value is int)
{
textMsg.Properties.SetInt(item.Key, (int)item.Value);
}
else if (item.Value is string)
{
textMsg.Properties.SetString(item.Key, (string)item.Value);
}
else if (item.Value is bool)
{
textMsg.Properties.SetBool(item.Key, (bool)item.Value);
}
else if (item.Value is byte)
{
textMsg.Properties.SetByte(item.Key, (byte)item.Value);
}
else if (item.Value is char)
{
textMsg.Properties.SetChar(item.Key, (char)item.Value);
}
else if (item.Value is IDictionary)
{
textMsg.Properties.SetDictionary(item.Key, (IDictionary)item.Value);
}
else if (item.Value is double)
{
textMsg.Properties.SetDouble(item.Key, (double)item.Value);
}
else if (item.Value is float)
{
textMsg.Properties.SetFloat(item.Key, (float)item.Value);
}
else if (item.Value is IList)
{
textMsg.Properties.SetList(item.Key, (IList)item.Value);
}
else if (item.Value is short)
{
textMsg.Properties.SetShort(item.Key, (short)item.Value);
}
}
}
//發送消息
producer.Send(textMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 發送消息
/// </summary>
/// <param name="messageText">消息文本</param>
/// <param name="messageProperties">消息屬性</param>
/// <param name="messageType">消息類型</param>
public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, MqMessageTypeEnum messageType)
{
try
{
if (!_isInitSuccess)
{
Start();
}
using (IMessageProducer producer = CreateProducer(messageType))
{
SendTextMessage(messageText,messageProperties, producer);
}
}
catch
{
_isInitSuccess = false;
throw;
}
}
#endregion
#region 私有方法
// 字元數組轉字元串
private IList StringToByteList(string content)
{
return StringToByteList(content, "GB2312");
}
private IList StringToByteList(string content, string encodeName)
{
Byte[] resuleBytes = Encoding.GetEncoding(encodeName).GetBytes(content);
return resuleBytes;
}
#endregion
#region IDisposable 成員
public void Dispose()
{
Stop();
}
#endregion
}
以上是ActiveMQ的消息發送和接收的Helper類,這裡要說明一點的是,ActiveMQ消息(IMassage)有很多種類型,我這裡隻做了IMapMessage和ITextMessage,其他的都大同小異,讀者自己擴充吧。下面再提供下測試程式。
private static string _uriProvider = "activemq:failover:(tcp://" + ConfigurationManager.AppSettings["VMActiveMQIP"] + ")?maxReconnectAttempts=3";
private static ActiveMqHelper _amh;
private static Apache.NMS.IMessageConsumer _consumer;
/// <summary>
/// 消息響應句柄
/// </summary>
/// <param name="properties">消息屬性</param>
public delegate void MessageResponse(IDictionary<string, object> properties);
/// <summary>
/// 發送配置檢測指令
/// </summary>
/// <param name="command_id">指令ID:0-檢測,1-修複</param>
/// <param name="task_id">任務ID</param>
/// <param name="task_type">任務類型:0-手動,1-自動</param>
public static void SendCommand(int command_id, int task_id, int task_type)
{
if (_amh == null)
{
_amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
}
IDictionary<string, object> properties = new Dictionary<string, object>();
properties.Add(new KeyValuePair<string, object>("command_id", command_id));
properties.Add(new KeyValuePair<string, object>("task_id", task_id));
properties.Add(new KeyValuePair<string, object>("task_type", task_type));
_amh.SendMapMessage(properties, ActiveMqHelper.MqMessageTypeEnum.Queue);
}
/// <summary>
/// 接收配置檢測消息
/// </summary>
/// <param name="component">消息響應元件</param>
/// <param name="responseMessage">消息響應方法</param>
public static void StartReceiveCommand(System.Windows.Forms.Control component, MessageResponse messageResponse)
{
if (_amh == null)
{
_amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
}
if (_consumer == null)
{
_consumer = _amh.CreateConsumer(ActiveMqHelper.MqMessageTypeEnum.Queue);
}
_consumer.Listener += (message) =>
{
IDictionary<string, object> properties = new Dictionary<string, object>();
IEnumerator keys = ((Apache.NMS.IMapMessage)message).Body.Keys.GetEnumerator();
keys.Reset();
IEnumerator values = ((Apache.NMS.IMapMessage)message).Body.Values.GetEnumerator();
values.Reset();
while (keys.MoveNext())
{
values.MoveNext();
properties.Add(new KeyValuePair<string, object>
(keys.Current.ToString(), values.Current));
}
if (component.InvokeRequired)
{
component.BeginInvoke(messageResponse, properties);
}
};
}
/// <summary>
/// 停止接收配置檢測指令
/// </summary>
public static void StopReceiveCommand()
{
if (_amh != null)
{
_amh.Dispose();
_amh = null;
}
if (_consumer != null)
{
_consumer.Close();
_consumer = null;
}
}
上面這個測試例子是用來做為發送指令的,用的是IMapMessage方式,僅作參考。