天天看點

NET下RabbitMQ實踐[實戰篇]

     之前的文章中,介紹了如何将RabbitMQ以WCF方式進行釋出。今天就介紹一下我們産品中如何使用RabbitMQ的!

     在Discuz!NT企業版中,提供了對HTTP錯誤日志的記錄功能,這一點對企業版非常重要,另外存儲錯誤日志使用了MongoDB,理由很簡單,MongoDB的添加操作飛快,即使數量過億之後插入速度依舊不減。

     在開始正文之前,先說明一下本文的代碼分析順序,即:程式入口==》RabbitMQ用戶端===>RabbitMQ服務端。好了,閑話少說,開始正文!    

     首先是程式入口,也就是WCF+RabbitMQ用戶端實作:    

     因為Discuz!NT使用了HttpModule方式來接管HTTP連結請求,而在.NET的HttpModule模闆中,可以通過如下方法來接管程式運作時發生的ERROR,如下:    

context.Error += new EventHandler(Application_OnError);

     而“記錄錯誤日志"的功能入口就在這裡:    

public void Application_OnError(Object sender, EventArgs e)

    {

        string requestUrl = DNTRequest.GetUrl();

        HttpApplication application = (HttpApplication)sender;

        HttpContext context = application.Context;

#if EntLib

        if (RabbitMQConfigs.GetConfig() != null && RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)//當開啟errlog錯誤日志記錄功能時

        {

            RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(new HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString()));//異步方式

            //RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!"));//同步方式

            return;

        }

#endif

        ...

     }

     當然從代碼可以看出,記錄日志的工作基本是通過配置檔案控制的,即“HttpModuleErrLog.Enable”。

     而RabbitMQClientHelper是一個封裝類,主要用于反射生成IHttpModuleErrlogClient接口執行個體,該執行個體就是“基于WCF釋出的RabbitMQ”的用戶端通路對象。

/// <summary>

/// RabbitMQ

/// </summary>

public class RabbitMQClientHelper

{

    static IHttpModuleErrlogClient ihttpModuleErrLogClient;

    private static object lockHelper = new object();

    public static IHttpModuleErrlogClient GetHttpModuleErrLogClient()

    {

        if (ihttpModuleErrLogClient == null)

        {

            lock (lockHelper)

            {

                if (ihttpModuleErrLogClient == null)

                {

                    try

                    {

                        if (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)

                        {

                            ihttpModuleErrLogClient = (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(

                                  "Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client", false, true));

                        }

                    }

                    catch

                    {

                        throw new Exception("請檢查 Discuz.EntLib.RabbitMQ.dll 檔案是否被放置到了bin目錄下!");

                    }

                }

            }

        }

        return ihttpModuleErrLogClient;

    }

}

    可以看出它反射的是Discuz.EntLib.RabbitMQ.dll檔案的HttpModuleErrLogClient對象(注:使用反射的原因主要是解決企業版代碼與普遍版代碼在項目引用上的互相依賴),下面就是其接口和具體要求實作:       

    /// <summary>

    /// IHttpModuleErrlogClient用戶端接口類,用于反射執行個體化綁定

    /// </summary>

    public interface IHttpModuleErrlogClient

    {

        void AddLog(HttpModuleErrLogData httpModuleErrLogData);

        void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);

    }

    public class HttpModuleErrLogClient : IHttpModuleErrlogClient

    {

        public void AddLog(HttpModuleErrLogData httpModuleErrLogData)

        {

            try

            {

                //((RabbitMQBinding)binding).OneWayOnly = true;

                ChannelFactory<IHttpModuleErrLogService> m_factory = new ChannelFactory<IHttpModuleErrLogService>(GetBinding(), "soap.amqp:///HttpModuleErrLogService");

                m_factory.Open();

                IHttpModuleErrLogService m_client = m_factory.CreateChannel();

                m_client.AddLog(httpModuleErrLogData);

                ((IClientChannel)m_client).Close();

                m_factory.Close();

            }

            catch (System.Exception e)

            {

                string msg = e.Message;

            }

        }

        private delegate void delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);

        public void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)

        {

            delegateAddLog AddLog_aysncallback = new delegateAddLog(AddLog);

            AddLog_aysncallback.BeginInvoke(httpModuleErrLogData, null, null);

        }

        public Binding GetBinding()

        {

            return new RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);

        }

    }

    可以看出,AddLog方法與上一篇中的用戶端内容基本上沒什麼太大差别,隻不過它提供了同步和異步通路兩種方式,這樣做的目的主要是使用者可根據生産環境來靈活配置。

    下面就來看一下RabbitMQ的服務端實作,首先看一下其運作效果,如下圖:

NET下RabbitMQ實踐[實戰篇]

    接着看一下啟動rabbitmq服務的代碼:   

public void StartService(System.ServiceModel.Channels.Binding binding)

    {

        m_host = new ServiceHost(typeof(HttpModuleErrLogService), new Uri("soap.amqp:///"));

        //((RabbitMQBinding)binding).OneWayOnly = true;

        m_host.AddServiceEndpoint(typeof(IHttpModuleErrLogService), binding, "HttpModuleErrLogService");

        m_host.Open();

        m_serviceStarted = true;            

    }    

    上面代碼會添加IHttpModuleErrLogService接口實作類HttpModuleErrLogService 的Endpoint,并啟動它,下面就是該接口聲明:

    /// <summary>

    /// IHttpModuleErrLogService接口類

    /// </summary>  

    [ServiceContract]

    public interface IHttpModuleErrLogService

    {

        /// <summary>

        /// 添加httpModuleErrLogData日志資訊

        /// </summary>

        /// <param name="httpModuleErrLogData"></param>

        [OperationContract]

        void AddLog(HttpModuleErrLogData httpModuleErrLogData);

    }

    代碼很簡單,就是定義了一個添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)

    下面就是接口的具體實作,首先是類聲明及初始化代碼:   

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] //Single - 為所有用戶端調用配置設定一個服務執行個體。

    public class HttpModuleErrLogService : IHttpModuleErrLogService

    {    

        /// <summary>

        /// 擷取HttpModuleErrLogInfo配置檔案對象執行個體

        /// </summary>

        private static HttpModuleErrLogInfo httpModuleErrorLogInfo = RabbitMQConfigs.GetConfig().HttpModuleErrLog;

        /// <summary>

        /// 定時器對象

        /// </summary>

        private static System.Timers.Timer _timer;

        /// <summary>

        /// 定時器的時間

        /// </summary>

        private static int _elapsed = 0;

        public static void Initial(System.Windows.Forms.RichTextBox msgBox, int elapsed)

        {

            _msgBox = msgBox;

            _elapsed = elapsed;

            //初始定時器

            if (_elapsed > 0)

            {

                _timer = new System.Timers.Timer() { Interval = elapsed * 1000,  Enabled = true, AutoReset = true };            

                _timer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);

                _timer.Start();

            }

        }

        /// <summary>

        /// 時間到時執行出隊操作

        /// </summary>

        /// <param name="sender"></param>

        /// <param name="e"></param>

        private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)

        {    

            Dequeue();    

        }

        可以看出,這裡使用了靜态定時器對象,來進行定時通路隊列資訊功能(“非同步出隊”操作),這樣設計的原因主要是為使用者提供适合的配置方式,即如果不使用定時器(為0時),則系統會在日志入隊後,就立即啟動出隊(“同步出隊”)操作擷取日志資訊并插入到MongoDB資料庫中。

      下面介紹一下入隊操作實作:      

        /// <summary>

        /// 添加httpModuleErrLogData日志資訊

        /// </summary>

        /// <param name="httpModuleErrLogData"></param>

        public void AddLog(HttpModuleErrLogData httpModuleErrLogData)

        {

            Enqueue(httpModuleErrLogData);

            if (_elapsed <=0) //如果使用定時器(為0時),則立即執行出隊操作

                Dequeue();

        }   

        /// <summary>

        /// 交換機名稱

        /// </summary>

        private const string EXCHANGE = "ex1";

        /// <summary>

        /// 交換方法,更多内容參見:http://melin.javaeye.com/blog/691265

        /// </summary>

        private const string EXCHANGE_TYPE = "direct";

        /// <summary>

        /// 路由key,更多内容參見:http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/

        /// </summary>

        private const string ROUTING_KEY = "m1";

        /// <summary>

        /// 日志入隊

        /// </summary>

        /// <param name="httpModuleErrLogData"></param>

        public static void Enqueue(HttpModuleErrLogData httpModuleErrLogData)

        {

            Uri uri = new Uri(httpModuleErrorLogInfo.RabbitMQAddress);         

            ConnectionFactory cf = new ConnectionFactory()

            {

                UserName = httpModuleErrorLogInfo.UserName,

                Password = httpModuleErrorLogInfo.PassWord,

                VirtualHost = "dnt_mq",

                RequestedHeartbeat = 0,

                Endpoint = new AmqpTcpEndpoint(uri)

            };

            using (IConnection conn = cf.CreateConnection())

            {

                using (IModel ch = conn.CreateModel())

                {

                    if (EXCHANGE_TYPE != null)

                    {

                        ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);//,true,true,false,false, true,null);

                        ch.QueueDeclare(httpModuleErrorLogInfo.QueueName, true);//true, true, true, false, false, null);

                        ch.QueueBind(httpModuleErrorLogInfo.QueueName, EXCHANGE, ROUTING_KEY, false, null);

                    }

                    IMapMessageBuilder b = new MapMessageBuilder(ch);

                    IDictionary target = b.Headers;

                    target["header"] = "HttpErrLog";

                    IDictionary targetBody = b.Body;

                    targetBody["body"] = SerializationHelper.Serialize(httpModuleErrLogData);

                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;//persistMode                   

                    ch.BasicPublish(EXCHANGE, ROUTING_KEY,

                                               (IBasicProperties)b.GetContentHeader(),

                                               b.GetContentBody());

                }

            }

        }

        代碼很簡單,主要構造rabbitmq連結(ConnectionFactory)并初始化相應參數如使用者名,密碼,ROUTING_KEY等。

        然後将傳入的日志對象序列化成字元串對象,指派給targetBody["body"],這樣做主要是因為我沒找到更好的方法來指派(之前嘗試直接綁定httpModuleErrLogData到targetBody["body"],但在出隊操作中找不到合适方法将httpModuleErrLogData對象解析出來)。

        下面就是出隊操作:      

        /// <summary>

        /// 日志出隊

        /// </summary>

        public static void Dequeue()

        {       

            string serverAddress = httpModuleErrorLogInfo.RabbitMQAddress.Replace("amqp://", "").TrimEnd('/'); //"10.0.4.85:5672";

            ConnectionFactory cf = new ConnectionFactory()

            {

                UserName = httpModuleErrorLogInfo.UserName,

                Password = httpModuleErrorLogInfo.PassWord,

                VirtualHost = "dnt_mq",

                RequestedHeartbeat = 0,

                Address = serverAddress

            };

            using (IConnection conn = cf.CreateConnection())

            {

                using (IModel ch = conn.CreateModel())

                {

                    while (true)

                    {

                        BasicGetResult res = ch.BasicGet(httpModuleErrorLogInfo.QueueName, false);

                        if (res != null)

                        {

                            try

                            {

                                string objstr = System.Text.UTF8Encoding.UTF8.GetString(res.Body).Replace("\0\0\0body\0\n", "");//去掉頭部資訊

                                object obj = SerializationHelper.DeSerialize(typeof(HttpModuleErrLogData), objstr);

                                HttpModuleErrLogData httpModuleErrLogData = obj as HttpModuleErrLogData;

                                if (httpModuleErrLogData != null)

                                {

                                    MongoDbHelper.Insert(new Mongo(httpModuleErrorLogInfo.MongoDB), "dnt_httpmoduleerrlog", LoadAttachment(httpModuleErrLogData));

                                    _msgBox.BeginInvoke(new ShowMsg(SetMsgRichBox), "\r發生時間:" + httpModuleErrLogData.TimeStamp + "\r錯誤等級:" + httpModuleErrLogData.Level + "\r詳細資訊:" + httpModuleErrLogData.Message);

                                    ch.BasicAck(res.DeliveryTag, false);

                                }

                            }

                            catch { }

                        }

                        else

                            break;

                    }

                }

            }           

        } 

        出隊操作也是先執行個體化連結到rabbitmq 的執行個體,并循環使用BasicGet方法來單條擷取隊列資訊,并最終将res.Body的資料序列化成HttpModuleErrLogData對象,并最終插入到mongodb資料庫中。同時将擷取的隊列消息顯示出來:

    _msgBox.BeginInvoke(new ShowMsg(SetMsgRichBox), "\r發生時間:" + httpModuleErrLogData.TimeStamp + "\r錯誤等級:" + httpModuleErrLogData.Level + "\r詳細資訊:" + httpModuleErrLogData.Message);

        這裡使用異步方式顯示出隊的日志資訊,其聲明的delegate 方法“ShowMsg”如下:

        /// <summary>

        /// 聲明委托

        /// </summary>

        /// <param name="message"></param>

        public delegate void ShowMsg(string message);

        /// <summary>

        /// 綁定到上面delegate的方法

        /// </summary>

        /// <param name="outPut"></param>

        public static void SetMsgRichBox(string outPut)

        {

            _msgBox.Text += "\r==================================\r下列錯誤資訊出隊時間=>" + DateTime.Now + outPut + "\r";

        }

        同時使用LoadAttachment方法來實作HttpModuleErrLogData到mongodb的Document類型的轉換:       

        /// <summary>

        /// 将HttpModuleErrLogData轉換成Document類型

        /// </summary>

        /// <param name="httpModuleErrLogData"></param>

        /// <returns></returns>

        public static Document LoadAttachment(HttpModuleErrLogData httpModuleErrLogData)

        {

           Document doc = new Document();

            doc["_id"] = httpModuleErrLogData.Oid;

            doc["level"] = httpModuleErrLogData.Level;

            doc["message"] = httpModuleErrLogData.Message;

            doc["timestamp"] = httpModuleErrLogData.TimeStamp;

            return doc;

        }       

     到這裡,主要的功能介紹就差不多了。當然本文所闡述的隻是一個原型,相信會随着對rabbitmq的了解深入而不斷完善,感興趣的朋友歡迎讨論交流,以糾正我認識上的偏差,呵呵。

   原文連結:http://www.cnblogs.com/daizhj/archive/2010/10/25/1860442.html   

   Tags:discuz!nt,Rabbitmq,NET,mongodb

   BLOG: http://daizhj.cnblogs.com/

   作者:daizhj,代震軍