描述:内網資料庫 多表大量資料 需要同步至外網。
思路方案:1.使用 Windows 定時任務,每天淩晨1點(多線程或者單線程)同步前一天的資料。
2.因可能會出現同步失敗的情況,以及會涉及到修改資料,内網資料庫各表要有建立時間和修改時間,外網表結構也要有建立時間和修改時間,方便資料核對;同時,批次插入或修改資料庫時若失敗,應以郵件或日志檔案通知開發人員。
3.内網資料庫有多張表,需要封裝一個公共的擷取資料集合的方法(此方法分頁擷取表資料,使用反射調用類裡面的方法)。
4.内網資料 同步到外網,用http協定發送 post 資料 請求,外網(雲伺服器)上 要做安全驗證 (僅處理指定ip位址的 請求)
5.外網(雲伺服器)上 資料批量儲存,使用 SqlBulkCopy,封裝 一個公共的批量儲存方法。(若是修改資料 則先删除原來的資料再新增)。
核心代碼:
a.實體對象,繼承自basePush
/// <summary>
/// Common base class of the entity types that are pushed to the external database.
/// </summary>
public class basePush
{
    /// <summary>
    /// Supplies the source-column to destination-column name mapping for the target table.
    /// The base implementation defines no mapping.
    /// </summary>
    /// <returns>A newly created, empty dictionary on every call.</returns>
    public static Dictionary<string, string> DictionaryColumn()
    {
        return new Dictionary<string, string>();
    }
}
/// <summary>
/// Entity for one row of the tb_diseasebuwei table; serialized to JSON when pushed
/// and turned into a DataTable row when bulk-inserted.
/// </summary>
[Serializable]
public class DiseasebuweiInfo : basePush
{
    /// <summary>Value of the disname column; also the key used by DeleteDiseasebuweiInfo.</summary>
    public string disname { get; set; }

    /// <summary>Value of the bw column.</summary>
    public string bw { get; set; }

    /// <summary>Value of the cnt column.</summary>
    public int cnt { get; set; }

    /// <summary>Value of the rn column (64-bit integer).</summary>
    public long rn { get; set; }

    /// <summary>Creation timestamp; null when not supplied.</summary>
    public Nullable<DateTime> CreateTime { get; set; }

    /// <summary>Last-modification timestamp; null when not supplied.</summary>
    public Nullable<DateTime> UpdateTime { get; set; }
}
b.任務方法
/// <summary>
/// Scheduled job that pushes DiseasebuweiInfo rows in "update" mode.
/// Meant to be executed every day after 1 a.m.
/// </summary>
private static void TaskRunDiseasebuwei()
{
    const string entityName = "DiseasebuweiInfo";
    const string syncMode = "update"; // push modified rows

    CHC.DAL.Log.Loger.Log("開始->同步Diseasebuwei:TaskRunDiseasebuwei", "系統任務");
    GetPostList<DiseasebuweiInfo>(entityName, syncMode);
}
/// <summary>
/// Scheduled job that pushes SysdiseaseInfo rows in "create" mode.
/// </summary>
private static void TaskRunSysdisease()
{
    const string entityName = "SysdiseaseInfo";
    const string syncMode = "create"; // push newly created rows

    CHC.DAL.Log.Loger.Log("開始->同步SysdiseaseInfo:TaskRunSysdisease", "系統任務");
    GetPostList<SysdiseaseInfo>(entityName, syncMode);
}
c.擷取資料集合的統一方法
/// <summary>
/// Reads the rows of one entity type page by page and posts every non-empty page
/// to the remote server. Failures of a single page are logged and the loop continues;
/// any exception aborts the run and is logged.
/// </summary>
/// <typeparam name="T">Entity type returned by the reflected data method.</typeparam>
/// <param name="name">
/// Entity name. The data method is resolved as TaskPushDAL.Get{name}List and the
/// name is also sent along with each posted page.
/// </param>
/// <param name="typeName">
/// Sync mode forwarded to the data method and to the remote server
/// (the jobs in this file pass "update" or "create").
/// </param>
private static void GetPostList<T>(string name, string typeName)
{
    const int rows = 5000; // page size: number of rows read and posted per request
    int page = 1;
    int total = 0;
    int pageCount = 1; // recomputed from the total row count after the first page is read
    string strName = typeof(T).Name;
    try
    {
        // Lower bound of the sync window: LastRunDate truncated to the full hour.
        string DayOfDay = LastRunDate.ToString("yyyy-MM-dd HH:00:00");

        // The reflected method does not change between pages, so resolve it once and
        // fail with a descriptive error instead of a NullReferenceException.
        string methodName = string.Format("Get{0}List", name);
        MethodInfo methstr = typeof(TaskPushDAL).GetMethod(methodName);
        if (methstr == null)
        {
            throw new MissingMethodException(typeof(TaskPushDAL).Name, methodName);
        }

        while (page <= pageCount)
        {
            // Index 3 is the slot of the "out int total" parameter (see GetDiseasebuweiInfoList);
            // Invoke writes the out value back into the argument array.
            object[] objPara = new object[] { DayOfDay, (page - 1) * rows + 1, page * rows, total, typeName };
            List<T> infoList = methstr.Invoke(null, objPara) as List<T>;
            total = (int)objPara[3];

            if (page == 1)
            {
                // Ceiling division: number of pages needed for "total" rows.
                pageCount = (total / rows) + (total % rows != 0 ? 1 : 0);
            }

            if (infoList != null && infoList.Count > 0)
            {
                string result = ResponseTask(infoList, name, typeName);

                // NOTE(review): this is a substring test on the whole (lower-cased) response,
                // so an error text that happens to contain "ok" would be treated as success.
                if (result.Contains("ok"))
                {
                    CHC.DAL.Log.Loger.Log(string.Format("{0}通知成功!===result:{1}", strName, result));
                }
                else
                {
                    CHC.DAL.Log.Loger.Log(string.Format("{0}通知失敗!===result:{1},目前頁數:{2},操作類型:{3}",
                        strName, result, page, typeName));
                }
            }

            page = page + 1;
        }
    }
    catch (Exception ex)
    {
        CHC.DAL.Log.Loger.Error(ex, string.Format("同步{0}異常", strName));
    }
}
/// <summary>
/// Queries tb_diseasebuwei for rows whose create/update time is later than
/// <paramref name="DayOfDay"/>: first the total count, then one page of rows.
/// </summary>
/// <param name="DayOfDay">Exclusive lower bound of the time filter, as a date/time string.</param>
/// <param name="pageStart">First row number of the page (inclusive, numbering starts at 1).</param>
/// <param name="pageEnd">Last row number of the page (inclusive).</param>
/// <param name="total">Receives the number of rows that match the time filter.</param>
/// <param name="typename">"update" filters on UpdateTime; any other value filters on CreateTime.</param>
/// <returns>The rows of the requested page as returned by SqlUtility.ExecuteObjectList.</returns>
public static List<DiseasebuweiInfo> GetDiseasebuweiInfoList(string DayOfDay, int pageStart, int pageEnd, out int total,string typename)
{
    string timeColumn = typename == "update" ? "a.UpdateTime" : "a.CreateTime";

    // The value is embedded in a quoted SQL literal; double any single quote so it
    // cannot terminate the literal.
    string since = (DayOfDay ?? string.Empty).Replace("'", "''");

    string countSql = string.Format(@"select COUNT(1) from [tb_diseasebuwei] a where {1}>'{0}' ", since, timeColumn);
    total = SqlUtility.ExecuteScalar<int>(_connectionBig, countSql, null);

    // Every page is fetched by a separate query, so the row numbering must be stable
    // between calls. Ordering by disname alone leaves ties in arbitrary order, which can
    // skip or repeat rows at page boundaries; bw and rn are added as tie-breakers.
    // NOTE(review): assumes (disname, bw, rn) is selective enough — confirm against the table's key.
    string pageSql = string.Format(@"select * from (select a.disname,a.bw,a.cnt,a.rn,dn=ROW_NUMBER()OVER(ORDER BY a.disname,a.bw,a.rn) from
[tb_diseasebuwei] a with(nolock) where {3}>'{0}' )tmp where dn between {1} and {2} "
        , since, pageStart, pageEnd, timeColumn);
    return SqlUtility.ExecuteObjectList<DiseasebuweiInfo>(_connectionBig, pageSql, null);
}
/// <summary>
/// Builds the DELETE statement that removes the existing tb_diseasebuwei rows for the
/// given entities. Sqlbulkcopy resolves this method by reflection ("Delete" + entity name)
/// in "update" mode and executes the statement before inserting the new rows.
/// </summary>
/// <param name="list">Entities whose disname values identify the rows to delete.</param>
/// <returns>The SQL text only; nothing is executed here.</returns>
public static string DeleteDiseasebuweiInfo(List<DiseasebuweiInfo> list)
{
    // NOTE(review): rows are matched by disname only. If several source rows share a
    // disname and end up on different pages, the delete of a later page would also remove
    // the rows inserted for an earlier page — confirm disname is unique per row.
    IEnumerable<string> names = list
        // Double embedded single quotes so a name containing ' cannot break out of the literal.
        .Select(v => (v.disname ?? string.Empty).Replace("'", "''"))
        // Repeated names add nothing to an IN list; dropping them keeps the statement short.
        .Distinct();

    string strs = string.Join("','", names);
    return string.Format("delete from tb_diseasebuwei where disname in('{0}')", strs);
}
d.post資料
/// <summary>
/// Serializes a page of entities and posts it to the remote endpoint
/// (app setting "serverURL" followed by NotifyUrl).
/// </summary>
/// <typeparam name="T">Entity type of the list items.</typeparam>
/// <param name="list">Entities to send.</param>
/// <param name="Name">Entity name placed in the request envelope.</param>
/// <param name="typeName">Sync mode placed in the request envelope.</param>
/// <returns>
/// The lower-cased response body; a fixed message when the list, the endpoint or the
/// name is missing; or the exception message when the request fails.
/// </returns>
private static string ResponseTask<T>(List<T> list, string Name, string typeName)
{
    try
    {
        string configuredHost = ConfigurationManager.AppSettings["serverURL"];
        string endpoint = configuredHost == null ? string.Empty : configuredHost + NotifyUrl;

        bool hasRows = list != null && list.Count > 0;
        if (!hasRows || string.IsNullOrEmpty(endpoint) || string.IsNullOrEmpty(Name))
        {
            return "參數不能為空";
        }

        // The list is serialized on its own first and travels as a JSON string inside the envelope.
        Newtonsoft.Json.Converters.IsoDateTimeConverter dateConverter = new Newtonsoft.Json.Converters.IsoDateTimeConverter();
        dateConverter.DateTimeFormat = "yyyy-MM-dd HH:mm:ss";
        string rowsJson = Newtonsoft.Json.JsonConvert.SerializeObject(list, dateConverter);

        TaskData envelope = new TaskData();
        envelope.MothodName = Name;
        envelope.ModelList = rowsJson;
        envelope.ModeType = typeName;

        string body = Newtonsoft.Json.JsonConvert.SerializeObject(envelope);
        return GetHTMLByPost(endpoint, body).ToLower();
    }
    catch (Exception ex)
    {
        LogUtil.LogError("發送 post 資料 請求異常", ex);
        return ex.Message;
    }
}
/// <summary>
/// Sends <paramref name="Params"/> as the UTF-8 encoded body of an HTTP POST request
/// and returns the response body decoded as UTF-8.
/// </summary>
/// <param name="Url">Target address.</param>
/// <param name="Params">Request body text.</param>
/// <returns>The complete response body.</returns>
public static string GetHTMLByPost(string Url, string Params)
{
    byte[] payload = Encoding.UTF8.GetBytes(Params);

    HttpWebRequest request = (HttpWebRequest)WebRequest.Create(Url);
    request.Method = "POST";
    // NOTE(review): the callers in this file send JSON although the declared type is
    // form-urlencoded; left unchanged because the receiving side reads the raw input stream.
    request.ContentType = "application/x-www-form-urlencoded";
    request.ContentLength = payload.Length;

    using (Stream requestStream = request.GetRequestStream())
    {
        requestStream.Write(payload, 0, payload.Length);
    }

    // Dispose the response, its stream and the reader deterministically; the original
    // left the reader undisposed and relied on a redundant response.Close().
    using (WebResponse response = request.GetResponse())
    using (Stream responseStream = response.GetResponseStream())
    using (StreamReader reader = new StreamReader(responseStream, Encoding.UTF8))
    {
        return reader.ReadToEnd();
    }
}
e.雲伺服器 接收資料
/// <summary>
/// Ajax handler on the cloud server that receives pushed data and stores it.
/// Only requests whose IP address is on the configured allow-list are processed.
/// </summary>
public class PageTask : PageAjaxHandler
{
    // Allowed caller address(es); replaced by the "checkIp" app setting when it is present.
    private string checkIp = "127.0.0.1";

    /// <summary>Creates the handler and loads the allow-list from configuration.</summary>
    /// <param name="context">Current HTTP context, passed to the base handler.</param>
    public PageTask(HttpContext context) : base(context)
    {
        if (ConfigurationManager.AppSettings["checkIp"] != null)
        {
            checkIp = ConfigurationManager.AppSettings["checkIp"];
        }
    }

    /// <summary>
    /// Reads the JSON request body, deserializes the contained entity list according to
    /// its MothodName and stores it through TaskPushBLL.SaveDataPush. The outcome is
    /// written back as a ResponseData object: ret "ok" on success, otherwise ret "error"
    /// with the reason in msg.
    /// </summary>
    public void SetData()
    {
        ResponseData responseData = new ResponseData();
        CHC.DAL.Log.Loger.Log("大資料儲存-----");
        try
        {
            string requestIp = Lin.ToolKit.Common.RequestHelper.GetIPAddress();
            if (!IsAllowedIp(requestIp))
            {
                responseData.ret = "error";
                responseData.msg = "請求的ip位址異常";
            }
            else
            {
                string jasonstrs = new System.IO.StreamReader(Request.InputStream).ReadToEnd();
                TaskData info = Newtonsoft.Json.JsonConvert.DeserializeObject<TaskData>(jasonstrs);
                string result;
                if (info == null)
                {
                    // An empty body deserializes to null; report it instead of failing on a null reference.
                    result = "request body is empty or not a valid TaskData object";
                }
                else
                {
                    string MothodName = info.MothodName;
                    string ModelList = info.ModelList; // entity list, itself JSON text
                    string ModeType = info.ModeType;
                    switch (MothodName)
                    {
                        case "DiseasebuweiInfo":
                            List<DiseasebuweiInfo> list = Newtonsoft.Json.JsonConvert.DeserializeObject<List<DiseasebuweiInfo>>(ModelList);
                            result = TaskPushBLL.SaveDataPush(MothodName, ModeType, list, "tb_diseasebuwei");
                            break;
                        case "SysdiseaseInfo":
                            List<SysdiseaseInfo> SysdiseaseInfoList = Newtonsoft.Json.JsonConvert.DeserializeObject<List<SysdiseaseInfo>>(ModelList);
                            result = TaskPushBLL.SaveDataPush(MothodName, ModeType, SysdiseaseInfoList, "tb_sysdisease");
                            break;
                        default:
                            // Previously an unknown name stored nothing and was still answered with "ok".
                            result = "unsupported MothodName: " + MothodName;
                            break;
                    }
                }

                if (string.IsNullOrEmpty(result))
                {
                    responseData.ret = "ok";
                }
                else
                {
                    responseData.ret = "error";
                    responseData.msg = result;
                }
            }
        }
        catch (Exception ex)
        {
            CHC.DAL.Log.Loger.Error(ex);
            responseData.ret = "error";
            responseData.msg = ex.Message;
        }
        WriteObject(responseData);
    }

    /// <summary>
    /// Checks the caller address against the allow-list. The list may hold several
    /// addresses separated by comma, semicolon, pipe or whitespace; each entry must match
    /// the caller address exactly. A substring test would let "192.168.1.1" or even "1"
    /// pass when "192.168.1.100" is configured.
    /// </summary>
    /// <param name="requestIp">Address of the caller; null or empty is rejected.</param>
    /// <returns>True when the address equals one of the configured entries.</returns>
    private bool IsAllowedIp(string requestIp)
    {
        if (string.IsNullOrEmpty(requestIp) || string.IsNullOrEmpty(checkIp))
        {
            return false;
        }

        string caller = requestIp.Trim();
        string[] allowed = checkIp.Split(new[] { ',', ';', '|', ' ', '\t', '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
        foreach (string candidate in allowed)
        {
            // Case-insensitive so that IPv6 addresses match regardless of hex digit casing.
            if (string.Equals(candidate, caller, StringComparison.OrdinalIgnoreCase))
            {
                return true;
            }
        }
        return false;
    }
}
f.大資料儲存
#region Bulk data saving
/// <summary>
/// Stores a list of entities in the destination table through TaskPushDAL.Sqlbulkcopy.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="name">Entity name, forwarded to Sqlbulkcopy for its reflection lookup.</param>
/// <param name="typename">Sync mode (create or update), forwarded to Sqlbulkcopy.</param>
/// <param name="list">Entities to store; null or empty means nothing is done.</param>
/// <param name="tableName">Name of the destination table.</param>
/// <returns>
/// An empty string when there was nothing to store, the result of Sqlbulkcopy otherwise,
/// or the exception message when saving threw.
/// </returns>
public static string SaveDataPush<T>(string name, string typename, List<T> list, string tableName)
{
    if (list == null || list.Count == 0)
    {
        return string.Empty;
    }

    string entityName = typeof(T).Name;
    try
    {
        // Optional column mapping supplied by a public DictionaryColumn method on the
        // entity type; an empty mapping is used when no such method is found.
        MethodInfo mappingProvider = typeof(T).GetMethod("DictionaryColumn");
        Dictionary<string, string> columnMap;
        if (mappingProvider == null)
        {
            columnMap = new Dictionary<string, string>();
        }
        else
        {
            columnMap = mappingProvider.Invoke(null, null) as Dictionary<string, string>;
        }

        string outcome = TaskPushDAL.Sqlbulkcopy(name, typename, list, tableName, columnMap);
        CHC.DAL.Log.Loger.Log(string.Format("儲存({0}){1}資料共{2}條!===result:{3}", typename, entityName, list.Count, outcome));
        return outcome;
    }
    catch (Exception ex)
    {
        string failure = ex.Message;
        CHC.DAL.Log.Loger.Error(ex);
        return failure;
    }
}
#endregion
g.使用 SqlBulkCopy 批量處理資料
/// <summary>
/// Writes a list of entities to a destination table with SqlBulkCopy inside a single
/// transaction. When <paramref name="typeName"/> is "update" the old rows are deleted
/// first, using the statement returned by TaskPushDAL.Delete{name}; for any other value
/// the rows are only inserted.
/// </summary>
/// <typeparam name="T">Entity type; its public properties become the source columns.</typeparam>
/// <param name="name">Entity name used to resolve the Delete{name} method in update mode.</param>
/// <param name="typeName">Sync mode; "update" triggers delete-then-insert.</param>
/// <param name="data">Entities to write.</param>
/// <param name="tableName">Name of the destination table.</param>
/// <param name="DictionaryColumn">
/// Source-column to destination-column mapping. When null or empty, every property is
/// mapped to the column of the same name.
/// </param>
/// <returns>
/// An empty string on success; otherwise the message of the exception that caused the
/// transaction to be rolled back.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
public static string Sqlbulkcopy<T>(string name, string typeName, List<T> data, string tableName, Dictionary<string, string> DictionaryColumn)
{
    if (data == null)
    {
        throw new ArgumentNullException("data");
    }

    string str = string.Empty;
    string strName = typeof(T).Name;
    PropertyInfo[] properties = typeof(T).GetProperties();

    using (DataTable dt = BuildBulkCopyTable(data, properties))
    using (SqlConnection conn = new SqlConnection(_connection))
    {
        conn.Open();
        using (SqlTransaction sqlbulkTransaction = conn.BeginTransaction())
        using (SqlBulkCopy bulk = new SqlBulkCopy(conn, SqlBulkCopyOptions.CheckConstraints, sqlbulkTransaction))
        {
            bulk.DestinationTableName = tableName;
            bulk.BatchSize = dt.Rows.Count; // send all rows as one batch
            bulk.BulkCopyTimeout = 5000;    // timeout for the copy operation, in seconds

            if (DictionaryColumn != null && DictionaryColumn.Count > 0)
            {
                // Names differ between source and destination: key = source column, value = destination column.
                foreach (KeyValuePair<string, string> item in DictionaryColumn)
                {
                    bulk.ColumnMappings.Add(item.Key, item.Value);
                }
            }
            else
            {
                // Destination column names equal the property names.
                foreach (PropertyInfo p in properties)
                {
                    bulk.ColumnMappings.Add(p.Name, p.Name);
                }
            }

            try
            {
                if (typeName == "update") // delete the old rows, then insert
                {
                    string deleteMethodName = string.Format("Delete{0}", name);
                    MethodInfo methstr = typeof(TaskPushDAL).GetMethod(deleteMethodName);
                    if (methstr == null)
                    {
                        // Descriptive failure instead of a NullReferenceException.
                        throw new MissingMethodException(typeof(TaskPushDAL).Name, deleteMethodName);
                    }

                    string sql = methstr.Invoke(null, new object[] { data }) as string;
                    if (string.IsNullOrEmpty(sql))
                    {
                        throw new InvalidOperationException(deleteMethodName + " returned no SQL statement");
                    }

                    using (SqlCommand sqlComm = new SqlCommand(sql, conn, sqlbulkTransaction))
                    {
                        sqlComm.CommandTimeout = 300;
                        int count = sqlComm.ExecuteNonQuery();
                        CHC.DAL.Log.Loger.Log(string.Format("Sqlbulkcopy,{0},Delete的數量:{1}", strName, count));
                    }
                }

                bulk.WriteToServer(dt);
                CHC.DAL.Log.Loger.Log(string.Format("Sqlbulkcopy,{0},批量執行數量:{1}", strName, dt.Rows.Count));
                sqlbulkTransaction.Commit();
            }
            catch (Exception ex)
            {
                str = ex.Message;
                // Log the root cause before attempting the rollback so it is never lost.
                CHC.DAL.Log.Loger.Error(ex);
                try
                {
                    sqlbulkTransaction.Rollback();
                }
                catch (Exception rollbackEx)
                {
                    // Rollback itself throws when the connection is already gone or the server
                    // has rolled back; that must not replace the original error.
                    CHC.DAL.Log.Loger.Error(rollbackEx);
                }
            }
        }
    }
    return str;
}

// Builds a DataTable with one column per given property and one row per element of data.
private static DataTable BuildBulkCopyTable<T>(List<T> data, PropertyInfo[] properties)
{
    DataTable table = new DataTable();
    foreach (PropertyInfo p in properties)
    {
        // A DataColumn cannot be typed as Nullable<X>; use the underlying type and
        // represent missing values as DBNull in the rows.
        Type colType = Nullable.GetUnderlyingType(p.PropertyType) ?? p.PropertyType;
        table.Columns.Add(new DataColumn(p.Name, colType));
    }

    foreach (T item in data)
    {
        DataRow row = table.NewRow();
        foreach (PropertyInfo p in properties)
        {
            object value = p.GetValue(item, null);
            row[p.Name] = value ?? DBNull.Value;
        }
        table.Rows.Add(row);
    }
    return table;
}