天天看點

有贊大資料 DataX 演進之路

有贊大資料 DataX 演進之路

文| 小木 on 大資料

一、需求

有贊大資料技術應用的早期,我們使用 Sqoop 作為資料同步工具,滿足了 MySQL 與 Hive 之間資料同步的日常開發需求。

随着公司業務發展,資料同步的場景越來越多,主要是 MySQL、Hive 與文本檔案之間的資料同步,Sqoop 已經不能完全滿足我們的需求。在2017年初,我們已經無法忍受 Sqoop 給我們帶來的折磨,準備改造我們的資料同步工具。當時有這麼些很最痛的需求:

  • 多次因 MySQL 變更引起的資料同步異常。MySQL 需要支援讀寫分離與分表分庫模式,而且要相容可能的資料庫遷移、節點當機以及主從切換
  • 有不少異常是因為表結構變更導緻。MySQL 或 Hive 的表結構都可能發生變更,需要相容多數的表結構不一緻情況
  • MySQL 讀寫操作不要影響線上業務,不要觸發 MySQL 的運維告警,不想天天被 DBA 噴
  • 希望支援更多的資料源,如 HBase、ES、文本檔案

作為資料平台管理者,還希望收集到更多運作細節,友善日常維護:

  • 統計資訊采集,例如運作時間、資料量、消耗資源
  • 髒資料校驗和上報
  • 希望運作日志能接入公司的日志平台,友善監控

二、選型

基于上述的資料同步需求,我們計劃基于開源做改造,考察的對象主要是 DataX 和 Sqoop,它們之間的功能對比如下

功能 DataX Sqoop
運作模式 單程序多線程 MR
MySQL讀寫 單機壓力大;讀寫粒度容易控制 mr模式重,寫出錯處理麻煩
Hive讀寫 單機壓力大 很好
檔案格式 orc支援 orc不支援,可添加
分布式 不支援,可以通過排程系統規避 支援
流控 有流控功能 需要定制
統計資訊 已有一些統計,上報需定制 沒有,分布式的資料收集不友善
資料校驗 在core部分有校驗功能 沒有,分布式的資料收集不友善
監控 需要定制 需要定制
社群 開源不久,社群不活躍 一直活躍,核心部分變動很少

DataX 主要的缺點在于單機運作,而這個可以通過排程系統規避,其他方面的功能均優于 Sqoop,最終我們選擇了基于 DataX 開發。

三、前期設計

3.1 運作形态

使用 DataX 最重要的是解決分布式部署和運作問題,DataX 本身是單程序的用戶端運作模式,需要考慮如何觸發運作 DataX。

我們決定複用已有的離線任務排程系統,任務觸發由排程系統負責,DataX 隻負責資料同步。這樣就複用的系統能力,避免重複開發。關于排程系統,可參考文章​​《大資料開發平台(Data Platform)在有贊的最佳實踐》​​。

有贊大資料 DataX 演進之路

DataX在有贊大資料平台的上下文

在每個資料平台的 worker 伺服器,都會部署一個 DataX 用戶端,運作時可同時啟動多個程序,這些都由排程系統控制。

3.2 執行器設計

為了與已有的資料平台互動,需要做一些定制修改:

  • 符合平台規則的狀态上報,如啟動/運作中/結束,運作時需上報進度,結束需上報成功失敗
  • 符合平台規則的運作日志實時上報,用于展示
  • 統計、校驗、流控等子子產品的參數可從平台傳入,并需要對結果做持久化
  • 需要對異常輸入做好相容,例如 MySQL 主從切換、表結構變更

3.3 開發政策

大緻的運作流程是:

前置配置檔案轉換、表結構校驗->(輸入->DataX核心+業務無關的校驗->輸出)->後置統計/持久化

盡量保證 DataX 專注于資料同步,盡量不隐含業務邏輯,把有贊特有的業務邏輯放到 DataX 之外,資料同步過程無法滿足的需求,才去修改源碼。

表結構、表命名規則、位址轉換這些運作時前置校驗邏輯,以及運作結果的持久化,放在中繼資料系統(參考《有贊資料倉庫中繼資料系統實踐》),而運作狀态的監控放在排程系統。

四、源碼改造之路

4.1 支援 Hive 讀寫

DataX 并沒有自帶 Hive 的 reader 和 writer,而隻有 HDFS 的 reader 和 writer。我們選擇在 DataX 之外封裝,把 Hive 讀寫操作的配置檔案,轉換為 HDFS 讀寫的配置檔案,另外輔助上 Hive DDL 操作。具體的,我們做了如下改造:

4.1.1 Hive 讀操作

  • 根據表名,拼接出 HDFS 路徑。有贊的資料倉庫規範裡有一條,禁止使用外部表,這使得 HDFS 路徑拼接變得容易。若是外部表,就需要從中繼資料系統擷取相應的路徑
  • Hive 的表結構擷取,需要依賴中繼資料系統。還需對 Hive 表結構做校驗,後面會詳細說明

4.1.2 Hive 寫操作

  • 寫 Hive 的配置裡不會指定 Hive 的檔案格式、分隔符,需要讀取中繼資料,擷取這些資訊填入 HDFS 的寫配置檔案
  • 支援建立不存在的 Hive 表或分區,能建構出符合資料倉庫規範的建表語句

4.2 MySQL -> Hive 相容性

按 DataX 的設計理念,reader 和 writer 互相不用關心,但實際使用經常需要關聯考慮才能避免運作出錯。MySQL 加減字段,或者字段類型變更,都會導緻 MySQL 和 Hive 的表結構不一緻,需要避免這種不一緻的運作出錯。

4.2.1 MySQL -> Hive 非分區表

非分區表都是全量導入,以 mysqlreader 配置為準。如果 MySQL 配置字段與 Hive 實際結構不一緻,則把 Hive 表 drop 掉後重建。表重建可能帶來下遊 Hive SQL 出錯的風險,這個靠 SQL 的定時檢查規避。

Hive 表重建時,需要做 MySQL 字段轉換為 Hive 類型,比如 MySQL 的 varchar 轉為 Hive 的 string。這裡有坑,Hive 沒有無符号類型,注意 MySQL 的 int unsigned 的取值範圍,需要向上轉型,轉為 Hive 的 bigint;同理,MySQL 的 bigint unsigned 也需要向上轉型,我們根據實際業務情況大膽轉為 bigint。而 Hive 的 string 是萬能類型,如果不知道怎麼轉,用 string 是比較保險的。

4.2.1 MySQL -> Hive 分區表

Hive 分區表不能随意變更表結構,變更可能會導緻舊分區資料讀取異常。是以寫Hive 分區表時,以 Hive 表結構為準,表結構不一緻則直接報錯。我們采取了如下的政策

MySQL字段 Hive實際字段 處理方法
a,b a,b 正常
a,b,c a,b 忽略MySQL的多餘字段,以Hive為準
b,a a,b 順序不對,調整
a a,b MySQL少一個,報錯
a,c a,b 不比對, 報錯
未指定字段 a,b 以Hive為準

這麼做偏保守,對于無害的 Hive 分區表變更,其實可以大膽去做,比如 int 類型改 bigint、orc 表加字段。

4.3 适配 MySQL 叢集

有贊并沒有獨立運作的 MySQL 執行個體,都是由 RDS 中間件管理着 MySQL 叢集,有讀寫分離和分表分庫兩種模式。讀寫 MySQL 有兩種選擇,通過 RDS 中間件讀寫,以及直接讀寫 MySQL 執行個體。

方案 優先 缺點
連執行個體 性能好;不影響線上業務 當備庫維護或切換位址時,需要修改配置;開發者不知道備庫位址
連 RDS 與普通應用一緻;屏蔽了後端維護 對 RDS 造成額外壓力,有影響線上業務的風險;需要完全符合公司 SQL 規範

對于寫 MySQL,寫入的資料量一般不大,DataX 選擇連 RDS,這樣就不用額外考慮主從複制。

對于讀 MySQL,考慮到有大量的全表同步任務,特别是淩晨離線任務高峰流量特别大,避免大流量對 RDS 中間件的沖擊,DataX 選擇直連到 MySQL 執行個體去讀取資料。為了規避 MySQL 維護帶來的位址變更風險,我們又做了幾件事情:

  • 中繼資料維護了标準的 RDS 中間件位址
  • 主庫、從庫、RDS 中間件三者位址可以關聯和任意轉換
  • 每次 DataX 任務啟動時,擷取最新的主庫和從庫位址
  • 定期的 MySQL 連通性校驗
  • 與 DBA 建立協作關系,變更提前通知

讀取 MySQL 時,對于讀寫分離,每次擷取其中一個從庫位址并連接配接;對于分表分庫,我們有1024分片,就要轉換出1024個從庫位址,拼接出 DataX 的配置檔案。

4.4 MySQL 運維規範的相容

4.4.1 避免慢 SQL

前提是有贊的 MySQL 建表規範,規定了建表必須有 int 自增主鍵。另一條運維規範,SQL 運作超過2s會被強行 kill 掉。

以讀取 MySQL 全表為例,我們把一條全表去取的 SQL,拆分為很多條小 SQL,而每條小 SQL 隻走主鍵 id 的聚簇索引,代碼如下 ​

​select ... from table_name where id>? by id asc limit ?​

4.4.2 避免過快讀寫影響其他業務

執行完一條 SQL 後會強制 sleep 一下,讓系統不能太忙。無論是 insert 的 batchSize,還是 select 每次分頁大小,我們都是動态生成的,根據上一條運作的時間,運作太快就多 sleep,運作太慢就少 sleep,同時調整下一個批次的數量。

這裡還有改進的空間,可以根據系統級的監控名額動态調整速率,比如磁盤使用率、CPU 使用率、binlog 延遲等。實際運作中,删資料很容易引起 binlog 延遲,僅從 delete 語句運作時間無法判斷是否删的太快,具體原因尚未去深究。

4.5 更多的插件

除了最常用的 MySQL、Hive,以及邏輯比較簡單的文本,我們還對 HBase 的讀寫根據業務情況做了簡單改造。

我們還全新開發了 eswriter,以及有贊 kvds 的 kvwriter,這些都是由相關存儲的開發者負責開發和維護插件。

4.6 與大資料體系互動

4.6.1 上報運作統計資料

DataX 自帶了運作結果的統計資料,我們希望把這些統計資料上報到中繼資料系統,作為 ETL 的過程中繼資料存儲下來。

基于我們的開發政策,不要把有贊中繼資料系統的 api 嵌入 DataX 源碼,而是在 DataX 之外擷取 stdout,截取出列印的統計資訊再上報。

4.6.2 與資料平台的互動

資料平台提供了 DataX 任務的編輯頁面,儲存後會留下 DataX 運作配置檔案以及排程周期在平台上。排程系統會根據排程周期和配置檔案,定時啟動 DataX 任務,每個 DataX 任務以獨立程序的方式運作,程序退出後任務結束。運作中,會把 DataX 的日志實時傳輸并展示到頁面上。

4.7 考慮更多異常

DataX 代碼中多數場景暴力的使用 ​

​catchException​

​,缺乏對各異常場景的相容或重試,一個大任務執行過程中出現網絡、IO等異常容易引起任務失敗。最常見的異常就是 SQLException,需要對異常做分類處理,比如 SQL 異常考慮重試,批量處理異常改走單條依次處理,網絡異常考慮資料庫連接配接重建。

HDFS 對異常容忍度較高,DataX 較少捕獲異常。

4.8 測試場景改造

4.8.1 持續內建

為了發現低級問題,例如表遷移了但任務還在、普通表改成了分區表,我們每天晚上20點以後,會把當天運作的所有重要 DataX 任務“重放”一遍。

這不是原樣重放,而是在配置檔案裡加入了一個測試的辨別,DataX 啟動後,reader 部分隻會讀取一行資料,而 writer 會把目标位址指向一個測試的空間。這個測試能保證 DataX基本功能沒問題,以及整個運作環境沒有問題。

4.8.2 全鍊路壓測場景

有贊全鍊路壓測系統通過 Hive 來生成資料,通過 DataX 把生成好的資料導入影子庫。影子庫是一種建在生産 MySQL 裡的 database,對普通應用不可見,加上 SQL 的特殊 hint 才可以通路。

生産環境的全鍊路壓測是個高危操作,一旦配置檔案有誤可能會破壞真實的生産資料。DataX 的 MySQL 讀寫參數裡,加上了全鍊路壓測的标記時,隻能讀寫特定的 MySQL 和 Hive 庫,并配置資料平台做好醒目的提醒。

五、線上運作情況

DataX 是在2017年二季度正式上線,到2017年Q3完成了上述的大部分特性開發,後續僅做了少量修補。到2019年Q1,已經穩定運作了超過20個月時間,目前每天運作超過6000個 DataX 任務,傳輸了超過100億行資料,是資料平台裡比較穩定的一個元件。

期間出現過一些小問題,有一個印象深刻。原生的 hdfsreader 讀取超大 orc 檔案有 bug,orc 的讀 api 會把大檔案分片成多份,預設大于256MB會分片,而 datax 僅讀取了第一個分片,修改為讀取所有分片解決問題。因為256MB足夠大,這個問題很少出現很隐蔽。除此之外沒有發現大的 bug,平時遇到的問題,多數是運作環境或使用者了解的問題,或是可以克服的小問題。

六、下一步計劃

對于 DataX 其實并沒有再多的開發計劃。在需求清單裡積累了十幾條改進需求,而這些需求即便1年不去改進,也不會影響線上運作,諸如髒資料可讀性改進、支援 HDFS HA。這些不重要不緊急的需求,暫時不會再投入去做。

DataX 主要解決批量同步問題,無法滿足多數增量同步和實時同步的需求。對于增量同步我們也有了成熟方案,會有另一篇文章介紹我們自研的增量同步産品。

-The End-

号外:為讀者持續幾份最新教程,覆寫了 Spring Boot、Spring Cloud、微服務架構等。

擷取方式:下面關注公衆号,并回複 java 或 666 領取。