随着大数据的爆发和数据粒度的不断细化,如何提高数据接收软件的数据存储能力成为一个关键问题。本节只分享快速数据存储的实现。
应用技术:多线程、线程同步、EF、列表(List)、SqlBulkCopy。
具体业务具体分析,如:大气环境监测、雨量监测、交通(GPS)等,可能有秒级、分钟级数据。
接收软件采用异步 socket 接收和发送数据,通过解析层进行数据包解析,通过数据存储层入库存储。
主要业务功能:数据接收,数据包解析,数据存储,数据转发等。
思路:利用多线程技术,数据接收线程只负责接收数据并存放到队列中(并通知解析处理线程),解析线程只负责数据解包与合成(并通知存储线程),
存储线程只负责将数据存入数据库,可利用 SqlBulkCopy 实现批量写入。
直接来代码吧,这里只对一类数据进行存储实现。
//数据实体类
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Runtime.Serialization;
namespace DAL.Models
{
/// <summary>
/// Serializable entity describing one station meteorological data record.
/// Every property is exposed as a data-contract member; <see cref="Id"/> is the key.
/// </summary>
[DataContract]
public class MD_StationMeteoData
{
    /// <summary>Record identifier, marked as the entity key.</summary>
    [Key, DataMember]
    public int Id { get; set; }

    /// <summary>Model date of the record.</summary>
    [DataMember]
    public DateTime ModelDate { get; set; }

    /// <summary>Level height of the record.</summary>
    [DataMember]
    public int LevelHeight { get; set; }

    /// <summary>Identifier of the station the record belongs to.</summary>
    [DataMember]
    public int StationID { get; set; }

    /// <summary>Time point of the record.</summary>
    [DataMember]
    public DateTime TimePoint { get; set; }

    /// <summary>Pollutant code of the record.</summary>
    [DataMember]
    public string PollutantCode { get; set; }

    /// <summary>Numeric value of the record.</summary>
    [DataMember]
    public double Value { get; set; }

    /// <summary>Free-form mark attached to the record.</summary>
    [DataMember]
    public string Mark { get; set; }
}
}
//业务逻辑类
using DAL.Models;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
namespace DAL.BLL
{
/// <summary>
/// Business-logic helper that bulk-inserts <see cref="MD_StationMeteoData"/> records
/// into the "MD_StationMeteoData" table using <see cref="SqlBulkCopy"/>.
/// </summary>
public class MD_StationMeteoDataBLL
{
    private static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

    public MD_StationMeteoDataBLL()
    {
    }

    /// <summary>
    /// Writes every row of <paramref name="dt"/> to <paramref name="TableName"/> in a single
    /// bulk copy that runs inside SqlBulkCopy's own internal transaction.
    /// Source columns are mapped to destination columns by identical name.
    /// </summary>
    /// <param name="connectionString">Connection string of the target SQL Server database.</param>
    /// <param name="TableName">Destination table name.</param>
    /// <param name="dt">Rows to insert; column names must match the destination columns.</param>
    /// <remarks>Exceptions raised by SqlBulkCopy propagate unchanged, stack trace intact.</remarks>
    private void SqlBulkCopyByDatatable(string connectionString, string TableName, DataTable dt)
    {
        // SqlBulkCopy opens and closes its own connection when built from a connection string,
        // so no separate SqlConnection is needed here.
        using (SqlBulkCopy sqlbulkcopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.UseInternalTransaction))
        {
            sqlbulkcopy.DestinationTableName = TableName;
            foreach (DataColumn column in dt.Columns)
            {
                sqlbulkcopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
            }
            sqlbulkcopy.WriteToServer(dt);
        }
    }

    /// <summary>
    /// Converts the records into a <see cref="DataTable"/> whose columns match the
    /// destination table (the Id column is not included).
    /// </summary>
    /// <param name="values">Records to convert; must not be null.</param>
    /// <returns>A table with one row per record.</returns>
    private DataTable CreateDataTable(List<MD_StationMeteoData> values)
    {
        DataTable table = new DataTable();
        table.Columns.AddRange(new DataColumn[] {
            new DataColumn("ModelDate", typeof(DateTime)),
            new DataColumn("LevelHeight", typeof(int)),
            new DataColumn("StationID", typeof(int)),
            new DataColumn("TimePoint", typeof(DateTime)),
            new DataColumn("PollutantCode", typeof(string)),
            new DataColumn("Value", typeof(double)),
            new DataColumn("Mark", typeof(string)),
        });
        foreach (var item in values)
        {
            DataRow row = table.NewRow();
            row["ModelDate"] = item.ModelDate;
            row["LevelHeight"] = item.LevelHeight;
            row["StationID"] = item.StationID;
            row["TimePoint"] = item.TimePoint;
            row["PollutantCode"] = item.PollutantCode;
            row["Value"] = item.Value;
            // NOTE(review): Mark is always stored as an empty string and item.Mark is ignored.
            // Kept as in the original; confirm whether item.Mark should be persisted instead.
            row["Mark"] = "";
            table.Rows.Add(row);
        }
        return table;
    }

    /// <summary>
    /// Bulk-inserts the given records into the "MD_StationMeteoData" table.
    /// </summary>
    /// <param name="modellist">Records to insert.</param>
    /// <returns>
    /// <c>true</c> when the records were written (or the list was empty);
    /// <c>false</c> when the list was null or the insert failed. Failures are logged, not thrown.
    /// </returns>
    public bool AddSqlBulkCopyByData(List<MD_StationMeteoData> modellist)
    {
        if (modellist == null)
        {
            log.Error("AddSqlBulkCopyByData was called with a null list.");
            return false;
        }
        if (modellist.Count == 0)
        {
            // Nothing to write; skip the database round trip.
            return true;
        }
        try
        {
            using (DataTable dt = CreateDataTable(modellist))
            {
                // NOTE(review): 'db' is not declared in the visible source; it is presumably an
                // EF DbContext member defined elsewhere - confirm.
                string constr = db.Database.Connection.ConnectionString;
                SqlBulkCopyByDatatable(constr, "MD_StationMeteoData", dt);
            }
            return true;
        }
        catch (Exception ex)
        {
            log.Error(ex);
            return false;
        }
    }
}
}
//数据存储处理类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;
using System.Threading;
using DAL.Models;
using DAL.BLL;
namespace MeteoDataProcessDemo
{
/// <summary>
/// Singleton background writer: producers hand over records through <see cref="Append"/>,
/// and a dedicated worker thread drains the pending list in batches and stores each batch
/// through <see cref="MD_StationMeteoDataBLL.AddSqlBulkCopyByData"/>.
/// </summary>
public class SaveProcess
{
    /// <summary>
    /// Pause before each drain so that several appends can be collected into one bulk insert.
    /// </summary>
    private const int BatchDelayMilliseconds = 1000;

    /// <summary>
    /// Signalled by <see cref="Append"/> and <see cref="Stop"/> to wake the worker thread.
    /// </summary>
    private readonly AutoResetEvent _event;

    /// <summary>
    /// Run flag of the worker loop; volatile because it is written by the thread calling
    /// Start/Stop and read by the worker thread.
    /// </summary>
    private volatile bool _flag;

    /// <summary>
    /// Pending records; every access is made while holding a lock on this list.
    /// </summary>
    private readonly List<MD_StationMeteoData> _MeteoDataList;

    /// <summary>
    /// Worker thread.
    /// </summary>
    private Thread _worker;

    /// <summary>
    /// Shared singleton instance, created by the static constructor.
    /// </summary>
    public static SaveProcess Default = null;

    /// <summary>
    /// Static constructor: creates the singleton instance.
    /// </summary>
    static SaveProcess()
    {
        Default = new SaveProcess();
    }

    private SaveProcess()
    {
        _flag = true;
        this._MeteoDataList = new List<MD_StationMeteoData>();
        // Starts unsignalled: the worker has nothing to do until the first Append or Stop.
        this._event = new AutoResetEvent(false);
    }

    /// <summary>
    /// Adds records to the pending list and wakes the worker thread.
    /// </summary>
    /// <param name="values">Records to queue; null or empty input is ignored.</param>
    public void Append(List<MD_StationMeteoData> values)
    {
        if (values == null || values.Count == 0)
        {
            return;
        }
        lock (this._MeteoDataList)
        {
            this._MeteoDataList.AddRange(values);
        }
        this._event.Set(); // notify the worker thread that there is data to save
    }

    /// <summary>
    /// Asks the worker thread to finish, waits for it to exit, and returns once the
    /// remaining pending records have been flushed. Does nothing if no worker is running.
    /// </summary>
    public void Stop()
    {
        Thread worker = this._worker;
        if ((worker != null) && worker.IsAlive)
        {
            this._flag = false;
            this._event.Set();
            worker.Join();
        }
    }

    /// <summary>
    /// Starts the worker thread. Calling it while a worker is already alive is a no-op;
    /// calling it after <see cref="Stop"/> starts a fresh worker.
    /// </summary>
    public void Start()
    {
        if ((_worker != null) && _worker.IsAlive)
        {
            return;
        }
        _flag = true; // re-arm the loop so Start works again after Stop
        _worker = new Thread(Work);
        _worker.Start();
    }

    /// <summary>
    /// Worker loop: waits until records are pending, pauses briefly to let a batch build up,
    /// then saves the batch. Runs until <see cref="Stop"/> clears the run flag, then performs
    /// one final flush.
    /// </summary>
    public void Work()
    {
        while (this._flag)
        {
            try
            {
                bool isEmpty;
                lock (this._MeteoDataList)
                {
                    isEmpty = this._MeteoDataList.Count == 0;
                }
                if (isEmpty)
                {
                    // No Reset() before waiting: an AutoResetEvent resets itself when a wait is
                    // released, and an explicit Reset here could erase a Set() issued by
                    // Append/Stop after the emptiness check, leaving the worker blocked.
                    this._event.WaitOne();
                }
                if (this._flag)
                {
                    Thread.Sleep(BatchDelayMilliseconds);
                }
                this.Save();
            }
            catch (Exception ex)
            {
                // Keep the worker alive, but do not hide the failure.
                Trace.TraceError("SaveProcess worker iteration failed: {0}", ex);
            }
        }
        try
        {
            // Flush whatever was appended while the loop was shutting down.
            this.Save();
        }
        catch (Exception ex)
        {
            Trace.TraceError("SaveProcess final flush failed: {0}", ex);
        }
    }

    /// <summary>
    /// Atomically takes a snapshot of the pending list, clears it, and saves the snapshot.
    /// </summary>
    /// <returns>0 when nothing was pending; otherwise the result of saving the batch.</returns>
    private int Save()
    {
        MD_StationMeteoData[] values;
        lock (this._MeteoDataList)
        {
            if (this._MeteoDataList.Count == 0)
            {
                return 0;
            }
            values = this._MeteoDataList.ToArray(); // copy the list
            this._MeteoDataList.Clear();            // clear the pending list
        }
        return this.Save(values);
    }

    /// <summary>
    /// Stores one batch through the bulk-copy business-logic class.
    /// </summary>
    /// <param name="values">Batch to store.</param>
    /// <returns>The number of records handed over on success; -1 when the insert failed.</returns>
    private int Save(MD_StationMeteoData[] values)
    {
        MD_StationMeteoDataBLL meteodatabll = new MD_StationMeteoDataBLL();
        if (meteodatabll.AddSqlBulkCopyByData(new List<MD_StationMeteoData>(values)))
        {
            return values.Length;
        }
        // The batch has already been removed from the pending list and is not re-queued,
        // so make the loss visible.
        Trace.TraceError("SaveProcess: bulk insert of {0} record(s) failed; the batch was dropped.", values.Length);
        return -1;
    }
}
}