随着大資料爆發和資料微細化,如何提高資料接收軟體資料存儲能力,本節隻分享快速資料存儲。
應用技術: 多線,線程同步,EF,清單,SqlBulkCopy。
具體業務具體分析,如:大氣環境監測、雨量監測、交通(GPS),可能有秒級、分鐘資料,
接收軟體采用異步socket進行接收和發送資料,通過解析層進行資料包解析,通過資料存儲層入庫存儲
主要業務功能:資料接收,資料包解析,資料存儲,資料轉發等。
思路:利用多線程技術,資料接收線程隻接收資料存放隊列中(并通知解析處理線程),解析線程隻做資料解包合成(并通知存儲線程),
存儲線程隻進行DB資料存儲可利用SqlBulkCopy。
直接來代碼吧,這裡隻對一類資料進行存儲實作。
//資料實體類
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Runtime.Serialization;
namespace DAL.Models
{
[DataContract]
public class MD_StationMeteoData
{
[Key]
[DataMember]
public int Id { get; set; }
[DataMember]
public System.DateTime ModelDate { get; set; }
[DataMember]
public int LevelHeight { get; set; }
[DataMember]
public int StationID { get; set; }
[DataMember]
public System.DateTime TimePoint { get; set; }
[DataMember]
public string PollutantCode { get; set; }
[DataMember]
public double Value { get; set; }
[DataMember]
public string Mark { get; set; }
}
}
//業務邏輯類
using DAL.Models;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
namespace DAL.BLL
{
public class MD_StationMeteoDataBLL
{
private static readonly log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
//static MD_StationMeteoDataBLL _defaule = new MD_StationMeteoDataBLL();
//public static MD_StationMeteoDataBLL Default
//{
// get { return _defaule; }
//}
public MD_StationMeteoDataBLL()
{
}
private void SqlBulkCopyByDatatable(string connectionString, string TableName, DataTable dt)
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
using (SqlBulkCopy sqlbulkcopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.UseInternalTransaction))
{
try
{
sqlbulkcopy.DestinationTableName = TableName;
for (int i = 0; i < dt.Columns.Count; i++)
{
sqlbulkcopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);
}
sqlbulkcopy.WriteToServer(dt);
}
catch (System.Exception ex)
{
throw ex;
}
}
}
}
private DataTable CreateDataTable(List<MD_StationMeteoData> values)
{
var valuesList = values;
DataTable table = new DataTable();
table.Columns.AddRange(new DataColumn[] {
new DataColumn("ModelDate", typeof(DateTime)),
new DataColumn("LevelHeight",typeof(int)),
new DataColumn("StationID",typeof(int)),
new DataColumn("TimePoint", typeof(DateTime)),
new DataColumn("PollutantCode", typeof(string)),
new DataColumn("Value", typeof(double)),
new DataColumn("Mark", typeof(string)),
});
foreach (var item in values)
{
DataRow row = table.NewRow();
int num = 0;
DateTime timePoint = valuesList[0].TimePoint;
row[num++] = item.ModelDate;
row[num++] = item.LevelHeight;
row[num++] = item.StationID;
row[num++] = item.TimePoint;
row[num++] = item.PollutantCode;
row[num++] = item.Value;
row[num++] = "";
table.Rows.Add(row);
}
return table;
}
public bool AddSqlBulkCopyByData(List<MD_StationMeteoData> modellist)
{
try
{
DataTable dt = CreateDataTable(modellist);
string constr = db.Database.Connection.ConnectionString;
SqlBulkCopyByDatatable(constr, "MD_StationMeteoData", dt);
return true;
}
catch (Exception ex)
{
log.Error(ex);
return false;
}
}
}
}
//資料存儲處理類
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;
using System.Threading;
using DAL.Models;
using DAL.BLL;
namespace MeteoDataProcessDemo
{
public class SaveProcess
{
/// <summary>
///
/// </summary>
private readonly AutoResetEvent _event;
private bool _flag;
private readonly List<MD_StationMeteoData> _MeteoDataList;
/// <summary>
/// 工作線程
/// </summary>
private Thread _worker;
/// <summary>
/// 靜态字段
/// </summary>
public static SaveProcess Default = null;
/// <summary>
/// 靜态構造
/// </summary>
static SaveProcess()
{
Default = new SaveProcess();
}
private SaveProcess()
{
_flag = true;
this._MeteoDataList = new List<MD_StationMeteoData>();
this._event = new AutoResetEvent(true);
}
/// <summary>
/// 向清單添加資料項
/// </summary>
/// <param name="values"></param>
public void Append(List<MD_StationMeteoData> values)
{
if (values != null)
{
lock (this._MeteoDataList)
{
this._MeteoDataList.AddRange(values);
this._event.Set(); //通知工作線程儲存
}
}
}
/// <summary>
///
/// </summary>
public void Stop()
{
if ((this._worker != null) && this._worker.IsAlive)
{
this._flag = false;
this._event.Set();
this._worker.Join();
}
}
/// <summary>
/// 線程啟動事件
/// </summary>
public void Start()
{
_worker = new Thread(Work);
_worker.Start();
}
/// <summary>
/// 工作線程
/// </summary>
public void Work()
{
//this._flag = true;
while (this._flag)
{
int num = -1;
try
{
if (this._MeteoDataList.Count == 0)
{
this._event.Reset();
this._event.WaitOne();
}
Thread.Sleep(1000);
num = this.Save();
}
catch (Exception ex)
{
}
finally
{
}
}
}
/// <summary>
/// 快速複制List
/// </summary>
/// <returns></returns>
private int Save()
{
if (this._MeteoDataList.Count > 0)
{
MD_StationMeteoData[] values = null;
lock (this._MeteoDataList) //
{
values = this._MeteoDataList.ToArray(); //複制list
this._MeteoDataList.Clear(); //清空清單
}
if (values != null)
{
return this.Save(values);
}
return -1;
}
return 0;
}
/// <summary>
/// 儲存資料-SqlBulkCopyByData
/// </summary>
/// <param name="values"></param>
/// <returns></returns>
private int Save(MD_StationMeteoData[] values)
{
int num = 0;
try
{
MD_StationMeteoDataBLL meteodatabll = new MD_StationMeteoDataBLL();
meteodatabll.AddSqlBulkCopyByData(values.ToList<MD_StationMeteoData>());
}
finally
{
}
return num;
}
}
}