天天看點

ActiveMQ for .NET消息總線操作

相信ActiveMQ大家一定很熟悉吧,它是JAVA平台下的十分強大的消息總線工具,你可以在JAVA平台下很輕松地使用它的SDK進行消息開發。它同時也提供了.NET版本,這對我們這些微軟的擁趸來說無疑是個好消息。隻要到官網上去下載下傳兩個動态連結庫:Apache.NMS.dll和Apache.NMS.ActiveMQ.dll,即可開始你的開發之旅了。

在進行開發之前,先了解下ActiveMQ中的兩個概念:生産者(Producer)和消費者(Consumer)。我的解釋算不上專業,隻能是通俗易懂。顧名思義,生産者就是生産消息的,也就是發送消息;而消費者是消費消息的,也就是接收消息。好啦,這就是我的了解,要更詳細更準确的解釋請找度娘和谷哥。廢話不多說,上幹貨!

/// <summary>
/// ActiveMq消息總線
/// </summary>
public class ActiveMqHelper:IDisposable
{
    #region 成員變量

    /// <summary>
    /// ActiveMQ消息類型
    /// </summary>
    public enum MqMessageTypeEnum
    { 
        /// <summary>
        /// 主題。生産者釋出主題,所有訂閱者都将收到該主題。
        /// </summary>
        Topic,
        /// <summary>
        /// 消息隊列。點對點模式。
        /// </summary>
        Queue
    }

    private bool _isInitSuccess = false; // 隊列消息消費者是否成功初始化
    private IConnection _connection;
    private ISession _session;

    private string _uriProvider;
    /// <summary>
    /// 生産者消息總線位址
    /// </summary>
    public string UriProvider
    {
        get { return _uriProvider; }
        set { _uriProvider = value; }
    }
    private string _messageName;
    /// <summary>
    /// 主題或消息隊列
    /// </summary>
    public string MessageName
    {
        get { return _messageName; }
        set { _messageName = value; }
    }
    private string _userName;
    /// <summary>
    /// 使用者名
    /// </summary>
    public string UserName
    {
        get { return _userName; }
        set { _userName = value; }
    }
    private string _password;
    /// <summary>
    /// 密碼
    /// </summary>
    public string Password
    {
        get { return _password; }
        set { _password = value; }
    }


    #endregion

    #region 構造函數

    public ActiveMqHelper()
    { 
    
    }

    public ActiveMqHelper(string messageName, string uriProvider)
        : this()
    {
        _uriProvider = uriProvider;
        _messageName = messageName;
    }

    public ActiveMqHelper(string messageName, string uriProvider, string userName, string password)
        : this(messageName, uriProvider)
    {
        _userName = userName;
        _password = password;
    }

    ~ActiveMqHelper()
    {
         Stop();
    }

    #endregion

    #region 公共方法

    /// <summary>
    /// 初始化隊列消息消費者
    /// </summary>
    public void Start()
    {
        try
        {
            if (string.IsNullOrEmpty(_uriProvider))
            {
                throw new ArgumentNullException("生産者消息總線位址為空!");
            }
            if (_isInitSuccess)
            {
                Stop();
            }

            // 得到連接配接工廠
            ConnectionFactory factory = new ConnectionFactory(_uriProvider);
            // 建立一個連接配接
            if (!string.IsNullOrEmpty(_userName))
            {
                _connection = factory.CreateConnection(_userName, _password);
            }
            else
            {
                _connection = factory.CreateConnection();
            }
            // 打開連接配接
            _connection.Start();
            // 得到會話
            _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            // 初始化完成
            _isInitSuccess = true;
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 停止隊列消息消費者,釋放資源
    /// </summary>
    public void Stop()
    {
        try
        {
            if (!_isInitSuccess)
                return;

            if (_session != null)
            {
                _session.Close();
                _session = null;
            }

            if (_connection != null)
            {
                _connection.Stop();
                _connection.Close();
                _connection = null;
            }
        }
        catch
        {
            throw;
        }
    }

    /// <summary>
    /// 建立生産者
    /// </summary>
    /// <param name="messageType">消息類型</param>
    /// <param name="messageName">主題或消息隊列</param>
    public IMessageProducer CreateProducer(MqMessageTypeEnum messageType, string messageName)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            if (messageType == MqMessageTypeEnum.Topic)
            {
                return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
            }
            else
            {
                return _session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
            }
            //IDestination destNation = SessionUtil.GetDestination(_session, _messageName, DestinationType.Queue);
            //return _session.CreateProducer(destNation);
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 建立生産者
    /// </summary>
    /// <param name="messageType">消息類型</param>
    public IMessageProducer CreateProducer(MqMessageTypeEnum messageType)
    {
        return CreateProducer(messageType, _messageName);
    }

    /// <summary>
    /// 建立消費者
    /// </summary>
    /// <param name="messageType">消息類型</param>
    /// <param name="messageName">主題或消息隊列</param>
    /// <param name="selector">篩選器</param>
    public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName, string selector)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            if (string.IsNullOrEmpty(selector))
            {
                if (messageType == MqMessageTypeEnum.Topic)
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName));
                }
                else
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName));
                }
            }
            else
            {
                if (messageType == MqMessageTypeEnum.Topic)
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(messageName), selector, false);
                }
                else
                {
                    return _session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(messageName), selector, false);
                }
            }
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 建立消費者
    /// </summary>
    /// <param name="messageType">消息類型</param>
    /// <param name="messageName">主題或消息隊列</param>
    public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType, string messageName)
    {
        return CreateConsumer(messageType, messageName, null);
    }

    /// <summary>
    /// 建立消費者
    /// </summary>
    /// <param name="messageType">消息類型</param>
    public IMessageConsumer CreateConsumer(MqMessageTypeEnum messageType)
    {
        return CreateConsumer(messageType, _messageName, null);
    }


    /// <summary>
    /// 發送消息
    /// </summary>
    /// <param name="messageBody">消息消息主體param>
    /// <param name="producer">生産者</param>
    public void SendMapMessage(IDictionary<string, object> messageBody, IMessageProducer producer)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            } 
            IMapMessage mapMsg = _session.CreateMapMessage();
            if (mapMsg == null)
            {
                return;
            }

            foreach (KeyValuePair<string, object> item in messageBody)
            {
                if (item.Value is int)
                {
                    mapMsg.Body.SetInt(item.Key, (int)item.Value);
                }
                else if (item.Value is string)
                {
                    mapMsg.Body.SetString(item.Key, (string)item.Value);
                }
                else if (item.Value is bool)
                {
                    mapMsg.Body.SetBool(item.Key, (bool)item.Value);
                }
                else if (item.Value is byte)
                {
                    mapMsg.Body.SetByte(item.Key, (byte)item.Value);
                }
                else if (item.Value is char)
                {
                    mapMsg.Body.SetChar(item.Key, (char)item.Value);
                }
                else if (item.Value is IDictionary)
                {
                    mapMsg.Body.SetDictionary(item.Key, (IDictionary)item.Value);
                }
                else if (item.Value is double)
                {
                    mapMsg.Body.SetDouble(item.Key, (double)item.Value);
                }
                else if (item.Value is float)
                {
                    mapMsg.Body.SetFloat(item.Key, (float)item.Value);
                }
                else if (item.Value is IList)
                {
                    mapMsg.Body.SetList(item.Key, (IList)item.Value);
                }
                else if (item.Value is short)
                {
                    mapMsg.Body.SetShort(item.Key, (short)item.Value);
                }
            }

            //發送消息
            producer.Send(mapMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 發送消息
    /// </summary>
    /// <param name="messageBody">消息主體</param>
    /// <param name="messageType">消息類型</param>
    public void SendMapMessage(IDictionary<string, object> messageBody, MqMessageTypeEnum messageType)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            using (IMessageProducer producer = CreateProducer(messageType))
            {
                SendMapMessage(messageBody, producer);
            }
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    /// <summary>
    /// 發送消息
    /// </summary>
    /// <param name="messageText">消息文本</param>
    /// <param name="messageProperties">消息屬性</param>
    /// <param name="producer">生産者</param>
    public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, IMessageProducer producer)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            ITextMessage textMsg = _session.CreateTextMessage();
            if (textMsg == null)
            {
                return;
            }
            textMsg.Text = messageText;
            if (messageProperties != null && messageProperties.Count > 0)
            {
                foreach (KeyValuePair<string, object> item in messageProperties)
                {
                    if (item.Value is int)
                    {
                        textMsg.Properties.SetInt(item.Key, (int)item.Value);
                    }
                    else if (item.Value is string)
                    {
                        textMsg.Properties.SetString(item.Key, (string)item.Value);
                    }
                    else if (item.Value is bool)
                    {
                        textMsg.Properties.SetBool(item.Key, (bool)item.Value);
                    }
                    else if (item.Value is byte)
                    {
                        textMsg.Properties.SetByte(item.Key, (byte)item.Value);
                    }
                    else if (item.Value is char)
                    {
                        textMsg.Properties.SetChar(item.Key, (char)item.Value);
                    }
                    else if (item.Value is IDictionary)
                    {
                        textMsg.Properties.SetDictionary(item.Key, (IDictionary)item.Value);
                    }
                    else if (item.Value is double)
                    {
                        textMsg.Properties.SetDouble(item.Key, (double)item.Value);
                    }
                    else if (item.Value is float)
                    {
                        textMsg.Properties.SetFloat(item.Key, (float)item.Value);
                    }
                    else if (item.Value is IList)
                    {
                        textMsg.Properties.SetList(item.Key, (IList)item.Value);
                    }
                    else if (item.Value is short)
                    {
                        textMsg.Properties.SetShort(item.Key, (short)item.Value);
                    }

                }
            }

            //發送消息
            producer.Send(textMsg, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.MinValue);
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }
    
    /// <summary>
    /// 發送消息
    /// </summary>
    /// <param name="messageText">消息文本</param>
    /// <param name="messageProperties">消息屬性</param>
    /// <param name="messageType">消息類型</param>
    public void SendTextMessage(string messageText, IDictionary<string, object> messageProperties, MqMessageTypeEnum messageType)
    {
        try
        {
            if (!_isInitSuccess)
            {
                Start();
            }
            using (IMessageProducer producer = CreateProducer(messageType))
            {
                SendTextMessage(messageText,messageProperties, producer);
            }
        }
        catch
        {
            _isInitSuccess = false;
            throw;
        }
    }

    #endregion

    #region 私有方法

    // 字元數組轉字元串
    private IList StringToByteList(string content)
    {
        return StringToByteList(content, "GB2312");
    }

    private IList StringToByteList(string content, string encodeName)
    {
        Byte[] resuleBytes = Encoding.GetEncoding(encodeName).GetBytes(content);
        return resuleBytes;
    }

    #endregion

    #region IDisposable 成員

    public void Dispose()
    {
        Stop();
    }

    #endregion
}
           

以上是ActiveMQ的消息發送和接收的Helper類,這裡要說明一點的是,ActiveMQ消息(IMassage)有很多種類型,我這裡隻做了IMapMessage和ITextMessage,其他的都大同小異,讀者自己擴充吧。下面再提供下測試程式。

private static string _uriProvider = "activemq:failover:(tcp://" + ConfigurationManager.AppSettings["VMActiveMQIP"] + ")?maxReconnectAttempts=3";
private static ActiveMqHelper _amh;
private static Apache.NMS.IMessageConsumer _consumer;
/// <summary>
/// 消息響應句柄
/// </summary>
/// <param name="properties">消息屬性</param>
public delegate void MessageResponse(IDictionary<string, object> properties);
/// <summary>
/// 發送配置檢測指令
/// </summary>
/// <param name="command_id">指令ID:0-檢測,1-修複</param>
/// <param name="task_id">任務ID</param>
/// <param name="task_type">任務類型:0-手動,1-自動</param>
public static void SendCommand(int command_id, int task_id, int task_type)
{
    if (_amh == null)
    {
        _amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
    }
    IDictionary<string, object> properties = new Dictionary<string, object>();
    properties.Add(new KeyValuePair<string, object>("command_id", command_id));
    properties.Add(new KeyValuePair<string, object>("task_id", task_id));
    properties.Add(new KeyValuePair<string, object>("task_type", task_type));
    _amh.SendMapMessage(properties, ActiveMqHelper.MqMessageTypeEnum.Queue);
}
/// <summary>
/// 接收配置檢測消息
/// </summary>
/// <param name="component">消息響應元件</param>
/// <param name="responseMessage">消息響應方法</param>
public static void StartReceiveCommand(System.Windows.Forms.Control component, MessageResponse messageResponse)
{
    if (_amh == null)
    {
        _amh = new ActiveMqHelper("esp.queue.command_to_virtual_config_data", _uriProvider);
    }
    if (_consumer == null)
    {
        _consumer = _amh.CreateConsumer(ActiveMqHelper.MqMessageTypeEnum.Queue);
    }

    _consumer.Listener += (message) =>
    {
        IDictionary<string, object> properties = new Dictionary<string, object>();

        IEnumerator keys = ((Apache.NMS.IMapMessage)message).Body.Keys.GetEnumerator();
        keys.Reset();
        IEnumerator values = ((Apache.NMS.IMapMessage)message).Body.Values.GetEnumerator();
        values.Reset();
        while (keys.MoveNext())
        {
            values.MoveNext();
            properties.Add(new KeyValuePair<string, object>
                (keys.Current.ToString(), values.Current));
        }
        if (component.InvokeRequired)
        {
            component.BeginInvoke(messageResponse, properties);
        }
    };
}
/// <summary>
/// 停止接收配置檢測指令
/// </summary>
public static void StopReceiveCommand()
{
    if (_amh != null)
    {
        _amh.Dispose();
        _amh = null;
    }
    if (_consumer != null)
    {
        _consumer.Close();
        _consumer = null;
    }
}
           

上面這個測試例子是用來做為發送指令的,用的是IMapMessage方式,僅作參考。