天天看點

分享改進 高性能資料同步工具(一)

題外:在博文索引中暫時列出了開源的計劃一覽,蟲子開源的目的是希望能有更多的交流,部分軟體可能小得連開源協定的認證價值都沒有。不管程式有多小多簡單,用心把一個完整的設計思路、實作過程以及測試結果展現給大家。歡迎大牛拍磚,小牛問路。

軟體背景

拿本次高性能資料同步工具來說,目前還處于開發階段,大概是1/4的樣子。為了避免模糊,就先把這1/4分享給大家。

資料作為系統的核心價值,因為其流動性是以經常會有載體的變更。如何高性能、安全的将資料搬移是一個大家經常接觸也一直在用的課題。如果隻是sql to sql可能作為程式員而言,DBA更适合這個内容,例如dts導入等。但是更多的實際場景下,可能會有檔案、服務、甚至其他類型的資料流來源。是以作為碼農,我們不妨多了解一下這方面的内容。

設計思路

暫時開源程式中隻做了sql to sql的一部分。直接就以這個開始來講吧。

首先是入參和返參的設計

<a href="http://blog.51cto.com/dubing/712455#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

<code>/// &lt;summary&gt;</code>

<code>    </code><code>/// 入參接口</code>

<code>    </code><code>/// &lt;/summary&gt;</code>

<code>    </code><code>public</code> <code>interface</code> <code>IAOPParam</code>

<code>    </code><code>{</code>

<code>         </code><code>/// &lt;summary&gt;</code>

<code>         </code><code>/// 目标位址</code>

<code>         </code><code>/// &lt;/summary&gt;</code>

<code>         </code><code>string</code> <code>T_ConnectionString { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 請求行數</code>

<code>         </code><code>long</code> <code>MaxSize { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 表名</code>

<code>         </code><code>string</code> <code>TableName { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 目前行數</code>

<code>         </code><code>long</code> <code>CurrentSize { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 域名</code>

<code>         </code><code>string</code> <code>p_Domain { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 斷點檔案位址</code>

<code>         </code><code>string</code> <code>p_InitPath { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 斷點時間</code>

<code>         </code><code>DateTime p_Previous { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 是否結束</code>

<code>         </code><code>bool</code> <code>p_IsEnd { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 排序方式</code>

<code>         </code><code>string</code> <code>SortName { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>         </code><code>/// 單次請求大小</code>

<code>         </code><code>long</code> <code>SingleSize { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 排序主鍵</code>

<code>         </code><code>string</code> <code>Sortkey { </code><code>get</code><code>;  }</code>

<code>         </code> 

<code>         </code><code>/// 是否支援事務</code>

<code>         </code><code>bool</code> <code>IsTransaction { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// true為支援斷點 發生斷點或異常後程式終止   false為不支援斷點 遇到斷點或異常繼續填充直到此次請求完成</code>

<code>         </code><code>bool</code> <code>IsBreakPoints { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// guid</code>

<code>         </code><code>string</code> <code>T_Guid { </code><code>get</code><code>; }</code>

<code>    </code><code>}</code>

<code>    </code><code>/// &lt;summary&gt;</code>

<code>    </code><code>/// 對象處理傳回的入參接口(泛型)</code>

<code>    </code><code>public</code> <code>interface</code> <code>IAOPParam&lt;T&gt; : IAOPParam</code>

<code>        </code><code>/// &lt;summary&gt;</code>

<code>        </code><code>/// 泛型附加對象</code>

<code>        </code><code>/// &lt;/summary&gt;</code>

<code>        </code><code>T ParamAttachObjectEx { </code><code>get</code><code>; }</code>

 這樣設計的目的是考慮到伺服器的記憶體與資源占用問題,如果資料來源的體積過大,我們将會對請求的來源進行分塊處理。另外通過排序字段或者自定義的sql語句或者存儲過程(暫未補充)可以對資料源進行進階過濾,斷點續傳的設計目前比較簡單,web程式的話植入cookie、控制台或者cs程式通過文本媒介json格式來控制。

<code>#region IAOPResult</code>

<code>   </code><code>/// &lt;summary&gt;</code>

<code>   </code><code>/// 對象處理傳回的結果接口</code>

<code>   </code><code>/// &lt;/summary&gt;</code>

<code>   </code><code>/// &lt;remarks&gt;</code>

<code>   </code><code>/// 建議在代碼調用傳回值中都采用此類執行個體為傳回值&lt;br /&gt;</code>

<code>   </code><code>/// 一般ResultNo小于0表示異常,0表示成功,大于0表示其它一般提示資訊</code>

<code>   </code><code>/// &lt;/remarks&gt;</code>

<code>   </code><code>public</code> <code>interface</code> <code>IAOPResult</code>

<code>   </code><code>{</code>

<code>       </code><code>/// &lt;summary&gt;</code>

<code>       </code><code>/// 傳回代碼</code>

<code>       </code><code>/// &lt;/summary&gt;</code>

<code>       </code><code>int</code> <code>ResultNo { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 對應的描述資訊</code>

<code>       </code><code>string</code> <code>ResultDescription { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 相應的附加資訊</code>

<code>       </code><code>object</code> <code>ResultAttachObject { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 内部AOPResult</code>

<code>       </code><code>IAOPResult InnerAOPResult { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 處理結果是否成功(ResultNo == 0)</code>

<code>       </code><code>bool</code> <code>IsSuccess { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 處理結果是否失敗(ResultNo != 0 )</code>

<code>       </code><code>bool</code> <code>IsNotSuccess { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 處理結果是否失敗(ResultNo &lt; 0 )</code>

<code>       </code><code>bool</code> <code>IsFailed { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 已處理,但有不緻命的錯誤(ResultNo &gt; 0)</code>

<code>       </code><code>bool</code> <code>IsPassedButFailed { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 如果處理失敗,則抛出異常</code>

<code>       </code><code>/// &lt;returns&gt;傳回本身&lt;/returns&gt;</code>

<code>       </code><code>IAOPResult ThrowErrorOnFailed();</code>

<code>   </code><code>}</code>

<code>   </code><code>#endregion IAOPResult</code>

<code>   </code><code>#region IAOPResult&lt;T&gt;</code>

<code>   </code><code>/// 對象處理傳回的結果接口(泛型)</code>

<code>   </code><code>public</code> <code>interface</code> <code>IAOPResult&lt;T&gt; : IAOPResult</code>

<code>       </code><code>/// 泛型附加對象</code>

<code>       </code><code>T ResultAttachObjectEx { </code><code>get</code><code>; }</code>

<code>   </code><code>#endregion</code>

 返參的設計比較通用化,大家可以自己摸索下。自己也可以補充添加。

<a href="http://blog.51cto.com/dubing/712455#">expand source</a>

 異常基類。

 日志采取lognet 不贅述

 單例通用類 關于作用可以參考蟲子設計模式随筆中的相關博文

 狀态類,通過這個類可以反映出目前資料同步的進度。

邊緣化的準備工作大體如此,下面是主要的實作過程。過程中有幾個注意點,同步讀寫還是異步讀寫、是否存線上程安全甚至程序的資源安全(例如我在讀寫前5000條的時候突然在另外一個用戶端CRUD了N條資料),另外,我們讀寫的時候是用連接配接的方式還是使用非連接配接的方式,如何解決伺服器端記憶體占用問題,如何實作excel、txt、sql、oracle等不同資料來源的多态性。

實作過程 

這裡就先介紹下已經解決的一些問題

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

<code>public</code> <code>class</code> <code>AnalyseDataManager</code>

<code>{</code>

<code>    </code><code>public</code> <code>Status MStatus { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>    </code><code>public</code> <code>static</code> <code>int</code> <code>SingleSize = 5000;</code>

<code>    </code><code>public</code> <code>static</code> <code>int</code> <code>StatusSize = 1000;</code>

<code>    </code><code>readonly</code> <code>Sqlhelper _sh = </code><code>new</code> <code>Sqlhelper();</code>

<code>    </code><code>public</code> <code>AnalyseDataManager()</code>

<code>    </code><code>public</code> <code>AnalyseDataManager(Status st)</code>

<code>        </code><code>: </code><code>this</code><code>()</code>

<code>        </code><code>MStatus = st;</code>

<code>    </code><code>public</code> <code>IAsyncResult OutMethod(AopParam app)</code>

<code>        </code><code>MStatus.MTotalSize = app.MaxSize;</code>

<code>        </code><code>var</code> <code>func = </code><code>new</code> <code>Func&lt;AopParam, </code><code>bool</code><code>&gt;(ServerMethod);</code>

<code>        </code><code>return</code> <code>func.BeginInvoke(app, CallbackMethod, func);</code>

<code>    </code><code>/// 複制資料</code>

<code>    </code><code>/// &lt;returns&gt;是否成功&lt;/returns&gt;</code>

<code>    </code><code>public</code> <code>bool</code> <code>ServerMethod(AopParam app)</code>

<code>        </code><code>try</code>

<code>        </code><code>{</code>

<code>            </code><code>_sh.App = app;</code>

<code>            </code><code>if</code> <code>(_sh.OpenConn().IsSuccess)</code>

<code>            </code><code>{</code>

<code>                </code><code>while</code> <code>(app.MaxSize &gt; MStatus.MCurrentSize)</code>

<code>                </code><code>{</code>

<code>                    </code><code>app.CurrentSize = MStatus.MCurrentSize;</code>

<code>                    </code><code>if</code> <code>(!AsyncDataToServer(app) &amp;&amp; app.IsBreakPoints)</code>

<code>                    </code><code>{</code>

<code>                        </code><code>break</code><code>;</code>

<code>                    </code><code>}</code>

<code>                </code><code>}</code>

<code>            </code><code>}</code>

<code>        </code><code>}</code>

<code>        </code><code>catch</code> <code>(Exception ex)</code>

<code>            </code><code>Log4N.WarnLog(</code><code>"ServerMethod出錯"</code><code>, ex);</code>

<code>            </code><code>if</code> <code>(app.IsBreakPoints)</code>

<code>                </code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>finally</code>

<code>            </code><code>_sh.Dispose();</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>private</code> <code>bool</code> <code>AsyncDataToServer(AopParam app)</code>

<code>        </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"資料同步開始\r\n來源資料{0}\r\n表的名字{1}\r\n一次性送出的行數{2}\r\n目前行數{3}"</code><code>, app.T_ConnectionString, app.TableName, app.MaxSize, app.CurrentSize));</code>

<code>        </code><code>using</code> <code>(</code><code>var</code> <code>bcp = </code><code>new</code> <code>SqlBulkCopy(_sh.TconnSql))</code>

<code>            </code><code>MStatus.Statusflag = Status.CopyStatus.Doing;</code>

<code>            </code><code>bcp.BatchSize = SingleSize;</code>

<code>            </code><code>bcp.DestinationTableName = app.TableName;</code>

<code>            </code><code>bcp.SqlRowsCopied +=</code>

<code>              </code><code>OnSqlRowsCopied;</code>

<code>            </code><code>bcp.NotifyAfter = StatusSize;</code>

<code>            </code><code>try</code>

<code>                </code><code>bcp.WriteToServer(_sh.GetDtResultImp());</code>

<code>            </code><code>catch</code> <code>(Exception ex)</code>

<code>                </code><code>Log4N.WarnLog(</code><code>"AsyncDataToServer出錯"</code><code>, ex);</code>

<code>            </code><code>finally</code>

<code>                </code><code>_sh.IreaderSql.Close();</code>

<code>            </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>private</code> <code>void</code> <code>OnSqlRowsCopied(</code><code>object</code> <code>sender, SqlRowsCopiedEventArgs e)</code>

<code>        </code><code>Thread.Sleep(1000);</code>

<code>        </code><code>MStatus.MCurrentSize += StatusSize;</code>

<code>    </code><code>public</code> <code>void</code> <code>CallbackMethod(IAsyncResult ar)</code>

<code>        </code><code>var</code> <code>caller = (Func&lt;AopParam,</code><code>bool</code> <code>&gt;)ar.AsyncState;</code>

<code>        </code><code>if</code> <code>(caller.EndInvoke(ar))</code>

<code>            </code><code>MStatus.Statusflag = Status.CopyStatus.Finished;</code>

<code>}</code>

Microsoft SQL Server 提供一個稱為 bcp 的流行的指令提示符實用工具,用于将資料從一個表移動到另一個表(表既可以在同一個伺服器上,也可以在不同伺服器上)。SqlBulkCopy 類允許編寫提供類似功能的托管代碼解決方案。還有其他将資料加載到 SQL Server 表的方法(例如 INSERT 語句),但相比之下 SqlBulkCopy 提供明顯的性能優勢。使用 SqlBulkCopy 類隻能向 SQL Server 表寫入資料。但是,資料源不限于 SQL Server;可以使用任何資料源,隻要資料可加載到 DataTable 執行個體或可使用 IDataReader 執行個體讀取資料。其中 SqlRowsCopied 在每次處理完 NotifyAfter 屬性指定的行數時發生。

ServerMethod為主方法提供單次用戶端請求的邏輯。

OutMethod對外開放以bpm異步程式設計模型形式進行處理、sqlhelper之是以不設計成單列,為了保證可以多個用戶端請求狀态不幹擾。

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

<code>public</code> <code>class</code> <code>Sqlhelper : IDisposable</code>

<code>       </code><code>private</code> <code>readonly</code> <code>string</code> <code>_sqlconn = ConfigurationSettings.AppSettings[</code><code>"BaseConn"</code><code>];</code>

<code>       </code><code>public</code> <code>bool</code> <code>IblnTransBegin { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlTransaction ItransSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlConnection IconnSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlConnection TconnSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlDataReader IreaderSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>IAOPParam App { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>DataTable GetDtResult(</code><code>string</code> <code>sqlcommand)</code>

<code>       </code><code>{</code>

<code>           </code><code>var</code> <code>ds = </code><code>new</code> <code>DataSet();</code>

<code>           </code><code>var</code> <code>da = </code><code>new</code> <code>SqlDataAdapter(sqlcommand, </code><code>new</code> <code>SqlConnection(_sqlconn));</code>

<code>           </code><code>da.Fill(ds);</code>

<code>           </code><code>if</code> <code>(ds.Tables[0] != </code><code>null</code><code>)</code>

<code>           </code><code>{</code>

<code>               </code><code>return</code> <code>ds.Tables[0];</code>

<code>           </code><code>}</code>

<code>           </code><code>return</code> <code>null</code><code>;</code>

<code>       </code><code>}</code>

<code>       </code><code>public</code> <code>DataTable GetDtResult()</code>

<code>           </code><code>//string sqlstr = string.Format("Select Top {0} * From {1} Where {2} not in (select Top {4} {2} From {1} order by {2} {3} ) order by {2} {3}  ", app.SingleSize.ToString(), app.TableName, app.Sortkey, app.SortName, app.CurrentSize.ToString());</code>

<code>           </code><code>string</code> <code>sqlstr = GetCommandByApp();</code>

<code>           </code><code>var</code> <code>da = </code><code>new</code> <code>SqlDataAdapter(sqlstr, </code><code>new</code> <code>SqlConnection(_sqlconn));</code>

<code>       </code><code>public</code> <code>SqlDataReader GetDtResultImp()</code>

<code>           </code><code>var</code> <code>sqlstr = GetCommandByApp();</code>

<code>           </code><code>var</code> <code>command = </code><code>new</code> <code>SqlCommand(</code>

<code>              </code><code>sqlstr, IconnSql);</code>

<code>           </code><code>IreaderSql =</code>

<code>             </code><code>command.ExecuteReader();</code>

<code>           </code><code>return</code> <code>IreaderSql;</code>

<code>       </code><code>public</code> <code>IAOPResult OpenConn()</code>

<code>           </code><code>var</code> <code>ar = </code><code>new</code> <code>AOPResult(0);</code>

<code>           </code><code>IconnSql = </code><code>new</code> <code>SqlConnection(_sqlconn);</code>

<code>           </code><code>TconnSql = </code><code>new</code> <code>SqlConnection(App.T_ConnectionString);</code>

<code>           </code><code>try</code>

<code>               </code><code>IconnSql.Open();</code>

<code>               </code><code>TconnSql.Open();</code>

<code>           </code><code>catch</code> <code>(SqlException ex)</code>

<code>               </code><code>ar.ResultNo = 1;</code>

<code>               </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"OpenConn失敗,詳細消息為{0},源表"</code><code>, ex.Message), App);</code>

<code>           </code><code>return</code> <code>ar;</code>

<code>       </code><code>public</code> <code>IAOPResult CloseConn()</code>

<code>               </code><code>IconnSql.Close();</code>

<code>               </code><code>TconnSql.Close();</code>

<code>               </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"CloseConn失敗,詳細消息為{0},源表"</code><code>, ex.Message), App);</code>

<code>       </code><code>public</code> <code>IAOPResult BeginTran()</code>

<code>           </code><code>ItransSql = IconnSql.BeginTransaction();</code>

<code>       </code><code>public</code> <code>void</code> <code>Dispose()</code>

<code>           </code><code>CloseConn();</code>

<code>       </code><code>public</code> <code>string</code> <code>GetCommandByApp()</code>

<code>           </code><code>string</code> <code>sqlstr = </code><code>string</code><code>.Empty;</code>

<code>           </code><code>if</code><code>(App.CurrentSize == 0)</code>

<code>               </code><code>switch</code> <code>(App.SortName.ToLower())</code>

<code>               </code><code>{</code>

<code>                   </code><code>case</code> <code>"asc"</code><code>:</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1}  order by {2} asc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey);</code>

<code>                       </code><code>break</code><code>;</code>

<code>                   </code><code>case</code> <code>"desc"</code><code>:</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1}  order by {2} desc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey);</code>

<code>               </code><code>}</code>

<code>           </code><code>else</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} Where {2} &gt;(select max ({2}) From (select Top {3} {2} From {1} order by {2} asc ) as temp_chongzi) order by {2} asc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} Where {2} &lt;(select min ({2}) From (select Top {3} {2} From {1}) order by {2} desc )as temp_chongzi) order by {2} desc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());</code>

<code>               </code><code>}     </code>

<code>           </code><code>return</code> <code>sqlstr;</code>

 資料庫通路層中首先是一個類似分頁sql的設計,來優化單次請求的效率。bcp的來源可以選擇連接配接式的SqlDataReader 或者非連接配接式的Dataset,2者各有優缺。前者需要打開SqlConnection,但是是逐條讀取,後者非連接配接但是占用記憶體大。至于具體的性能比,蟲子在下一章節再和大家讨論。至于源程式目前還是草稿版,很多功能還未實作,細節處理也不夠細膩,因為異步目前隻設定了一個線程,還未涉及到并行架構,性能方面還有相當大的提高空間。先放出來讓大家讨論,細節方面可以暫時先略過,大家可以說說在設計方面如何才能更高效、穩定。

本文轉自 熬夜的蟲子  51CTO部落格,原文連結:http://blog.51cto.com/dubing/712455