相信ActiveMQ大家一定很熟悉吧,它是JAVA平台下的十分强大的消息总线工具,你可以在JAVA平台下很轻松地使用它的SDK进行消息开发。它同时也提供了.NET版本,这对我们这些微软的拥趸来说无疑是个好消息。只要到官网上去下载两个动态链接库:Apache.NMS.dll和Apache.NMS.ActiveMQ.dll,即可开始你的开发之旅了。
在进行开发之前,先了解下ActiveMQ中的两个概念:生产者(Producer)和消费者(Consumer)。我的解释算不上专业,只能是通俗易懂。顾名思义,生产者就是生产消息的,也就是发送消息;而消费者是消费消息的,也就是接收消息。好啦,这就是我的理解,要更详细更准确的解释请找度娘和谷哥。废话不多说,上干货!
/// <summary>
/// ActiveMq消息总线
/// </summary>
public class ActiveMqHelper:IDisposable
{
#region 成员变量
/// <summary>
/// ActiveMQ消息类型
/// </summary>
public enum MqMessageTypeEnum
{
/// <summary>
/// 主题。生产者发布主题,所有订阅者都将收到该主题。
/// </summary>
Topic,
/// <summary>
/// 消息队列。点对点模式。
/// </summary>
Queue
}
private bool _isInitSuccess = false; // 队列消息消费者是否成功初始化
private IConnection _connection;
private ISession _session;
private string _uriProvider;
/// <summary>
/// 生产者消息总线地址
/// </summary>
public string UriProvider
{
get { return _uriProvider; }
set { _uriProvider = value; }
}
private string _messageName;
/// <summary>
/// 主题或消息队列
/// </summary>
public string MessageName
{
get { return _messageName; }
set { _messageName = value; }
}
private string _userName;
/// <summary>
/// 用户名
/// </summary>
public string UserName
{
get { return _userName; }
set { _userName = value; }
}
private string _password;
/// <summary>
/// 密码
/// </summary>
public string Password
{
get { return _password; }
set { _password = value; }
}
#endregion
#region 构造函数
public ActiveMqHelper()
{
}
public ActiveMqHelper(string messageName, string uriProvider)
: this()
{
_uriProvider = uriProvider;
_messageName = messageName;
}
public ActiveMqHelper(string messageName, string uriProvider, string userName, string password)
: this(messageName, uriProvider)
{
_userName = userName;
_password = password;
}
~ActiveMqHelper()
{
Stop();
}
#endregion
#region 公共方法
/// <summary>
/// 初始化队列消息消费者
/// </summary>
public void Start()
{
try
{
if (string.IsNullOrEmpty(_uriProvider))
{
throw new ArgumentNullException("生产者消息总线地址为空!");
}
if (_isInitSuccess)
{
Stop();
}
// 得到连接工厂
ConnectionFactory factory = new ConnectionFactory(_uriProvider);
// 创建一个连接
if (!string.IsNullOrEmpty(_userName))
{
_connection = factory.CreateConnection(_userName, _password);
}
else
{
_connection = factory.CreateConnection();
}
// 打开连接
_connection.Start();
// 得到会话
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
// 初始化完成
_isInitSuccess = true;
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 停止队列消息消费者,释放资源
/// </summary>
public void Stop()
{
try
{
if (!_isInitSuccess)
return;
if (_session != null)
{
_session.Close();
_session = null;
}
if (_connection != null)
{
_connection.Stop();
_connection.Close();
_connection = null;
}
}
catch
{
throw;
}
}
/// <summary>
/// 创建生产者
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="messageName">主题或消息队列</param>
public IMessageProducer CreateProducer(MqMessageTypeEnum messageType, string messageName)
{
try
{
if (!_isInitSuccess)
{
Start();
}
if (messageType == MqMessageTypeEnum.Topic)
{
return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
}
else
{
return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
}
//IDestination destNation = SessionUtil.GetDestination(_session, _messageName, DestinationType.Queue);
//return _session.CreateProducer(destNation);
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 创建生产者
/// </summary>
/// <param name="messageType">消息类型</param>
public IMessageProducer CreateProducer(MqMessageTypeEnum messageType)
{
return CreateProducer(messageType, _messageName);
}
/// <summary>
/// 创建消费者
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="messageName">主题或消息队列</param>
/// <param name="selector">筛选器</param>
public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName, string selector)
{
try
{
if (!_isInitSuccess)
{
Start();
}
if (string.IsNullOrEmpty(selector))
{
if (messageType == MqMessageTypeEnum.Topic)
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
}
else
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
}
}
else
{
if (messageType == MqMessageTypeEnum.Topic)
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName), selector, false);
}
else
{
return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName), selector, false);
}
}
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 创建消费者
/// </summary>
/// <param name="messageType">消息类型</param>
/// <param name="messageName">主题或消息队列</param>
public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName)
{
return CreateConsumer(messageType, messageName, null);
}
/// <summary>
/// 创建消费者
/// </summary>
/// <param name="messageType">消息类型</param>
public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType)
{
return CreateConsumer(messageType, _messageName, null);
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="messageBody">消息消息主体param>
/// <param name="producer">生产者</param>
public void SendMapMessage(IDictionary<string, object> messageBody, IMessageProducer producer)
{
try
{
if (!_isInitSuccess)
{
Start();
}
IMapMessage mapMsg = _session.CreateMapMessage();
if (mapMsg == null)
{
return;
}
foreach (KeyValuePair<string, object> item in messageBody)
{
if (item.Value is int)
{
mapMsg.Body.SetInt(item.Key, (int)item.Value);
}
else if (item.Value is string)
{
mapMsg.Body.SetString(item.Key, (string)item.Value);
}
else if (item.Value is bool)
{
mapMsg.Body.SetBool(item.Key, (bool)item.Value);
}
else if (item.Value is byte)
{
mapMsg.Body.SetByte(item.Key, (byte)item.Value);
}
else if (item.Value is char)
{
mapMsg.Body.SetChar(item.Key, (char)item.Value);
}
else if (item.Value is IDictionary)
{
mapMsg.Body.SetDictionary(item.Key, (IDictionary)item.Value);
}
else if (item.Value is double)
{
mapMsg.Body.SetDouble(item.Key, (double)item.Value);
}
else if (item.Value is float)
{
mapMsg.Body.SetFloat(item.Key, (float)item.Value);
}
else if (item.Value is IList)
{
mapMsg.Body.SetList(item.Key, (IList)item.Value);
}
else if (item.Value is short)
{
mapMsg.Body.SetShort(item.Key, (short)item.Value);
}
}
//发送消息
producer.Send(mapMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="messageBody">消息主体</param>
/// <param name="messageType">消息类型</param>
public void SendMapMessage(IDictionary<string, object> messageBody, MqMessageTypeEnum messageType)
{
try
{
if (!_isInitSuccess)
{
Start();
}
using (IMessageProducer producer = CreateProducer(messageType))
{
SendMapMessage(messageBody, producer);
}
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="messageText">消息文本</param>
/// <param name="messageProperties">消息属性</param>
/// <param name="producer">生产者</param>
public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, IMessageProducer producer)
{
try
{
if (!_isInitSuccess)
{
Start();
}
ITextMessage textMsg = _session.CreateTextMessage();
if (textMsg == null)
{
return;
}
textMsg.Text = messageText;
if (messageProperties != null && messageProperties.Count > 0)
{
foreach (KeyValuePair<string, object> item in messageProperties)
{
if (item.Value is int)
{
textMsg.Properties.SetInt(item.Key, (int)item.Value);
}
else if (item.Value is string)
{
textMsg.Properties.SetString(item.Key, (string)item.Value);
}
else if (item.Value is bool)
{
textMsg.Properties.SetBool(item.Key, (bool)item.Value);
}
else if (item.Value is byte)
{
textMsg.Properties.SetByte(item.Key, (byte)item.Value);
}
else if (item.Value is char)
{
textMsg.Properties.SetChar(item.Key, (char)item.Value);
}
else if (item.Value is IDictionary)
{
textMsg.Properties.SetDictionary(item.Key, (IDictionary)item.Value);
}
else if (item.Value is double)
{
textMsg.Properties.SetDouble(item.Key, (double)item.Value);
}
else if (item.Value is float)
{
textMsg.Properties.SetFloat(item.Key, (float)item.Value);
}
else if (item.Value is IList)
{
textMsg.Properties.SetList(item.Key, (IList)item.Value);
}
else if (item.Value is short)
{
textMsg.Properties.SetShort(item.Key, (short)item.Value);
}
}
}
//发送消息
producer.Send(textMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
}
catch
{
_isInitSuccess = false;
throw;
}
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="messageText">消息文本</param>
/// <param name="messageProperties">消息属性</param>
/// <param name="messageType">消息类型</param>
public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, MqMessageTypeEnum messageType)
{
try
{
if (!_isInitSuccess)
{
Start();
}
using (IMessageProducer producer = CreateProducer(messageType))
{
SendTextMessage(messageText,messageProperties, producer);
}
}
catch
{
_isInitSuccess = false;
throw;
}
}
#endregion
#region 私有方法
// 字符数组转字符串
private IList StringToByteList(string content)
{
return StringToByteList(content, "GB2312");
}
private IList StringToByteList(string content, string encodeName)
{
Byte[] resuleBytes = Encoding.GetEncoding(encodeName).GetBytes(content);
return resuleBytes;
}
#endregion
#region IDisposable 成员
public void Dispose()
{
Stop();
}
#endregion
}
以上是ActiveMQ的消息发送和接收的Helper类,这里要说明一点的是,ActiveMQ消息(IMassage)有很多种类型,我这里只做了IMapMessage和ITextMessage,其他的都大同小异,读者自己扩展吧。下面再提供下测试程序。
private static string _uriProvider = "activemq:failover:(tcp://" + ConfigurationManager.AppSettings["VMActiveMQIP"] + ")?maxReconnectAttempts=3";
private static ActiveMqHelper _amh;
private static Apache.NMS.IMessageConsumer _consumer;
/// <summary>
/// 消息响应句柄
/// </summary>
/// <param name="properties">消息属性</param>
public delegate void MessageResponse(IDictionary<string, object> properties);
/// <summary>
/// 发送配置检测命令
/// </summary>
/// <param name="command_id">命令ID:0-检测,1-修复</param>
/// <param name="task_id">任务ID</param>
/// <param name="task_type">任务类型:0-手动,1-自动</param>
public static void SendCommand(int command_id, int task_id, int task_type)
{
if (_amh == null)
{
_amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
}
IDictionary<string, object> properties = new Dictionary<string, object>();
properties.Add(new KeyValuePair<string, object>("command_id", command_id));
properties.Add(new KeyValuePair<string, object>("task_id", task_id));
properties.Add(new KeyValuePair<string, object>("task_type", task_type));
_amh.SendMapMessage(properties, ActiveMqHelper.MqMessageTypeEnum.Queue);
}
/// <summary>
/// 接收配置检测消息
/// </summary>
/// <param name="component">消息响应组件</param>
/// <param name="responseMessage">消息响应方法</param>
public static void StartReceiveCommand(System.Windows.Forms.Control component, MessageResponse messageResponse)
{
if (_amh == null)
{
_amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
}
if (_consumer == null)
{
_consumer = _amh.CreateConsumer(ActiveMqHelper.MqMessageTypeEnum.Queue);
}
_consumer.Listener += (message) =>
{
IDictionary<string, object> properties = new Dictionary<string, object>();
IEnumerator keys = ((Apache.NMS.IMapMessage)message).Body.Keys.GetEnumerator();
keys.Reset();
IEnumerator values = ((Apache.NMS.IMapMessage)message).Body.Values.GetEnumerator();
values.Reset();
while (keys.MoveNext())
{
values.MoveNext();
properties.Add(new KeyValuePair<string, object>
(keys.Current.ToString(), values.Current));
}
if (component.InvokeRequired)
{
component.BeginInvoke(messageResponse, properties);
}
};
}
/// <summary>
/// 停止接收配置检测命令
/// </summary>
public static void StopReceiveCommand()
{
if (_amh != null)
{
_amh.Dispose();
_amh = null;
}
if (_consumer != null)
{
_consumer.Close();
_consumer = null;
}
}
上面这个测试例子是用来做为发送命令的,用的是IMapMessage方式,仅作参考。