天天看點

資料接收軟體中的《如何快速穩定進行資料存儲》附源代碼

随着大資料爆發和資料微細化,如何提高資料接收軟體資料存儲能力,本節隻分享快速資料存儲。

應用技術: 多線,線程同步,EF,清單,SqlBulkCopy。

具體業務具體分析,如:大氣環境監測、雨量監測、交通(GPS),可能有秒級、分鐘資料,

接收軟體采用異步socket進行接收和發送資料,通過解析層進行資料包解析,通過資料存儲層入庫存儲

主要業務功能:資料接收,資料包解析,資料存儲,資料轉發等。

思路:利用多線程技術,資料接收線程隻接收資料存放隊列中(并通知解析處理線程),解析線程隻做資料解包合成(并通知存儲線程),

     存儲線程隻進行DB資料存儲可利用SqlBulkCopy。  

直接來代碼吧,這裡隻對一類資料進行存儲實作。

//資料實體類

using System;

using System.Collections.Generic;

using System.ComponentModel.DataAnnotations;

using System.Runtime.Serialization;

namespace DAL.Models

{

    [DataContract]

    public  class MD_StationMeteoData

    {

        [Key]

        [DataMember]

        public int Id { get; set; }

        [DataMember]

        public System.DateTime ModelDate { get; set; }

        [DataMember]

        public int LevelHeight { get; set; }

        [DataMember]

        public int StationID { get; set; }

        [DataMember]

        public System.DateTime TimePoint { get; set; }

        [DataMember]

        public string PollutantCode { get; set; }

        [DataMember]

        public double Value { get; set; }

        [DataMember]

        public string Mark { get; set; }

    }

}

//業務邏輯類

using DAL.Models;

using System;

using System.Collections.Generic;

using System.Data;

using System.Data.SqlClient;

using System.Linq;

using System.Text;

namespace DAL.BLL

{

  public   class MD_StationMeteoDataBLL

    {

        private static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        //static MD_StationMeteoDataBLL _defaule = new MD_StationMeteoDataBLL();

        //public static MD_StationMeteoDataBLL Default

        //{

        //    get { return _defaule; }

        //}

        public  MD_StationMeteoDataBLL()

        {

        }

        private void SqlBulkCopyByDatatable(string connectionString, string TableName, DataTable dt)

        {

            using (SqlConnection conn = new SqlConnection(connectionString))

            {

                using (SqlBulkCopy sqlbulkcopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.UseInternalTransaction))

                {

                    try

                    {

                        sqlbulkcopy.DestinationTableName = TableName;

                        for (int i = 0; i < dt.Columns.Count; i++)

                        {

                            sqlbulkcopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);

                        }

                        sqlbulkcopy.WriteToServer(dt);

                    }

                    catch (System.Exception ex)

                    {

                        throw ex;

                    }

                }

            }

        }

        private DataTable CreateDataTable(List<MD_StationMeteoData> values)

        {

            var valuesList = values;

            DataTable table = new DataTable();

            table.Columns.AddRange(new DataColumn[] {

                new DataColumn("ModelDate", typeof(DateTime)),

                new DataColumn("LevelHeight",typeof(int)),

                new DataColumn("StationID",typeof(int)),

                new DataColumn("TimePoint", typeof(DateTime)),

                new DataColumn("PollutantCode", typeof(string)),

                new DataColumn("Value", typeof(double)),

                new DataColumn("Mark", typeof(string)),    

                  });

            foreach (var item in values)

            {

                DataRow row = table.NewRow();

                int num = 0;

                DateTime timePoint = valuesList[0].TimePoint;

                row[num++] = item.ModelDate;

                row[num++] = item.LevelHeight;

                row[num++] = item.StationID;

                row[num++] = item.TimePoint;

                row[num++] = item.PollutantCode;

                row[num++] = item.Value;

                row[num++] = "";

                table.Rows.Add(row);

            }

            return table;

        }

        public bool AddSqlBulkCopyByData(List<MD_StationMeteoData> modellist)

        {

            try

            {

              DataTable dt =   CreateDataTable(modellist);

                string constr =  db.Database.Connection.ConnectionString;

                SqlBulkCopyByDatatable(constr, "MD_StationMeteoData", dt);

                return true;

            }

            catch (Exception ex)

            {

                log.Error(ex);

                return false;

            }

        }

}

}

//資料存儲處理類

using System;

using System.Collections.Generic;

using System.Linq;

using System.Diagnostics;

using System.Threading;

using DAL.Models;

using DAL.BLL;

namespace MeteoDataProcessDemo

{

   public class SaveProcess

    {

        /// <summary>

        /// 

        /// </summary>

        private readonly AutoResetEvent _event;

        private bool _flag;

        private readonly List<MD_StationMeteoData> _MeteoDataList;

        /// <summary>

        /// 工作線程

        /// </summary>

        private Thread _worker;

        /// <summary>

        /// 靜态字段 

        /// </summary>

        public static SaveProcess Default = null;

        /// <summary>

        /// 靜态構造

        /// </summary>

        static SaveProcess()

        {

            Default = new SaveProcess();

        }

        private SaveProcess()

        {

              _flag = true;

            this._MeteoDataList = new List<MD_StationMeteoData>();

            this._event = new AutoResetEvent(true);

        }

        /// <summary>

        /// 向清單添加資料項

        /// </summary>

        /// <param name="values"></param>

        public void Append(List<MD_StationMeteoData> values)

        {

            if (values != null)

            {

                lock (this._MeteoDataList)

                {

                    this._MeteoDataList.AddRange(values);

                    this._event.Set(); //通知工作線程儲存

                }

            }

        }

        /// <summary>

        /// 

        /// </summary>

        public void Stop()

        {

            if ((this._worker != null) && this._worker.IsAlive)

            {

                this._flag = false;

                this._event.Set();

                this._worker.Join();

            }

        }

        /// <summary>

        /// 線程啟動事件

        /// </summary>

        public void Start()

        {

            _worker = new Thread(Work);

            _worker.Start();

        }

        /// <summary>

        /// 工作線程

        /// </summary>

        public void Work()

        {

            //this._flag = true;

            while (this._flag)

            {

                int num = -1;

                try

                {

                    if (this._MeteoDataList.Count == 0)

                    {

                        this._event.Reset();

                        this._event.WaitOne();

                    }

                    Thread.Sleep(1000);

                    num = this.Save();

                }

                catch (Exception ex)

                {

                }

                finally

                {

                }

            }

        }

        /// <summary>

        /// 快速複制List 

        /// </summary>

        /// <returns></returns>

        private int Save()

        {

            if (this._MeteoDataList.Count > 0)

            {

                MD_StationMeteoData[] values = null;

                lock (this._MeteoDataList) //

                {

                    values = this._MeteoDataList.ToArray(); //複制list 

                    this._MeteoDataList.Clear();            //清空清單

                }

                if (values != null)

                {

                    return this.Save(values);

                }

                return -1;

            }

            return 0;

        }

        /// <summary>

        /// 儲存資料-SqlBulkCopyByData

        /// </summary>

        /// <param name="values"></param>

        /// <returns></returns>

        private int Save(MD_StationMeteoData[] values)

        {

            int num = 0;

            try

            {

               MD_StationMeteoDataBLL meteodatabll = new MD_StationMeteoDataBLL();

                meteodatabll.AddSqlBulkCopyByData(values.ToList<MD_StationMeteoData>());

            }

            finally

            {

            }

            return num;

        }

    }

}

繼續閱讀