天天看点

ActiveMQ for .NET消息总线操作

相信ActiveMQ大家一定很熟悉吧,它是JAVA平台下的十分强大的消息总线工具,你可以在JAVA平台下很轻松地使用它的SDK进行消息开发。它同时也提供了.NET版本,这对我们这些微软的拥趸来说无疑是个好消息。只要到官网上去下载两个动态链接库:Apache.NMS.dll和Apache.NMS.ActiveMQ.dll,即可开始你的开发之旅了。

在进行开发之前,先了解下ActiveMQ中的两个概念:生产者(Producer)和消费者(Consumer)。我的解释算不上专业,只能是通俗易懂。顾名思义,生产者就是生产消息的,也就是发送消息;而消费者是消费消息的,也就是接收消息。好啦,这就是我的理解,要更详细更准确的解释请找度娘和谷哥。废话不多说,上干货!

/// <summary>
/// ActiveMq消息总线
/// </summary>
public class ActiveMqHelper:IDisposable
{
    #region 成员变量

    /// <summary>
    /// ActiveMQ消息类型
    /// </summary>
    public enum MqMessageTypeEnum
    { 
        /// <summary>
        /// 主题。生产者发布主题,所有订阅者都将收到该主题。
        /// </summary>
        Topic,
        /// <summary>
        /// 消息队列。点对点模式。
        /// </summary>
        Queue
    }

    private bool _isInitSuccess = false; // 队列消息消费者是否成功初始化
    private IConnection _connection;
    private ISession _session;

    private string _uriProvider;
    /// <summary>
    /// 生产者消息总线地址
    /// </summary>
    public string UriProvider
    {
        get { return _uriProvider; }
        set { _uriProvider = value; }
    }
    private string _messageName;
    /// <summary>
    /// 主题或消息队列
    /// </summary>
    public string MessageName
    {
        get { return _messageName; }
        set { _messageName = value; }
    }
    private string _userName;
    /// <summary>
    /// 用户名
    /// </summary>
    public string UserName
    {
        get { return _userName; }
        set { _userName = value; }
    }
    private string _password;
    /// <summary>
    /// 密码
    /// </summary>
    public string Password
    {
        get { return _password; }
        set { _password = value; }
    }


    #endregion

    #region 构造函数

    public ActiveMqHelper()
    { 
    
    }

    public ActiveMqHelper(string messageName, string uriProvider)
        : this()
    {
        _uriProvider = uriProvider;
        _messageName = messageName;
    }

    public ActiveMqHelper(string messageName, string uriProvider, string userName, string password)
        : this(messageName, uriProvider)
    {
        _userName = userName;
        _password = password;
    }

    ~ActiveMqHelper()
    {
         Stop();
    }

    #endregion

    #region 公共方法

    /// <summary>
    /// 初始化队列消息消费者
    /// </summary>
    public void Start()
    {
        try
        {
            if (string.IsNullOrEmpty(_uriProvider))
            {
                throw new ArgumentNullException("生产者消息总线地址为空!");
            }
            if (_isInitSuccess)
            {
                Stop();
            }

            // 得到连接工厂
            ConnectionFactory factory = new ConnectionFactory(_uriProvider);
            // 创建一个连接
            if (!string.IsNullOrEmpty(_userName))
            {
                _connection = factory.CreateConnection(_userName, _password);
            }
            else
            {
                _connection = factory.CreateConnection();
            }
            // 打开连接
            _connection.Start();
            // 得到会话
            _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            // 初始化完成
            _isInitSuccess = true;
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 停止队列消息消费者,释放资源
    /// </summary>
    public void Stop()
    {
        try
        {
            if (!_isInitSuccess)
                return;

            if (_session != null)
            {
                _session.Close();
                _session = null;
            }

            if (_connection != null)
            {
                _connection.Stop();
                _connection.Close();
                _connection = null;
            }
        }
        catch
        {
            throw;
        }
    }

    /// <summary>
    /// 创建生产者
    /// </summary>
    /// <param name="messageType">消息类型</param>
    /// <param name="messageName">主题或消息队列</param>
    public IMessageProducer CreateProducer(MqMessageTypeEnum messageType, string messageName)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            if (messageType == MqMessageTypeEnum.Topic)
            {
                return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
            }
            else
            {
                return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
            }
            //IDestination destNation = SessionUtil.GetDestination(_session, _messageName, DestinationType.Queue);
            //return _session.CreateProducer(destNation);
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 创建生产者
    /// </summary>
    /// <param name="messageType">消息类型</param>
    public IMessageProducer CreateProducer(MqMessageTypeEnum messageType)
    {
        return CreateProducer(messageType, _messageName);
    }

    /// <summary>
    /// 创建消费者
    /// </summary>
    /// <param name="messageType">消息类型</param>
    /// <param name="messageName">主题或消息队列</param>
    /// <param name="selector">筛选器</param>
    public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName, string selector)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            if (string.IsNullOrEmpty(selector))
            {
                if (messageType == MqMessageTypeEnum.Topic)
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
                }
                else
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
                }
            }
            else
            {
                if (messageType == MqMessageTypeEnum.Topic)
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName), selector, false);
                }
                else
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName), selector, false);
                }
            }
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 创建消费者
    /// </summary>
    /// <param name="messageType">消息类型</param>
    /// <param name="messageName">主题或消息队列</param>
    public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName)
    {
        return CreateConsumer(messageType, messageName, null);
    }

    /// <summary>
    /// 创建消费者
    /// </summary>
    /// <param name="messageType">消息类型</param>
    public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType)
    {
        return CreateConsumer(messageType, _messageName, null);
    }


    /// <summary>
    /// 发送消息
    /// </summary>
    /// <param name="messageBody">消息消息主体param>
    /// <param name="producer">生产者</param>
    public void SendMapMessage(IDictionary<string, object> messageBody, IMessageProducer producer)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            } 
            IMapMessage mapMsg = _session.CreateMapMessage();
            if (mapMsg == null)
            {
                return;
            }

            foreach (KeyValuePair<string, object> item in messageBody)
            {
                if (item.Value is int)
                {
                    mapMsg.Body.SetInt(item.Key, (int)item.Value);
                }
                else if (item.Value is string)
                {
                    mapMsg.Body.SetString(item.Key, (string)item.Value);
                }
                else if (item.Value is bool)
                {
                    mapMsg.Body.SetBool(item.Key, (bool)item.Value);
                }
                else if (item.Value is byte)
                {
                    mapMsg.Body.SetByte(item.Key, (byte)item.Value);
                }
                else if (item.Value is char)
                {
                    mapMsg.Body.SetChar(item.Key, (char)item.Value);
                }
                else if (item.Value is IDictionary)
                {
                    mapMsg.Body.SetDictionary(item.Key, (IDictionary)item.Value);
                }
                else if (item.Value is double)
                {
                    mapMsg.Body.SetDouble(item.Key, (double)item.Value);
                }
                else if (item.Value is float)
                {
                    mapMsg.Body.SetFloat(item.Key, (float)item.Value);
                }
                else if (item.Value is IList)
                {
                    mapMsg.Body.SetList(item.Key, (IList)item.Value);
                }
                else if (item.Value is short)
                {
                    mapMsg.Body.SetShort(item.Key, (short)item.Value);
                }
            }

            //发送消息
            producer.Send(mapMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 发送消息
    /// </summary>
    /// <param name="messageBody">消息主体</param>
    /// <param name="messageType">消息类型</param>
    public void SendMapMessage(IDictionary<string, object> messageBody, MqMessageTypeEnum messageType)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            using (IMessageProducer producer = CreateProducer(messageType))
            {
                SendMapMessage(messageBody, producer);
            }
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 发送消息
    /// </summary>
    /// <param name="messageText">消息文本</param>
    /// <param name="messageProperties">消息属性</param>
    /// <param name="producer">生产者</param>
    public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, IMessageProducer producer)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            ITextMessage textMsg = _session.CreateTextMessage();
            if (textMsg == null)
            {
                return;
            }
            textMsg.Text = messageText;
            if (messageProperties != null && messageProperties.Count > 0)
            {
                foreach (KeyValuePair<string, object> item in messageProperties)
                {
                    if (item.Value is int)
                    {
                        textMsg.Properties.SetInt(item.Key, (int)item.Value);
                    }
                    else if (item.Value is string)
                    {
                        textMsg.Properties.SetString(item.Key, (string)item.Value);
                    }
                    else if (item.Value is bool)
                    {
                        textMsg.Properties.SetBool(item.Key, (bool)item.Value);
                    }
                    else if (item.Value is byte)
                    {
                        textMsg.Properties.SetByte(item.Key, (byte)item.Value);
                    }
                    else if (item.Value is char)
                    {
                        textMsg.Properties.SetChar(item.Key, (char)item.Value);
                    }
                    else if (item.Value is IDictionary)
                    {
                        textMsg.Properties.SetDictionary(item.Key, (IDictionary)item.Value);
                    }
                    else if (item.Value is double)
                    {
                        textMsg.Properties.SetDouble(item.Key, (double)item.Value);
                    }
                    else if (item.Value is float)
                    {
                        textMsg.Properties.SetFloat(item.Key, (float)item.Value);
                    }
                    else if (item.Value is IList)
                    {
                        textMsg.Properties.SetList(item.Key, (IList)item.Value);
                    }
                    else if (item.Value is short)
                    {
                        textMsg.Properties.SetShort(item.Key, (short)item.Value);
                    }

                }
            }

            //发送消息
            producer.Send(textMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }
    
    /// <summary>
    /// 发送消息
    /// </summary>
    /// <param name="messageText">消息文本</param>
    /// <param name="messageProperties">消息属性</param>
    /// <param name="messageType">消息类型</param>
    public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, MqMessageTypeEnum messageType)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            using (IMessageProducer producer = CreateProducer(messageType))
            {
                SendTextMessage(messageText,messageProperties, producer);
            }
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    #endregion

    #region 私有方法

    // 字符数组转字符串
    private IList StringToByteList(string content)
    {
        return StringToByteList(content, "GB2312");
    }

    private IList StringToByteList(string content, string encodeName)
    {
        Byte[] resuleBytes = Encoding.GetEncoding(encodeName).GetBytes(content);
        return resuleBytes;
    }

    #endregion

    #region IDisposable 成员

    public void Dispose()
    {
        Stop();
    }

    #endregion
}
           

以上是ActiveMQ的消息发送和接收的Helper类,这里要说明一点的是,ActiveMQ消息(IMassage)有很多种类型,我这里只做了IMapMessage和ITextMessage,其他的都大同小异,读者自己扩展吧。下面再提供下测试程序。

private static string _uriProvider = "activemq:failover:(tcp://" + ConfigurationManager.AppSettings["VMActiveMQIP"] + ")?maxReconnectAttempts=3";
private static ActiveMqHelper _amh;
private static Apache.NMS.IMessageConsumer _consumer;
/// <summary>
/// 消息响应句柄
/// </summary>
/// <param name="properties">消息属性</param>
public delegate void MessageResponse(IDictionary<string, object> properties);
/// <summary>
/// 发送配置检测命令
/// </summary>
/// <param name="command_id">命令ID:0-检测,1-修复</param>
/// <param name="task_id">任务ID</param>
/// <param name="task_type">任务类型:0-手动,1-自动</param>
public static void SendCommand(int command_id, int task_id, int task_type)
{
    if (_amh == null)
    {
        _amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
    }
    IDictionary<string, object> properties = new Dictionary<string, object>();
    properties.Add(new KeyValuePair<string, object>("command_id", command_id));
    properties.Add(new KeyValuePair<string, object>("task_id", task_id));
    properties.Add(new KeyValuePair<string, object>("task_type", task_type));
    _amh.SendMapMessage(properties, ActiveMqHelper.MqMessageTypeEnum.Queue);
}
/// <summary>
/// 接收配置检测消息
/// </summary>
/// <param name="component">消息响应组件</param>
/// <param name="responseMessage">消息响应方法</param>
public static void StartReceiveCommand(System.Windows.Forms.Control component, MessageResponse messageResponse)
{
    if (_amh == null)
    {
        _amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
    }
    if (_consumer == null)
    {
        _consumer = _amh.CreateConsumer(ActiveMqHelper.MqMessageTypeEnum.Queue);
    }

    _consumer.Listener += (message) =>
    {
        IDictionary<string, object> properties = new Dictionary<string, object>();

        IEnumerator keys = ((Apache.NMS.IMapMessage)message).Body.Keys.GetEnumerator();
        keys.Reset();
        IEnumerator values = ((Apache.NMS.IMapMessage)message).Body.Values.GetEnumerator();
        values.Reset();
        while (keys.MoveNext())
        {
            values.MoveNext();
            properties.Add(new KeyValuePair<string, object>
                (keys.Current.ToString(), values.Current));
        }
        if (component.InvokeRequired)
        {
            component.BeginInvoke(messageResponse, properties);
        }
    };
}
/// <summary>
/// 停止接收配置检测命令
/// </summary>
public static void StopReceiveCommand()
{
    if (_amh != null)
    {
        _amh.Dispose();
        _amh = null;
    }
    if (_consumer != null)
    {
        _consumer.Close();
        _consumer = null;
    }
}
           

上面这个测试例子是用来做为发送命令的,用的是IMapMessage方式,仅作参考。