天天看点

数据接收软件中的《如何快速稳定进行数据存储》附源代码

随着大数据爆发和数据微细化,如何提高数据接收软件数据存储能力,本节只分享快速数据存储。

应用技术: 多线,线程同步,EF,列表,SqlBulkCopy。

具体业务具体分析,如:大气环境监测、雨量监测、交通(GPS),可能有秒级、分钟数据,

接收软件采用异步socket进行接收和发送数据,通过解析层进行数据包解析,通过数据存储层入库存储

主要业务功能:数据接收,数据包解析,数据存储,数据转发等。

思路:利用多线程技术,数据接收线程只接收数据存放队列中(并通知解析处理线程),解析线程只做数据解包合成(并通知存储线程),

     存储线程只进行DB数据存储可利用SqlBulkCopy。  

直接来代码吧,这里只对一类数据进行存储实现。

//数据实体类

using System;

using System.Collections.Generic;

using System.ComponentModel.DataAnnotations;

using System.Runtime.Serialization;

namespace DAL.Models

{

    [DataContract]

    public  class MD_StationMeteoData

    {

        [Key]

        [DataMember]

        public int Id { get; set; }

        [DataMember]

        public System.DateTime ModelDate { get; set; }

        [DataMember]

        public int LevelHeight { get; set; }

        [DataMember]

        public int StationID { get; set; }

        [DataMember]

        public System.DateTime TimePoint { get; set; }

        [DataMember]

        public string PollutantCode { get; set; }

        [DataMember]

        public double Value { get; set; }

        [DataMember]

        public string Mark { get; set; }

    }

}

//业务逻辑类

using DAL.Models;

using System;

using System.Collections.Generic;

using System.Data;

using System.Data.SqlClient;

using System.Linq;

using System.Text;

namespace DAL.BLL

{

  public   class MD_StationMeteoDataBLL

    {

        private static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        //static MD_StationMeteoDataBLL _defaule = new MD_StationMeteoDataBLL();

        //public static MD_StationMeteoDataBLL Default

        //{

        //    get { return _defaule; }

        //}

        public  MD_StationMeteoDataBLL()

        {

        }

        private void SqlBulkCopyByDatatable(string connectionString, string TableName, DataTable dt)

        {

            using (SqlConnection conn = new SqlConnection(connectionString))

            {

                using (SqlBulkCopy sqlbulkcopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.UseInternalTransaction))

                {

                    try

                    {

                        sqlbulkcopy.DestinationTableName = TableName;

                        for (int i = 0; i < dt.Columns.Count; i++)

                        {

                            sqlbulkcopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);

                        }

                        sqlbulkcopy.WriteToServer(dt);

                    }

                    catch (System.Exception ex)

                    {

                        throw ex;

                    }

                }

            }

        }

        private DataTable CreateDataTable(List<MD_StationMeteoData> values)

        {

            var valuesList = values;

            DataTable table = new DataTable();

            table.Columns.AddRange(new DataColumn[] {

                new DataColumn("ModelDate", typeof(DateTime)),

                new DataColumn("LevelHeight",typeof(int)),

                new DataColumn("StationID",typeof(int)),

                new DataColumn("TimePoint", typeof(DateTime)),

                new DataColumn("PollutantCode", typeof(string)),

                new DataColumn("Value", typeof(double)),

                new DataColumn("Mark", typeof(string)),    

                  });

            foreach (var item in values)

            {

                DataRow row = table.NewRow();

                int num = 0;

                DateTime timePoint = valuesList[0].TimePoint;

                row[num++] = item.ModelDate;

                row[num++] = item.LevelHeight;

                row[num++] = item.StationID;

                row[num++] = item.TimePoint;

                row[num++] = item.PollutantCode;

                row[num++] = item.Value;

                row[num++] = "";

                table.Rows.Add(row);

            }

            return table;

        }

        public bool AddSqlBulkCopyByData(List<MD_StationMeteoData> modellist)

        {

            try

            {

              DataTable dt =   CreateDataTable(modellist);

                string constr =  db.Database.Connection.ConnectionString;

                SqlBulkCopyByDatatable(constr, "MD_StationMeteoData", dt);

                return true;

            }

            catch (Exception ex)

            {

                log.Error(ex);

                return false;

            }

        }

}

}

//数据存储处理类

using System;

using System.Collections.Generic;

using System.Linq;

using System.Diagnostics;

using System.Threading;

using DAL.Models;

using DAL.BLL;

namespace MeteoDataProcessDemo

{

   public class SaveProcess

    {

        /// <summary>

        /// 

        /// </summary>

        private readonly AutoResetEvent _event;

        private bool _flag;

        private readonly List<MD_StationMeteoData> _MeteoDataList;

        /// <summary>

        /// 工作线程

        /// </summary>

        private Thread _worker;

        /// <summary>

        /// 静态字段 

        /// </summary>

        public static SaveProcess Default = null;

        /// <summary>

        /// 静态构造

        /// </summary>

        static SaveProcess()

        {

            Default = new SaveProcess();

        }

        private SaveProcess()

        {

              _flag = true;

            this._MeteoDataList = new List<MD_StationMeteoData>();

            this._event = new AutoResetEvent(true);

        }

        /// <summary>

        /// 向列表添加数据项

        /// </summary>

        /// <param name="values"></param>

        public void Append(List<MD_StationMeteoData> values)

        {

            if (values != null)

            {

                lock (this._MeteoDataList)

                {

                    this._MeteoDataList.AddRange(values);

                    this._event.Set(); //通知工作线程保存

                }

            }

        }

        /// <summary>

        /// 

        /// </summary>

        public void Stop()

        {

            if ((this._worker != null) && this._worker.IsAlive)

            {

                this._flag = false;

                this._event.Set();

                this._worker.Join();

            }

        }

        /// <summary>

        /// 线程启动事件

        /// </summary>

        public void Start()

        {

            _worker = new Thread(Work);

            _worker.Start();

        }

        /// <summary>

        /// 工作线程

        /// </summary>

        public void Work()

        {

            //this._flag = true;

            while (this._flag)

            {

                int num = -1;

                try

                {

                    if (this._MeteoDataList.Count == 0)

                    {

                        this._event.Reset();

                        this._event.WaitOne();

                    }

                    Thread.Sleep(1000);

                    num = this.Save();

                }

                catch (Exception ex)

                {

                }

                finally

                {

                }

            }

        }

        /// <summary>

        /// 快速复制List 

        /// </summary>

        /// <returns></returns>

        private int Save()

        {

            if (this._MeteoDataList.Count > 0)

            {

                MD_StationMeteoData[] values = null;

                lock (this._MeteoDataList) //

                {

                    values = this._MeteoDataList.ToArray(); //复制list 

                    this._MeteoDataList.Clear();            //清空列表

                }

                if (values != null)

                {

                    return this.Save(values);

                }

                return -1;

            }

            return 0;

        }

        /// <summary>

        /// 保存数据-SqlBulkCopyByData

        /// </summary>

        /// <param name="values"></param>

        /// <returns></returns>

        private int Save(MD_StationMeteoData[] values)

        {

            int num = 0;

            try

            {

               MD_StationMeteoDataBLL meteodatabll = new MD_StationMeteoDataBLL();

                meteodatabll.AddSqlBulkCopyByData(values.ToList<MD_StationMeteoData>());

            }

            finally

            {

            }

            return num;

        }

    }

}

继续阅读