天天看點

SQL Server 多表資料增量擷取和釋出 4

核心代碼分析

最關鍵的在于擷取捕獲表資訊(系統表中間_CT結尾的資料)。

根據網上資料查取,找到了擷取目前捕獲表時間區間範圍内資料的方式。

見[SQL Server 多表資料增量擷取和釋出 2.3(

https://www.jianshu.com/p/6a400eca6e79

)

--10.按照時間範圍查詢CDC結果
DECLARE @from_lsn BINARY(10),@end_lsn BINARY(10)
DECLARE @start_time DATETIME = '2018-08-01'
DECLARE @end_time DATETIME ='2018-08-30'
SELECT @from_lsn=sys.fn_cdc_map_time_to_lsn('smallest greater than or equal',@start_time)
SELECT @end_lsn=sys.fn_cdc_map_time_to_lsn(' largest less than or equal',@end_time)
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_Department(@from_lsn,@end_lsn,'all')
           

資料既然能夠通過sql語句擷取到,那麼邏輯判斷就會變得簡單,通過分析我們可以發現select * from XXX ,XXX就是上文中講到的CDC生成的表值函數,表值函數前面相等,可變化的就是架構名_表名稱(dbo_Person)

SQL Server 多表資料增量擷取和釋出 4

image.png

是以我們完全可以通過拼接sql語句得到我們需要的内容,可以預設傳回給我們的資料是不友好的,我們還需要自己在做一步設定,将某些字段變成我們好了解的内容

如對下文内容不了解,可翻閱LZ之前的文章

  • sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS UpdateTime
  • [__$operation] AS Operation

通過檢視CDC生成的捕獲表我們發現,其實他是在原來的資料表結構上新增了幾個字段給我們,其他的表也相同。

SQL Server 多表資料增量擷取和釋出 4
SQL Server 多表資料增量擷取和釋出 4

那我們在代碼中對實體的設計就可以基于繼承相同父類的方式,定義一個父類,擁有共同屬性

public partial class ExtBase
    {
        /// <summary>
        /// 更新時間
        /// </summary>
        public DateTime UpdateTime { get; set; }

        /// <summary>
        /// 操作方式 1 = 删除,2 = 插入,3 = 更新(舊值),4 = 更新(新值)
        /// </summary>
        public int Operation { get; set; }
    }
           

其他表都是在自己原來字段的基礎上繼承目前父類

public class Department : ExtBase
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }

    public class Person : ExtBase
    {

        public int Id { get; set; }

        public string Name { get; set; }

        public int? Age { get; set; }
    }
           

實體類結構完畢後我們開始考慮擷取資料的業務邏輯,根據業務我們可以假設擷取資料的方法幾乎相同,不同的地方就是傳回的資料實體集合不同,那我們通過何種方法來完成邏輯的有效封裝,這是需要考慮的問題。

經過思考,我構想出了一種方法

1、定義一個抽象基類,在其中定義公共業務邏輯(GetDate)方法,然後定義一個抽象方法,抽象方法需要被子類繼承,而子類需要做的就是覆寫父類的GetData方法,唯一需要修改的就是傳遞的實體——可以采用泛型變量的形式去實作

2、等所有的子類建構完成以後,建立一個簡單工廠,傳遞需要的參數,然後根據參數中的唯一辨別符,執行個體化對應的操作類去執行公共方法。

首先是基類抽象類
/// <summary>
    /// Cdc 資料捕獲服務類
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class CTBaseService
{
        /// <summary>
        /// 擷取CDC捕獲表的資料
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="schema_table"></param>
        /// <param name="startDateTime"></param>
        /// <returns></returns>
        private List<T> GetRangeList<T>(string schema_table, DateTime startDateTime) where T : class, new()
        {
            //擷取目前需要更新的日期集合清單
            var conn = new SqlConnection(StaticConst.Conn);
            string query = @"SELECT  * ,sys.fn_cdc_map_lsn_to_time(__$start_lsn) AS UpdateTime,[__$operation] AS Operation FROM [cdc].[{2}_CT] WHERE[__$operation] IN(1, 2, 4) AND sys.fn_cdc_map_lsn_to_time(__$start_lsn) > '{0}'
              AND sys.fn_cdc_map_lsn_to_time(__$start_lsn) <= '{1}';";
            string nowDate = DateTime.Now.ToString();
            query = string.Format(query, startDateTime.ToString(), nowDate, schema_table);
            var queryList = conn.Query<T>(query).ToList();
            return queryList;
        }


        /// <summary>
        /// 抽象方法,由父類實作
        /// </summary>
        /// <param name="id"></param>
        /// <param name="schema_table"></param>
        /// <param name="startDateTime"></param>
        public abstract void Work(int id, string schema_table, DateTime startDateTime);


        /// <summary>
        /// 得到CDC捕獲資料并插入隊列
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="id"></param>
        /// <param name="schema_table"></param>
        /// <param name="startDateTime"></param>
        protected void GetRangeListAndInsertQueue<T>(int id, string schema_table, DateTime startDateTime) where T : ExtBase, new()
        {
            //擷取目前需要更新的日期集合清單
            List<T> queryList = GetRangeList<T>(schema_table, startDateTime);

            if (queryList.Count > 0)
            {
                //對集合進行操作 【集合序列化,注意集合反序列化】
                string jsonItem = JsonConvert.SerializeObject(queryList);
                QueueWorker.Instance.EnqueueItem(jsonItem);

                int iret = UpdateServiceLog(id, queryList);
                string nowDate = DateTime.Now.ToString();
                if (iret > 0)
                {
                    Log4NetHelper.Info(string.Format("表名:{0}\r\n 隊列資料:{1} \r\n 插入時間:{2}", schema_table, jsonItem, nowDate));
                }
            }

        }

}
           

子類實作

我們可以發現子類實作非常好了解,正如上文所說,基類提供了一個抽象接口供子類實作,而子類也真的隻需要修改一個實體就可以。如果大家不懂,可以多看幾遍來裂解其中的設計方式。

public class DepartmentCT : CTBaseService
    {
        public override void Work(int id, string schema_table, DateTime startDateTime)
        {
            base.GetRangeListAndInsertQueue<Department>(id, schema_table, startDateTime);
        }
    }
           
public class PersonCT : CTBaseService
    {
        public override void Work(int id, string schema_table, DateTime startDateTime)
        {
          base.GetRangeListAndInsertQueue<Person>(id, schema_table, startDateTime);
        }
    }
           
最後我們建立一個工廠類,工廠類主要負責接受參數并建立對應的CT幫助類,代碼結構如下。根據表名作為唯一辨別符字段,建立***CT服務類,然後因為他們繼承并覆寫了父類抽象方法Work,是以調用.Work方法即可實作擷取資料并插入隊列的功能。
public class CTCExecuteFactory
    {
        /// <summary>
        /// 執行服務
        /// </summary>
        /// <param name="id"></param>
        /// <param name="schema_table"></param>
        /// <param name="updateTime"></param>
        public static void ExecuteService(int id, string schemaName, string tableName, DateTime updateTime)
        {
            CTBaseService service = null;
            switch (tableName.ToLower())
            {
                case ServiceTables.person: service = new PersonCT(); break;
                case ServiceTables.department: service = new DepartmentCT(); break;
                default: break;
            }

            if (service != null) service.Work(id, string.Format("{0}_{1}", schemaName, tableName), updateTime);
        }
    }
           
其他子產品的代碼我覺得屬于正常了解範圍内的東西,不予說明,有興趣的可自行下載下傳代碼檢視具體功能。 下載下傳連結