天天看點

【資料同步】DataX3.0

一. DataX3.0概覽

​ DataX 是一個異構資料源離線同步工具,緻力于實作包括關系型資料庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構資料源之間穩定高效的資料同步功能。

【資料同步】DataX3.0
  • 設計理念
  • 為了解決異構資料源同步問題,DataX将複雜的網狀的同步鍊路變成了星型資料鍊路,DataX作為中間傳輸載體負責連接配接各種資料源。當需要接入一個新的資料源的時候,隻需要将此資料源對接到DataX,便能跟已有的資料源做到無縫資料同步。
  • 目前使用現狀
  • DataX在阿裡巴巴集團内被廣泛使用,承擔了所有大資料的離線同步業務,并已持續穩定運作了6年之久。目前每天完成同步8w多道作業,每日傳輸資料量超過300TB。

此前已經開源DataX1.0版本,此次介紹為阿裡雲開源全新版本DataX3.0,有了更多更強大的功能和更好的使用體驗。Github首頁位址:GitHub - alibaba/DataX: DataX是阿裡雲DataWorks資料內建的開源版本。

二、DataX3.0架構設計

DataX本身作為離線資料同步架構,采用Framework + plugin架構建構。将資料源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步架構中。

  • Reader:Reader�為資料采集子產品,負責采集資料源的資料,将資料發送給Framework。
  • Writer: Writer為資料寫入子產品,負責不斷向Framework取資料,并将資料寫入到目的端。
  • Framework:Framework用于連接配接reader和writer,作為兩者的資料傳輸通道,并處理緩沖,流控,并發,資料轉換等核心技術問題。
【資料同步】DataX3.0

三. DataX3.0插件體系

​ 經過幾年積累,DataX目前已經有了比較全面的插件體系,主流的RDBMS資料庫、NOSQL、大資料計算系統都已經接入。DataX目前支援資料如下:

類型 資料源 Reader(讀) Writer(寫) 文檔
RDBMS 關系型資料庫 MySQL

Oracle         √         √    

OceanBase       √         √    

SQLServer

PostgreSQL

DRDS

達夢

通用RDBMS(支援所有關系型資料庫)

阿裡雲數倉資料存儲 ODPS

ADS
OSS

OCS

NoSQL資料存儲 OTS

Hbase0.94

Hbase1.1

MongoDB

Hive

無結構化資料存儲 TxtFile

FTP

HDFS

Elasticsearch

DataX Framework提供了簡單的接口與插件互動,提供簡單的插件接入機制,隻需要任意加上一種插件,就能無縫對接其他資料源。詳情請看:DataX資料源指南

�四、DataX3.0核心架構

DataX 3.0 開源版本支援單機多線程模式完成同步作業運作,本小節按一個DataX作業生命周期的時序圖,從整體架構設計非常簡要說明DataX各個子產品互相關系。

【資料同步】DataX3.0

核心子產品介紹:

  1. DataX完成單個資料同步的作業,我們稱之為Job,DataX接受到一個Job之後,将啟動一個程序來完成整個作業同步過程。DataX Job子產品是單個作業的中樞管理節點,承擔了資料清理、子任務切分(将單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataXJob啟動後,會根據不同的源端切分政策,将Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分資料的同步工作。
  3. 切分多個Task之後,DataX Job會調用Scheduler子產品,根據配置的并發資料量,将拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運作完畢配置設定好的所有Task,預設單個任務組的并發數量為5。
  4. 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
  5. DataX作業運作起來之後, Job監控并等待多個TaskGroup子產品任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,程序退出值非0

DataX排程流程:

舉例來說,使用者送出了一個DataX作業,并且配置了20個并發,目的是将一個100張分表的mysql資料同步到odps裡面。 DataX的排程決策思路是:

  1. DataXJob根據分庫分表切分成了100個Task。
  2. 根據20個并發,DataX計算共需要配置設定4個TaskGroup。
  3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個并發共計運作25個Task。

五、DataX 3.0六大核心優勢

  • 可靠的資料品質監控
    • 完美解決資料傳輸個别類型失真問題
    • DataX舊版對于部分資料類型(比如時間戳)傳輸一直存在毫秒階段等資料失真情況,新版本DataX3.0已經做到支援所有的強資料類型,每一種插件都有自己的資料類型轉換政策,讓資料可以完整無損的傳輸到目的端。
    • 提供作業全鍊路的流量、資料量�運作時監控
    • DataX3.0運作過程中可以将作業本身狀态、資料流量、資料速度、執行進度等資訊進行全面的展示,讓使用者可以實時了解作業狀态。并可在作業執行過程中智能判斷源端和目的端的速度對比情況,給予使用者更多性能排查資訊。
    • 提供髒資料探測
    • 在大量資料的傳輸過程中,必定會由于各種原因導緻很多資料傳輸報錯(比如類型轉換錯誤),這種資料DataX認為就是髒資料。DataX目前可以實作髒資料精确過濾、識别、采集、展示,為使用者提供多種的髒資料處理模式,讓使用者準确把控資料品質大關!
  • 豐富的資料轉換功能
  • DataX作為一個服務于大資料的ETL工具,除了提供資料快照搬遷功能之外,還提供了豐富資料轉換的功能,讓資料在傳輸過程中可以輕松完成資料脫敏,補全,過濾等資料轉換功能,另外還提供了自動groovy函數,讓使用者自定義轉換函數。詳情請看DataX3的transformer詳細介紹。
  • 精準的速度控制
  • 還在為同步過程對線上存儲壓力影響而擔心嗎?新版本DataX3.0提供了包括通道(并發)、記錄流、位元組流三種流控模式,可以随意控制你的作業速度,讓你的作業在庫可以承受的範圍内達到最佳的同步速度。

"speed": { "channel": 5, "byte": 1048576, "record": 10000 }

  • 強勁的同步性能
  • DataX3.0每一種讀插件都有一種或多種切分政策,都能将作業合理切分成多個Task并行執行,單機多線程執行模型可以讓DataX速度随并發成線性增長。在源端和目的端性能都足夠的情況下,單個作業一定可以打滿網卡。另外,DataX團隊對所有的已經接入的插件都做了極緻的性能優化,并且做了完整的性能測試。性能測試相關詳情可以參照每單個資料源的詳細介紹:DataX資料源指南
  • 健壯的容錯機制
  • DataX作業是極易受外部因素的幹擾,網絡閃斷、資料源不穩定等因素很容易讓同步到一半的作業報錯停止。是以穩定性是DataX的基本要求,在DataX 3.0的設計中,重點完善了架構和插件的穩定性。目前DataX3.0可以做到線程級别、程序級别(暫時未開放)、作業級别多層次局部/全局的重試,保證使用者的作業穩定運作。
    • 線程内部重試
    • DataX的核心插件都經過團隊的全盤review,不同的網絡互動方式都有不同的重試政策。
    • 線程級别重試
    • 目前DataX已經可以實作TaskFailover,針對于中間失敗的Task,DataX架構可以做到整個Task級别的重新排程。
  • 極簡的使用體驗
    • 易用
    • 下載下傳即可用,支援linux和windows,隻需要短短幾步驟就可以完成資料的傳輸。請點選:Quick Start
    • 詳細
    • DataX在運作日志中列印了大量資訊,其中包括傳輸速度,Reader、Writer性能,程序CPU,JVM和GC情況等等。
      • 傳輸過程中列印傳輸速度、進度等
【資料同步】DataX3.0
      • 傳輸過程中會列印程序相關的CPU、JVM等
【資料同步】DataX3.0
      • 在任務結束之後,列印總體運作情況
【資料同步】DataX3.0

安裝部署

System Requirements

  • Linux
  • JDK(1.8以上,推薦1.8)
  • Python(推薦Python2.6.X)
  • Apache Maven 3.x
  • (Compile DataX)

Quick Start

  • 工具部署
    • 方法一、直接下載下傳DataX工具包:DataX下載下傳位址
    • 下載下傳後解壓至本地某個目錄,進入bin目錄,即可運作同步作業:

$ cd {YOUR_DATAX_HOME}/bin $ python datax.py {YOUR_JOB.json}

    • 自檢腳本:    python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
    • 方法二、下載下傳DataX源碼,自己編譯:DataX源碼
    • (1)、下載下傳DataX源碼:

$ git clone [email protected]:alibaba/DataX.git

    • (2)、通過maven打包:

$ cd {DataX_source_code_home} $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

    • 打包成功,日志顯示如下:

[INFO] BUILD SUCCESS [INFO] ----------------------------------------------------------------- [INFO] Total time: 08:12 min [INFO] Finished at: 2015-12-13T16:26:48+08:00 [INFO] Final Memory: 133M/960M [INFO] -----------------------------------------------------------------

    • 打包成功後的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,結構如下:

$ cd {DataX_source_code_home} $ ls ./target/datax/datax/ bin conf job lib log log_perf plugin script tmp

  • 配置示例:從stream讀取資料并列印到控制台
    • 第一步、建立作業的配置檔案(json格式)
    • 可以通過指令檢視配置模闆: python datax.py -r {YOUR_READER} -w {YOUR_WRITER}

$ cd {YOUR_DATAX_HOME}/bin $ python datax.py -r streamreader -w streamwriter DataX (UNKNOWN_DATAX_VERSION), From Alibaba ! Copyright (C) 2010-2015, Alibaba Group. All Rights Reserved. Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document: https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job. { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [], "sliceRecordCount": "" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "", "print": true } } } ], "setting": { "speed": { "channel": "" } } } }

    • 根據模闆配置json如下:

#stream2stream.json { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 10, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" } ] } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": 5 } } } }

    • 第二步:啟動DataX

$ cd {YOUR_DATAX_DIR_BIN} $ python datax.py ./stream2stream.json

    • 同步結束,顯示日志如下:

... 2015-12-17 11:20:25.263 [job-0] INFO JobContainer - 任務啟動時刻 : 2015-12-17 11:20:15 任務結束時刻 : 2015-12-17 11:20:25 任務總計耗時 : 10s 任務平均流量 : 205B/s 記錄寫入速度 : 5rec/s 讀出記錄總數 : 50 讀寫失敗總數 : 0