天天看点

分享改进 高性能数据同步工具(一)

题外:在博文索引中暂时列出了开源的计划一览,虫子开源的目的是希望能有更多的交流,部分软件可能小得连开源协议的认证价值都没有。不管程序有多小多简单,用心把一个完整的设计思路、实现过程以及测试结果展现给大家。欢迎大牛拍砖,小牛问路。

软件背景

拿本次高性能数据同步工具来说,目前还处于开发阶段,大概是1/4的样子。为了避免模糊,就先把这1/4分享给大家。

数据作为系统的核心价值,因为其流动性所以经常会有载体的变更。如何高性能、安全的将数据搬移是一个大家经常接触也一直在用的课题。如果只是sql to sql可能作为程序员而言,DBA更适合这个内容,例如dts导入等。但是更多的实际场景下,可能会有文件、服务、甚至其他类型的数据流来源。所以作为码农,我们不妨多了解一下这方面的内容。

设计思路

暂时开源程序中只做了sql to sql的一部分。直接就以这个开始来讲吧。

首先是入参和返参的设计

<a href="http://blog.51cto.com/dubing/712455#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

<code>/// &lt;summary&gt;</code>

<code>    </code><code>/// 入参接口</code>

<code>    </code><code>/// &lt;/summary&gt;</code>

<code>    </code><code>public</code> <code>interface</code> <code>IAOPParam</code>

<code>    </code><code>{</code>

<code>         </code><code>/// &lt;summary&gt;</code>

<code>         </code><code>/// 目标地址</code>

<code>         </code><code>/// &lt;/summary&gt;</code>

<code>         </code><code>string</code> <code>T_ConnectionString { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 请求行数</code>

<code>         </code><code>long</code> <code>MaxSize { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 表名</code>

<code>         </code><code>string</code> <code>TableName { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 当前行数</code>

<code>         </code><code>long</code> <code>CurrentSize { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 域名</code>

<code>         </code><code>string</code> <code>p_Domain { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 断点文件地址</code>

<code>         </code><code>string</code> <code>p_InitPath { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 断点时间</code>

<code>         </code><code>DateTime p_Previous { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 是否结束</code>

<code>         </code><code>bool</code> <code>p_IsEnd { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// 排序方式</code>

<code>         </code><code>string</code> <code>SortName { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>         </code><code>/// 单次请求大小</code>

<code>         </code><code>long</code> <code>SingleSize { </code><code>get</code><code>; }</code>

<code>         </code><code>/// 排序主键</code>

<code>         </code><code>string</code> <code>Sortkey { </code><code>get</code><code>;  }</code>

<code>         </code> 

<code>         </code><code>/// 是否支持事务</code>

<code>         </code><code>bool</code> <code>IsTransaction { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// true为支持断点 发生断点或异常后程序终止   false为不支持断点 遇到断点或异常继续填充直到此次请求完成</code>

<code>         </code><code>bool</code> <code>IsBreakPoints { </code><code>get</code><code>;  }</code>

<code>         </code><code>/// guid</code>

<code>         </code><code>string</code> <code>T_Guid { </code><code>get</code><code>; }</code>

<code>    </code><code>}</code>

<code>    </code><code>/// &lt;summary&gt;</code>

<code>    </code><code>/// 对象处理返回的入参接口(泛型)</code>

<code>    </code><code>public</code> <code>interface</code> <code>IAOPParam&lt;T&gt; : IAOPParam</code>

<code>        </code><code>/// &lt;summary&gt;</code>

<code>        </code><code>/// 泛型附加对象</code>

<code>        </code><code>/// &lt;/summary&gt;</code>

<code>        </code><code>T ParamAttachObjectEx { </code><code>get</code><code>; }</code>

 这样设计的目的是考虑到服务器的内存与资源占用问题,如果数据来源的体积过大,我们将会对请求的来源进行分块处理。另外通过排序字段或者自定义的sql语句或者存储过程(暂未补充)可以对数据源进行高级过滤,断点续传的设计目前比较简单,web程序的话植入cookie、控制台或者cs程序通过文本媒介json格式来控制。

<code>#region IAOPResult</code>

<code>   </code><code>/// &lt;summary&gt;</code>

<code>   </code><code>/// 对象处理返回的结果接口</code>

<code>   </code><code>/// &lt;/summary&gt;</code>

<code>   </code><code>/// &lt;remarks&gt;</code>

<code>   </code><code>/// 建议在代码调用返回值中都采用此类实例为返回值&lt;br /&gt;</code>

<code>   </code><code>/// 一般ResultNo小于0表示异常,0表示成功,大于0表示其它一般提示信息</code>

<code>   </code><code>/// &lt;/remarks&gt;</code>

<code>   </code><code>public</code> <code>interface</code> <code>IAOPResult</code>

<code>   </code><code>{</code>

<code>       </code><code>/// &lt;summary&gt;</code>

<code>       </code><code>/// 返回代码</code>

<code>       </code><code>/// &lt;/summary&gt;</code>

<code>       </code><code>int</code> <code>ResultNo { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 对应的描述信息</code>

<code>       </code><code>string</code> <code>ResultDescription { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 相应的附加信息</code>

<code>       </code><code>object</code> <code>ResultAttachObject { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 内部AOPResult</code>

<code>       </code><code>IAOPResult InnerAOPResult { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 处理结果是否成功(ResultNo == 0)</code>

<code>       </code><code>bool</code> <code>IsSuccess { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 处理结果是否失败(ResultNo != 0 )</code>

<code>       </code><code>bool</code> <code>IsNotSuccess { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 处理结果是否失败(ResultNo &lt; 0 )</code>

<code>       </code><code>bool</code> <code>IsFailed { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 已处理,但有不致命的错误(ResultNo &gt; 0)</code>

<code>       </code><code>bool</code> <code>IsPassedButFailed { </code><code>get</code><code>; }</code>

<code>       </code><code>/// 如果处理失败,则抛出异常</code>

<code>       </code><code>/// &lt;returns&gt;返回本身&lt;/returns&gt;</code>

<code>       </code><code>IAOPResult ThrowErrorOnFailed();</code>

<code>   </code><code>}</code>

<code>   </code><code>#endregion IAOPResult</code>

<code>   </code><code>#region IAOPResult&lt;T&gt;</code>

<code>   </code><code>/// 对象处理返回的结果接口(泛型)</code>

<code>   </code><code>public</code> <code>interface</code> <code>IAOPResult&lt;T&gt; : IAOPResult</code>

<code>       </code><code>/// 泛型附加对象</code>

<code>       </code><code>T ResultAttachObjectEx { </code><code>get</code><code>; }</code>

<code>   </code><code>#endregion</code>

 返参的设计比较通用化,大家可以自己摸索下。自己也可以补充添加。

<a href="http://blog.51cto.com/dubing/712455#">expand source</a>

 异常基类。

 日志采取lognet 不赘述

 单例通用类 关于作用可以参考虫子设计模式随笔中的相关博文

 状态类,通过这个类可以反映出当前数据同步的进度。

边缘化的准备工作大体如此,下面是主要的实现过程。过程中有几个注意点,同步读写还是异步读写、是否存在线程安全甚至进程的资源安全(例如我在读写前5000条的时候突然在另外一个客户端CRUD了N条数据),另外,我们读写的时候是用连接的方式还是使用非连接的方式,如何解决服务器端内存占用问题,如何实现excel、txt、sql、oracle等不同数据来源的多态性。

实现过程 

这里就先介绍下已经解决的一些问题

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

<code>public</code> <code>class</code> <code>AnalyseDataManager</code>

<code>{</code>

<code>    </code><code>public</code> <code>Status MStatus { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>    </code><code>public</code> <code>static</code> <code>int</code> <code>SingleSize = 5000;</code>

<code>    </code><code>public</code> <code>static</code> <code>int</code> <code>StatusSize = 1000;</code>

<code>    </code><code>readonly</code> <code>Sqlhelper _sh = </code><code>new</code> <code>Sqlhelper();</code>

<code>    </code><code>public</code> <code>AnalyseDataManager()</code>

<code>    </code><code>public</code> <code>AnalyseDataManager(Status st)</code>

<code>        </code><code>: </code><code>this</code><code>()</code>

<code>        </code><code>MStatus = st;</code>

<code>    </code><code>public</code> <code>IAsyncResult OutMethod(AopParam app)</code>

<code>        </code><code>MStatus.MTotalSize = app.MaxSize;</code>

<code>        </code><code>var</code> <code>func = </code><code>new</code> <code>Func&lt;AopParam, </code><code>bool</code><code>&gt;(ServerMethod);</code>

<code>        </code><code>return</code> <code>func.BeginInvoke(app, CallbackMethod, func);</code>

<code>    </code><code>/// 复制数据</code>

<code>    </code><code>/// &lt;returns&gt;是否成功&lt;/returns&gt;</code>

<code>    </code><code>public</code> <code>bool</code> <code>ServerMethod(AopParam app)</code>

<code>        </code><code>try</code>

<code>        </code><code>{</code>

<code>            </code><code>_sh.App = app;</code>

<code>            </code><code>if</code> <code>(_sh.OpenConn().IsSuccess)</code>

<code>            </code><code>{</code>

<code>                </code><code>while</code> <code>(app.MaxSize &gt; MStatus.MCurrentSize)</code>

<code>                </code><code>{</code>

<code>                    </code><code>app.CurrentSize = MStatus.MCurrentSize;</code>

<code>                    </code><code>if</code> <code>(!AsyncDataToServer(app) &amp;&amp; app.IsBreakPoints)</code>

<code>                    </code><code>{</code>

<code>                        </code><code>break</code><code>;</code>

<code>                    </code><code>}</code>

<code>                </code><code>}</code>

<code>            </code><code>}</code>

<code>        </code><code>}</code>

<code>        </code><code>catch</code> <code>(Exception ex)</code>

<code>            </code><code>Log4N.WarnLog(</code><code>"ServerMethod出错"</code><code>, ex);</code>

<code>            </code><code>if</code> <code>(app.IsBreakPoints)</code>

<code>                </code><code>return</code> <code>false</code><code>;</code>

<code>        </code><code>finally</code>

<code>            </code><code>_sh.Dispose();</code>

<code>        </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>private</code> <code>bool</code> <code>AsyncDataToServer(AopParam app)</code>

<code>        </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"数据同步开始\r\n来源数据{0}\r\n表的名字{1}\r\n一次性提交的行数{2}\r\n当前行数{3}"</code><code>, app.T_ConnectionString, app.TableName, app.MaxSize, app.CurrentSize));</code>

<code>        </code><code>using</code> <code>(</code><code>var</code> <code>bcp = </code><code>new</code> <code>SqlBulkCopy(_sh.TconnSql))</code>

<code>            </code><code>MStatus.Statusflag = Status.CopyStatus.Doing;</code>

<code>            </code><code>bcp.BatchSize = SingleSize;</code>

<code>            </code><code>bcp.DestinationTableName = app.TableName;</code>

<code>            </code><code>bcp.SqlRowsCopied +=</code>

<code>              </code><code>OnSqlRowsCopied;</code>

<code>            </code><code>bcp.NotifyAfter = StatusSize;</code>

<code>            </code><code>try</code>

<code>                </code><code>bcp.WriteToServer(_sh.GetDtResultImp());</code>

<code>            </code><code>catch</code> <code>(Exception ex)</code>

<code>                </code><code>Log4N.WarnLog(</code><code>"AsyncDataToServer出错"</code><code>, ex);</code>

<code>            </code><code>finally</code>

<code>                </code><code>_sh.IreaderSql.Close();</code>

<code>            </code><code>return</code> <code>true</code><code>;</code>

<code>    </code><code>private</code> <code>void</code> <code>OnSqlRowsCopied(</code><code>object</code> <code>sender, SqlRowsCopiedEventArgs e)</code>

<code>        </code><code>Thread.Sleep(1000);</code>

<code>        </code><code>MStatus.MCurrentSize += StatusSize;</code>

<code>    </code><code>public</code> <code>void</code> <code>CallbackMethod(IAsyncResult ar)</code>

<code>        </code><code>var</code> <code>caller = (Func&lt;AopParam,</code><code>bool</code> <code>&gt;)ar.AsyncState;</code>

<code>        </code><code>if</code> <code>(caller.EndInvoke(ar))</code>

<code>            </code><code>MStatus.Statusflag = Status.CopyStatus.Finished;</code>

<code>}</code>

Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。其中 SqlRowsCopied 在每次处理完 NotifyAfter 属性指定的行数时发生。

ServerMethod为主方法提供单次客户端请求的逻辑。

OutMethod对外开放以bpm异步编程模型形式进行处理、sqlhelper之所以不设计成单列,为了保证可以多个客户端请求状态不干扰。

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

<code>public</code> <code>class</code> <code>Sqlhelper : IDisposable</code>

<code>       </code><code>private</code> <code>readonly</code> <code>string</code> <code>_sqlconn = ConfigurationSettings.AppSettings[</code><code>"BaseConn"</code><code>];</code>

<code>       </code><code>public</code> <code>bool</code> <code>IblnTransBegin { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlTransaction ItransSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlConnection IconnSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlConnection TconnSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>SqlDataReader IreaderSql { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>IAOPParam App { </code><code>get</code><code>; </code><code>set</code><code>; }</code>

<code>       </code><code>public</code> <code>DataTable GetDtResult(</code><code>string</code> <code>sqlcommand)</code>

<code>       </code><code>{</code>

<code>           </code><code>var</code> <code>ds = </code><code>new</code> <code>DataSet();</code>

<code>           </code><code>var</code> <code>da = </code><code>new</code> <code>SqlDataAdapter(sqlcommand, </code><code>new</code> <code>SqlConnection(_sqlconn));</code>

<code>           </code><code>da.Fill(ds);</code>

<code>           </code><code>if</code> <code>(ds.Tables[0] != </code><code>null</code><code>)</code>

<code>           </code><code>{</code>

<code>               </code><code>return</code> <code>ds.Tables[0];</code>

<code>           </code><code>}</code>

<code>           </code><code>return</code> <code>null</code><code>;</code>

<code>       </code><code>}</code>

<code>       </code><code>public</code> <code>DataTable GetDtResult()</code>

<code>           </code><code>//string sqlstr = string.Format("Select Top {0} * From {1} Where {2} not in (select Top {4} {2} From {1} order by {2} {3} ) order by {2} {3}  ", app.SingleSize.ToString(), app.TableName, app.Sortkey, app.SortName, app.CurrentSize.ToString());</code>

<code>           </code><code>string</code> <code>sqlstr = GetCommandByApp();</code>

<code>           </code><code>var</code> <code>da = </code><code>new</code> <code>SqlDataAdapter(sqlstr, </code><code>new</code> <code>SqlConnection(_sqlconn));</code>

<code>       </code><code>public</code> <code>SqlDataReader GetDtResultImp()</code>

<code>           </code><code>var</code> <code>sqlstr = GetCommandByApp();</code>

<code>           </code><code>var</code> <code>command = </code><code>new</code> <code>SqlCommand(</code>

<code>              </code><code>sqlstr, IconnSql);</code>

<code>           </code><code>IreaderSql =</code>

<code>             </code><code>command.ExecuteReader();</code>

<code>           </code><code>return</code> <code>IreaderSql;</code>

<code>       </code><code>public</code> <code>IAOPResult OpenConn()</code>

<code>           </code><code>var</code> <code>ar = </code><code>new</code> <code>AOPResult(0);</code>

<code>           </code><code>IconnSql = </code><code>new</code> <code>SqlConnection(_sqlconn);</code>

<code>           </code><code>TconnSql = </code><code>new</code> <code>SqlConnection(App.T_ConnectionString);</code>

<code>           </code><code>try</code>

<code>               </code><code>IconnSql.Open();</code>

<code>               </code><code>TconnSql.Open();</code>

<code>           </code><code>catch</code> <code>(SqlException ex)</code>

<code>               </code><code>ar.ResultNo = 1;</code>

<code>               </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"OpenConn失败,详细消息为{0},源表"</code><code>, ex.Message), App);</code>

<code>           </code><code>return</code> <code>ar;</code>

<code>       </code><code>public</code> <code>IAOPResult CloseConn()</code>

<code>               </code><code>IconnSql.Close();</code>

<code>               </code><code>TconnSql.Close();</code>

<code>               </code><code>Log4N.InfoLog(</code><code>string</code><code>.Format(</code><code>"CloseConn失败,详细消息为{0},源表"</code><code>, ex.Message), App);</code>

<code>       </code><code>public</code> <code>IAOPResult BeginTran()</code>

<code>           </code><code>ItransSql = IconnSql.BeginTransaction();</code>

<code>       </code><code>public</code> <code>void</code> <code>Dispose()</code>

<code>           </code><code>CloseConn();</code>

<code>       </code><code>public</code> <code>string</code> <code>GetCommandByApp()</code>

<code>           </code><code>string</code> <code>sqlstr = </code><code>string</code><code>.Empty;</code>

<code>           </code><code>if</code><code>(App.CurrentSize == 0)</code>

<code>               </code><code>switch</code> <code>(App.SortName.ToLower())</code>

<code>               </code><code>{</code>

<code>                   </code><code>case</code> <code>"asc"</code><code>:</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1}  order by {2} asc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey);</code>

<code>                       </code><code>break</code><code>;</code>

<code>                   </code><code>case</code> <code>"desc"</code><code>:</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1}  order by {2} desc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey);</code>

<code>               </code><code>}</code>

<code>           </code><code>else</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} Where {2} &gt;(select max ({2}) From (select Top {3} {2} From {1} order by {2} asc ) as temp_chongzi) order by {2} asc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());</code>

<code>                       </code><code>sqlstr = </code><code>string</code><code>.Format(</code><code>"Select Top {0} * From {1} Where {2} &lt;(select min ({2}) From (select Top {3} {2} From {1}) order by {2} desc )as temp_chongzi) order by {2} desc"</code><code>, App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());</code>

<code>               </code><code>}     </code>

<code>           </code><code>return</code> <code>sqlstr;</code>

 数据库访问层中首先是一个类似分页sql的设计,来优化单次请求的效率。bcp的来源可以选择连接式的SqlDataReader 或者非连接式的Dataset,2者各有优缺。前者需要打开SqlConnection,但是是逐条读取,后者非连接但是占用内存大。至于具体的性能比,虫子在下一章节再和大家讨论。至于源程序目前还是草稿版,很多功能还未实现,细节处理也不够细腻,因为异步目前只设置了一个线程,还未涉及到并行框架,性能方面还有相当大的提高空间。先放出来让大家讨论,细节方面可以暂时先略过,大家可以说说在设计方面如何才能更高效、稳定。

本文转自 熬夜的虫子  51CTO博客,原文链接:http://blog.51cto.com/dubing/712455