題外:在博文索引中暫時列出了開源的計劃一覽,蟲子開源的目的是希望能有更多的交流,部分軟體可能小得連開源協定的認證價值都沒有。不管程式有多小多簡單,用心把一個完整的設計思路、實作過程以及測試結果展現給大家。歡迎大牛拍磚,小牛問路。
軟體背景
拿本次高性能資料同步工具來說,目前還處于開發階段,大概是1/4的樣子。為了避免模糊,就先把這1/4分享給大家。
資料作為系統的核心價值,因為其流動性是以經常會有載體的變更。如何高性能、安全的将資料搬移是一個大家經常接觸也一直在用的課題。如果隻是sql to sql可能作為程式員而言,DBA更适合這個内容,例如dts導入等。但是更多的實際場景下,可能會有檔案、服務、甚至其他類型的資料流來源。是以作為碼農,我們不妨多了解一下這方面的内容。
設計思路
暫時開源程式中隻做了sql to sql的一部分。直接就以這個開始來講吧。
首先是入參和返參的設計
<a href="http://blog.51cto.com/dubing/712455#">?</a>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
<code>/// <summary></code>
<code> </code><code>/// 入參接口</code>
<code> </code><code>/// </summary></code>
<code> </code><code>public</code> <code>interface</code> <code>IAOPParam</code>
<code> </code><code>{</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 目标位址</code>
<code> </code><code>/// </summary></code>
<code> </code><code>string</code> <code>T_ConnectionString { </code><code>get</code><code>; }</code>
<code> </code><code>/// 請求行數</code>
<code> </code><code>long</code> <code>MaxSize { </code><code>get</code><code>; }</code>
<code> </code><code>/// 表名</code>
<code> </code><code>string</code> <code>TableName { </code><code>get</code><code>; }</code>
<code> </code><code>/// 目前行數</code>
<code> </code><code>long</code> <code>CurrentSize { </code><code>get</code><code>; }</code>
<code> </code><code>/// 域名</code>
<code> </code><code>string</code> <code>p_Domain { </code><code>get</code><code>; }</code>
<code> </code><code>/// 斷點檔案位址</code>
<code> </code><code>string</code> <code>p_InitPath { </code><code>get</code><code>; }</code>
<code> </code><code>/// 斷點時間</code>
<code> </code><code>DateTime p_Previous { </code><code>get</code><code>; }</code>
<code> </code><code>/// 是否結束</code>
<code> </code><code>bool</code> <code>p_IsEnd { </code><code>get</code><code>; }</code>
<code> </code><code>/// 排序方式</code>
<code> </code><code>string</code> <code>SortName { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>/// 單次請求大小</code>
<code> </code><code>long</code> <code>SingleSize { </code><code>get</code><code>; }</code>
<code> </code><code>/// 排序主鍵</code>
<code> </code><code>string</code> <code>Sortkey { </code><code>get</code><code>; }</code>
<code> </code>
<code> </code><code>/// 是否支援事務</code>
<code> </code><code>bool</code> <code>IsTransaction { </code><code>get</code><code>; }</code>
<code> </code><code>/// true為支援斷點 發生斷點或異常後程式終止 false為不支援斷點 遇到斷點或異常繼續填充直到此次請求完成</code>
<code> </code><code>bool</code> <code>IsBreakPoints { </code><code>get</code><code>; }</code>
<code> </code><code>/// guid</code>
<code> </code><code>string</code> <code>T_Guid { </code><code>get</code><code>; }</code>
<code> </code><code>}</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 對象處理傳回的入參接口(泛型)</code>
<code> </code><code>public</code> <code>interface</code> <code>IAOPParam<T> : IAOPParam</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 泛型附加對象</code>
<code> </code><code>/// </summary></code>
<code> </code><code>T ParamAttachObjectEx { </code><code>get</code><code>; }</code>
這樣設計的目的是考慮到伺服器的記憶體與資源占用問題,如果資料來源的體積過大,我們将會對請求的來源進行分塊處理。另外通過排序字段或者自定義的sql語句或者存儲過程(暫未補充)可以對資料源進行進階過濾,斷點續傳的設計目前比較簡單,web程式的話植入cookie、控制台或者cs程式通過文本媒介json格式來控制。
<code>#region IAOPResult</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 對象處理傳回的結果接口</code>
<code> </code><code>/// </summary></code>
<code> </code><code>/// <remarks></code>
<code> </code><code>/// 建議在代碼調用傳回值中都采用此類執行個體為傳回值<br /></code>
<code> </code><code>/// 一般ResultNo小于0表示異常,0表示成功,大于0表示其它一般提示資訊</code>
<code> </code><code>/// </remarks></code>
<code> </code><code>public</code> <code>interface</code> <code>IAOPResult</code>
<code> </code><code>{</code>
<code> </code><code>/// <summary></code>
<code> </code><code>/// 傳回代碼</code>
<code> </code><code>/// </summary></code>
<code> </code><code>int</code> <code>ResultNo { </code><code>get</code><code>; }</code>
<code> </code><code>/// 對應的描述資訊</code>
<code> </code><code>string</code> <code>ResultDescription { </code><code>get</code><code>; }</code>
<code> </code><code>/// 相應的附加資訊</code>
<code> </code><code>object</code> <code>ResultAttachObject { </code><code>get</code><code>; }</code>
<code> </code><code>/// 内部AOPResult</code>
<code> </code><code>IAOPResult InnerAOPResult { </code><code>get</code><code>; }</code>
<code> </code><code>/// 處理結果是否成功(ResultNo == 0)</code>
<code> </code><code>bool</code> <code>IsSuccess { </code><code>get</code><code>; }</code>
<code> </code><code>/// 處理結果是否失敗(ResultNo != 0 )</code>
<code> </code><code>bool</code> <code>IsNotSuccess { </code><code>get</code><code>; }</code>
<code> </code><code>/// 處理結果是否失敗(ResultNo < 0 )</code>
<code> </code><code>bool</code> <code>IsFailed { </code><code>get</code><code>; }</code>
<code> </code><code>/// 已處理,但有不緻命的錯誤(ResultNo > 0)</code>
<code> </code><code>bool</code> <code>IsPassedButFailed { </code><code>get</code><code>; }</code>
<code> </code><code>/// 如果處理失敗,則抛出異常</code>
<code> </code><code>/// <returns>傳回本身</returns></code>
<code> </code><code>IAOPResult ThrowErrorOnFailed();</code>
<code> </code><code>}</code>
<code> </code><code>#endregion IAOPResult</code>
<code> </code><code>#region IAOPResult<T></code>
<code> </code><code>/// 對象處理傳回的結果接口(泛型)</code>
<code> </code><code>public</code> <code>interface</code> <code>IAOPResult<T> : IAOPResult</code>
<code> </code><code>/// 泛型附加對象</code>
<code> </code><code>T ResultAttachObjectEx { </code><code>get</code><code>; }</code>
<code> </code><code>#endregion</code>
返參的設計比較通用化,大家可以自己摸索下。自己也可以補充添加。
<a href="http://blog.51cto.com/dubing/712455#">expand source</a>
異常基類。
日志采取lognet 不贅述
單例通用類 關于作用可以參考蟲子設計模式随筆中的相關博文
狀态類,通過這個類可以反映出目前資料同步的進度。
邊緣化的準備工作大體如此,下面是主要的實作過程。過程中有幾個注意點,同步讀寫還是異步讀寫、是否存線上程安全甚至程序的資源安全(例如我在讀寫前5000條的時候突然在另外一個用戶端CRUD了N條資料),另外,我們讀寫的時候是用連接配接的方式還是使用非連接配接的方式,如何解決伺服器端記憶體占用問題,如何實作excel、txt、sql、oracle等不同資料來源的多态性。
實作過程
這裡就先介紹下已經解決的一些問題
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
<code>public</code> <code>class</code> <code>AnalyseDataManager</code>
<code>{</code>
<code> </code><code>public</code> <code>Status MStatus { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>static</code> <code>int</code> <code>SingleSize = 5000;</code>
<code> </code><code>public</code> <code>static</code> <code>int</code> <code>StatusSize = 1000;</code>
<code> </code><code>readonly</code> <code>Sqlhelper _sh = </code><code>new</code> <code>Sqlhelper();</code>
<code> </code><code>public</code> <code>AnalyseDataManager()</code>
<code> </code><code>public</code> <code>AnalyseDataManager(Status st)</code>
<code> </code><code>: </code><code>this</code><code>()</code>
<code> </code><code>MStatus = st;</code>
<code> </code><code>public</code> <code>IAsyncResult OutMethod(AopParam app)</code>
<code> </code><code>MStatus.MTotalSize = app.MaxSize;</code>
<code> </code><code>var</code> <code>func = </code><code>new</code> <code>Func<AopParam, </code><code>bool</code><code>>(ServerMethod);</code>
<code> </code><code>return</code> <code>func.BeginInvoke(app, CallbackMethod, func);</code>
<code> </code><code>/// 複制資料</code>
<code> </code><code>/// <returns>是否成功</returns></code>
<code> </code><code>public</code> <code>bool</code> <code>ServerMethod(AopParam app)</code>
<code> </code><code>try</code>
<code> </code><code>{</code>
<code> </code><code>_sh.App = app;</code>
<code> </code><code>if</code> <code>(_sh.OpenConn().IsSuccess)</code>
<code> </code><code>{</code>
<code> </code><code>while</code> <code>(app.MaxSize > MStatus.MCurrentSize)</code>
<code> </code><code>{</code>
<code> </code><code>app.CurrentSize = MStatus.MCurrentSize;</code>
<code> </code><code>if</code> <code>(!AsyncDataToServer(app) && app.IsBreakPoints)</code>
<code> </code><code>{</code>
<code> </code><code>break</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>catch</code> <code>(Exception ex)</code>
<code> </code><code>Log4N.WarnLog(</code><code>"ServerMethod出錯"</code><code>, ex);</code>
<code> </code><code>if</code> <code>(app.IsBreakPoints)</code>
<code> </code><code>return</code> <code>false</code><code>;</code>
<code> </code><code>finally</code>
<code> </code><code>_sh.Dispose();</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>private</code> <code>bool</code> <code>AsyncDataToServer(AopParam app)</code>
<code> </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"資料同步開始\r\n來源資料{0}\r\n表的名字{1}\r\n一次性送出的行數{2}\r\n目前行數{3}"</code><code>, app.T_ConnectionString, app.TableName, app.MaxSize, app.CurrentSize));</code>
<code> </code><code>using</code> <code>(</code><code>var</code> <code>bcp = </code><code>new</code> <code>SqlBulkCopy(_sh.TconnSql))</code>
<code> </code><code>MStatus.Statusflag = Status.CopyStatus.Doing;</code>
<code> </code><code>bcp.BatchSize = SingleSize;</code>
<code> </code><code>bcp.DestinationTableName = app.TableName;</code>
<code> </code><code>bcp.SqlRowsCopied +=</code>
<code> </code><code>OnSqlRowsCopied;</code>
<code> </code><code>bcp.NotifyAfter = StatusSize;</code>
<code> </code><code>try</code>
<code> </code><code>bcp.WriteToServer(_sh.GetDtResultImp());</code>
<code> </code><code>catch</code> <code>(Exception ex)</code>
<code> </code><code>Log4N.WarnLog(</code><code>"AsyncDataToServer出錯"</code><code>, ex);</code>
<code> </code><code>finally</code>
<code> </code><code>_sh.IreaderSql.Close();</code>
<code> </code><code>return</code> <code>true</code><code>;</code>
<code> </code><code>private</code> <code>void</code> <code>OnSqlRowsCopied(</code><code>object</code> <code>sender, SqlRowsCopiedEventArgs e)</code>
<code> </code><code>Thread.Sleep(1000);</code>
<code> </code><code>MStatus.MCurrentSize += StatusSize;</code>
<code> </code><code>public</code> <code>void</code> <code>CallbackMethod(IAsyncResult ar)</code>
<code> </code><code>var</code> <code>caller = (Func<AopParam,</code><code>bool</code> <code>>)ar.AsyncState;</code>
<code> </code><code>if</code> <code>(caller.EndInvoke(ar))</code>
<code> </code><code>MStatus.Statusflag = Status.CopyStatus.Finished;</code>
<code>}</code>
Microsoft SQL Server 提供一個稱為 bcp 的流行的指令提示符實用工具,用于将資料從一個表移動到另一個表(表既可以在同一個伺服器上,也可以在不同伺服器上)。SqlBulkCopy 類允許編寫提供類似功能的托管代碼解決方案。還有其他将資料加載到 SQL Server 表的方法(例如 INSERT 語句),但相比之下 SqlBulkCopy 提供明顯的性能優勢。使用 SqlBulkCopy 類隻能向 SQL Server 表寫入資料。但是,資料源不限于 SQL Server;可以使用任何資料源,隻要資料可加載到 DataTable 執行個體或可使用 IDataReader 執行個體讀取資料。其中 SqlRowsCopied 在每次處理完 NotifyAfter 屬性指定的行數時發生。
ServerMethod為主方法提供單次用戶端請求的邏輯。
OutMethod對外開放以bpm異步程式設計模型形式進行處理、sqlhelper之是以不設計成單列,為了保證可以多個用戶端請求狀态不幹擾。
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
<code>public</code> <code>class</code> <code>Sqlhelper : IDisposable</code>
<code> </code><code>private</code> <code>readonly</code> <code>string</code> <code>_sqlconn = ConfigurationSettings.AppSettings[</code><code>"BaseConn"</code><code>];</code>
<code> </code><code>public</code> <code>bool</code> <code>IblnTransBegin { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>SqlTransaction ItransSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>SqlConnection IconnSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>SqlConnection TconnSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>SqlDataReader IreaderSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>IAOPParam App { </code><code>get</code><code>; </code><code>set</code><code>; }</code>
<code> </code><code>public</code> <code>DataTable GetDtResult(</code><code>string</code> <code>sqlcommand)</code>
<code> </code><code>{</code>
<code> </code><code>var</code> <code>ds = </code><code>new</code> <code>DataSet();</code>
<code> </code><code>var</code> <code>da = </code><code>new</code> <code>SqlDataAdapter(sqlcommand, </code><code>new</code> <code>SqlConnection(_sqlconn));</code>
<code> </code><code>da.Fill(ds);</code>
<code> </code><code>if</code> <code>(ds.Tables[0] != </code><code>null</code><code>)</code>
<code> </code><code>{</code>
<code> </code><code>return</code> <code>ds.Tables[0];</code>
<code> </code><code>}</code>
<code> </code><code>return</code> <code>null</code><code>;</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>DataTable GetDtResult()</code>
<code> </code><code>//string sqlstr = string.Format("Select Top {0} * From {1} Where {2} not in (select Top {4} {2} From {1} order by {2} {3} ) order by {2} {3} ", app.SingleSize.ToString(), app.TableName, app.Sortkey, app.SortName, app.CurrentSize.ToString());</code>
<code> </code><code>string</code> <code>sqlstr = GetCommandByApp();</code>
<code> </code><code>var</code> <code>da = </code><code>new</code> <code>SqlDataAdapter(sqlstr, </code><code>new</code> <code>SqlConnection(_sqlconn));</code>
<code> </code><code>public</code> <code>SqlDataReader GetDtResultImp()</code>
<code> </code><code>var</code> <code>sqlstr = GetCommandByApp();</code>
<code> </code><code>var</code> <code>command = </code><code>new</code> <code>SqlCommand(</code>
<code> </code><code>sqlstr, IconnSql);</code>
<code> </code><code>IreaderSql =</code>
<code> </code><code>command.ExecuteReader();</code>
<code> </code><code>return</code> <code>IreaderSql;</code>
<code> </code><code>public</code> <code>IAOPResult OpenConn()</code>
<code> </code><code>var</code> <code>ar = </code><code>new</code> <code>AOPResult(0);</code>
<code> </code><code>IconnSql = </code><code>new</code> <code>SqlConnection(_sqlconn);</code>
<code> </code><code>TconnSql = </code><code>new</code> <code>SqlConnection(App.T_ConnectionString);</code>
<code> </code><code>try</code>
<code> </code><code>IconnSql.Open();</code>
<code> </code><code>TconnSql.Open();</code>
<code> </code><code>catch</code> <code>(SqlException ex)</code>
<code> </code><code>ar.ResultNo = 1;</code>
<code> </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"OpenConn失敗,詳細消息為{0},源表"</code><code>, ex.Message), App);</code>
<code> </code><code>return</code> <code>ar;</code>
<code> </code><code>public</code> <code>IAOPResult CloseConn()</code>
<code> </code><code>IconnSql.Close();</code>
<code> </code><code>TconnSql.Close();</code>
<code> </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"CloseConn失敗,詳細消息為{0},源表"</code><code>, ex.Message), App);</code>
<code> </code><code>public</code> <code>IAOPResult BeginTran()</code>
<code> </code><code>ItransSql = IconnSql.BeginTransaction();</code>
<code> </code><code>public</code> <code>void</code> <code>Dispose()</code>
<code> </code><code>CloseConn();</code>
<code> </code><code>public</code> <code>string</code> <code>GetCommandByApp()</code>
<code> </code><code>string</code> <code>sqlstr = </code><code>string</code><code>.Empty;</code>
<code> </code><code>if</code><code>(App.CurrentSize == 0)</code>
<code> </code><code>switch</code> <code>(App.SortName.ToLower())</code>
<code> </code><code>{</code>
<code> </code><code>case</code> <code>"asc"</code><code>:</code>
<code> </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} order by {2} asc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey);</code>
<code> </code><code>break</code><code>;</code>
<code> </code><code>case</code> <code>"desc"</code><code>:</code>
<code> </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} order by {2} desc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey);</code>
<code> </code><code>}</code>
<code> </code><code>else</code>
<code> </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} Where {2} >(select max ({2}) From (select Top {3} {2} From {1} order by {2} asc ) as temp_chongzi) order by {2} asc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());</code>
<code> </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} Where {2} <(select min ({2}) From (select Top {3} {2} From {1}) order by {2} desc )as temp_chongzi) order by {2} desc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());</code>
<code> </code><code>} </code>
<code> </code><code>return</code> <code>sqlstr;</code>
資料庫通路層中首先是一個類似分頁sql的設計,來優化單次請求的效率。bcp的來源可以選擇連接配接式的SqlDataReader 或者非連接配接式的Dataset,2者各有優缺。前者需要打開SqlConnection,但是是逐條讀取,後者非連接配接但是占用記憶體大。至于具體的性能比,蟲子在下一章節再和大家讨論。至于源程式目前還是草稿版,很多功能還未實作,細節處理也不夠細膩,因為異步目前隻設定了一個線程,還未涉及到并行架構,性能方面還有相當大的提高空間。先放出來讓大家讨論,細節方面可以暫時先略過,大家可以說說在設計方面如何才能更高效、穩定。
本文轉自 熬夜的蟲子 51CTO部落格,原文連結:http://blog.51cto.com/dubing/712455