天天看點

DataX簡介、部署、原理和使用介紹1.DataX簡介2.DataX原理3.DataX安裝部署4.DataX使用介紹5.DataX常見的參數設定

DataX簡介、部署、原理和使用介紹

1.DataX簡介

1-1.項目位址

項目位址:https://github.com/alibaba/DataX

官方文檔:https://github.com/alibaba/DataX/blob/master/introduction.md

1-2.DataX概述

​ DataX 是阿裡雲 DataWorks資料內建 的開源版本,在阿裡巴巴集團内被廣泛使用的離線資料同步工具/平台。DataX 實作了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各種異構資料源之間高效的資料同步功能

​ DataX本身作為資料同步架構,将不同資料源的同步抽象為從源頭資料源讀取資料的Reader插件,以及向目标端寫入資料的Writer插件,理論上DataX架構可以支援任意資料源類型的資料同步工作。同時DataX插件體系作為一套生态系統, 每接入一套新資料源該新加入的資料源即可實作和現有的資料源互通

1-3.DataX支援的資料源

DataX目前已經有了比較全面的插件體系,主流的RDBMS資料庫、NOSQL、大資料計算系統都已經接入

類型 資料源 Reader(讀) Writer(寫) 文檔
RDBMS 關系型資料庫 MySQL 讀 、寫
Oracle 讀 、寫
OceanBase 讀 、寫
SQLServer 讀 、寫
PostgreSQL 讀 、寫
DRDS 讀 、寫
Kingbase 讀 、寫
通用RDBMS(支援所有關系型資料庫) 讀 、寫
阿裡雲數倉資料存儲 ODPS 讀 、寫
ADB
ADS
OSS 讀 、寫
OCS
Hologres
AnalyticDB For PostgreSQL
阿裡雲中間件 datahub 讀 、寫
SLS 讀 、寫
阿裡雲圖資料庫 GDB 讀 、寫
NoSQL資料存儲 OTS 讀 、寫
Hbase0.94 讀 、寫
Hbase1.1 讀 、寫
Phoenix4.x 讀 、寫
Phoenix5.x 讀 、寫
MongoDB 讀 、寫
Cassandra 讀 、寫
數倉資料存儲 StarRocks 讀 、寫
ApacheDoris
ClickHouse
Databend
Hive 讀 、寫
kudu
無結構化資料存儲 TxtFile 讀 、寫
FTP 讀 、寫
HDFS 讀 、寫
Elasticsearch
時間序列資料庫 OpenTSDB
TSDB 讀 、寫
TDengine 讀 、寫

1-4.DataX特點

  • 可靠的資料品質監控
  • 豐富的資料轉換功能
  • 精準的速度控制
  • 強勁的同步性能
  • 健壯的容錯機制
  • 極簡的使用體驗

2.DataX原理

2-1.DataX設計理念

  1. 異構資料源同步問題,就是不同架構之間同步資料時,相同的資料在不同架構中具有不同的資料結構。
  2. DataX的設計理念:

    DataX将複雜的網狀的同步鍊路變成了星型資料鍊路,DataX作為中間傳輸載體負責連接配接資料各種資料源。

    當需要接入一個新的資料源的時候,隻需要将此資料源對接到DataX,便能跟已有的資料源做到無縫資料同步。

DataX簡介、部署、原理和使用介紹1.DataX簡介2.DataX原理3.DataX安裝部署4.DataX使用介紹5.DataX常見的參數設定

2-2.DataX架構設計

DataX本身作為離線資料同步架構,采用Framework+plugin架構建構。将資料源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步架構中

DataX簡介、部署、原理和使用介紹1.DataX簡介2.DataX原理3.DataX安裝部署4.DataX使用介紹5.DataX常見的參數設定
  • Reader
    • 資料采集子產品,負責采集資料源的資料,将資料發送給Framework
  • Writer
    • 資料寫入子產品,負責不斷從Framework取資料,并将資料寫出到目的端。
  • Framework
    • 主題架構,用于連接配接Reader和Writer,作為兩者的資料傳輸通道,并處理緩沖、流控、并發、資料轉換等核心技術問題

Framework的幾大功能

  • 緩沖

Reader 和 Writer 可能會有

讀寫速度不一緻

的情況,是以中間需要一個元件作為緩沖

  • 流控

控制資料傳輸的速度,DataX 可以随意

根據需求調整

資料傳輸速度

  • 并發

并發的同步或寫入資料

  • 資料轉換

既然是異構,那麼說明讀 Reader 的資料源與 寫 Writer 的資料源 資料結構可能不同,資料結構不同的話,需要做資料轉換操作,轉換也在 Framework 中完成

2-3.DataX運作流程

DataX支援單機多線程模式完成同步作業,下面用一個DataX作業生命周期的時序圖,用以說明DataX的運作流程、核心概念以及每個概念的關系

DataX簡介、部署、原理和使用介紹1.DataX簡介2.DataX原理3.DataX安裝部署4.DataX使用介紹5.DataX常見的參數設定
  • 核心子產品介紹:
  1. DataX完成單個資料同步的作業,我們稱之為Job,DataX接受到一個Job之後,将啟動一個程序來完成整個作業同步過程。DataX Job子產品是單個作業的中樞管理節點,承擔了資料清理、子任務切分(将單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
  2. DataXJob啟動後,會根據不同的源端切分政策,将Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分資料的同步工作。
  3. 切分多個Task之後,DataX Job會調用Scheduler子產品,根據配置的并發資料量,将拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運作完畢配置設定好的所有Task,預設單個任務組的并發數量為5。
  4. 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的線程來完成任務同步工作。
  5. DataX作業運作起來之後, Job監控并等待多個TaskGroup子產品任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,程序退出值非0

2-4.DataX排程政策

舉例來說,使用者送出了一個DataX作業,并且配置了20個并發,目的是将一個100張分表的mysql資料同步到odps裡面。 DataX的排程決策思路是:

  1. DataXJob根據分庫分表切分成了100個Task。
  2. 根據20個并發,DataX計算共需要配置設定4個TaskGroup。
  3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個并發共計運作25個Task。

3.DataX安裝部署

# 下載下傳安裝包
[[email protected] ~]$ cd /opt/software/
[[email protected] software]$ wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
# 解壓安裝包
[[email protected] software]$ tar -xf datax.tar.gz -C /opt/module
[[email protected] software]$ cd /opt/module/datax/
# DataX自檢任務
[[email protected] datax]$ python bin/datax.py job/job.json
2023-02-15 19:01:48.489 [job-0] INFO  JobContainer -
任務啟動時刻                    : 2023-02-15 19:01:38
任務結束時刻                    : 2023-02-15 19:01:48
任務總計耗時                    :                 10s
任務平均流量                    :          253.91KB/s
記錄寫入速度                    :          10000rec/s
讀出記錄總數                    :              100000
讀寫失敗總數                    :                   0
# 成功執行自檢任務
           

4.DataX使用介紹

4-1.同步MySQl全量資料到HDFS案例

将MySQL的全量資料,利用DataX工具同步至HDFS

1.檢視MySQL被遷移的資料情況

2.根據需求确定reader為mysqlreader,writer為hdfswriter

檢視reader和writer模闆的方式(-r 讀模闆; -w 寫模闆):
[[email protected] datax]$ python bin/datax.py -r mysqlreader -w hdfswriter
           

3.編寫同步json腳本

4.确定HDFS上目标路徑是否存在

5.通過datax.py指定json任務運作同步資料

6.資料驗證,檢視HDFS上是否已經有MySQL對應表中的所有資料

這裡先跑通一個實驗案例,再根據操作來總結
  • MySQL資料:
[[email protected] ~]$ mysql -uroot -p123456 -Dwangtingdb
mysql> select * from test;
+------+---------+
| id   | name    |
+------+---------+
|    1 | wang111 |
|    2 | wang222 |
|    3 | wang333 |
+------+---------+
3 rows in set (0.00 sec)
           
  • 同步任務定義:
[[email protected] datax]$ vim job/mysql2hdfs.json
           

mysql2hdfs.json内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb"],
                "table": ["test"]
              }
            ],
            "password": "123456",
            "username": "root",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"}
            ],
            "compress": "gzip",
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "fieldDelimiter": "\t",
            "fileName": "test",
            "fileType": "text",
            "path": "/test",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
           
  • 任務執行:
[[email protected] datax]$ hdfs dfs -mkdir /test
2023-02-15 19:25:04,683 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
[[email protected] datax]$ python bin/datax.py job/mysql2hdfs.json
...
...
2023-02-15 19:25:43.656 [job-0] INFO  JobContainer -
         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.022s             | 0.022s             | 0.022s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.010s             | 0.010s             | 0.010s

2023-02-15 19:25:43.656 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-15 19:25:43.656 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 24 bytes | Speed 2B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-15 19:25:43.657 [job-0] INFO  JobContainer -
任務啟動時刻                    : 2023-02-15 19:25:32
任務結束時刻                    : 2023-02-15 19:25:43
任務總計耗時                    :                 11s
任務平均流量                    :                2B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   3
讀寫失敗總數                    :                   0
           
  • 驗證
[[email protected] datax]$ hdfs dfs -ls /test
2023-02-15 19:28:48,080 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[[email protected] datax]$ hdfs dfs -get /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz ~/
2023-02-15 19:29:41,378 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-15 19:29:41,786 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
[[email protected] datax]$ cd ~
[[email protected] ~]$ ll
total 8
drwxrwxr-x 2 wangting wangting 4096 Feb 15 15:13 bin
-rw-r--r-- 1 wangting wangting   43 Feb 15 19:29 test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[[email protected] ~]$ gunzip test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[[email protected] ~]$ ll
total 8
drwxrwxr-x 2 wangting wangting 4096 Feb 15 15:13 bin
-rw-r--r-- 1 wangting wangting   30 Feb 15 19:29 test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4
[[email protected] ~]$ cat test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4
1       wang111
2       wang222
3       wang333
           

截至到這裡,可以看到最終資料檔案的内容和原MySQL資料比對的上

總結:

**MysqlReader插件介紹:**實作了從Mysql讀取資料。在底層實作上,MysqlReader通過JDBC連接配接遠端Mysql資料庫,并執行相應的SQL語句将資料從mysql庫中select出來。

**MysqlReader插件原理:**MysqlReader通過JDBC連接配接器連接配接到遠端的Mysql資料庫,并根據使用者配置的資訊生成查詢語句,然後發送到遠端Mysql資料庫,并将該SQL執行傳回結果使用DataX自定義的資料類型拼裝為抽象的資料集,并傳遞給下遊Writer處理。

**HdfsWriter插件介紹:**提供項HDFS檔案系統指定路徑中寫入TextTile和OrcFile類型的檔案,檔案内容可與Hive表相關聯。

**HdfsWriter:**插件實作過程:首先根據使用者指定的path,建立一個hdfs檔案系統上的不存在的臨時目錄,建立規則是:path_随機;然後将讀取的檔案寫入到這個臨時目錄中;待到全部寫入後,再将這個臨時目錄下的檔案移動到使用者所指定的目錄下,(在建立檔案時保證檔案名不重複);最後删除臨時目錄。如果在中間過程中發生網絡中斷等情況,造成無法與hdfs建立連接配接,需要使用者手動删除已經寫入的檔案和臨時目錄

4-2.同步MySQl需求資料到HDFS案例(where)

相對上個案例的變化:

1.增加了

where

關鍵詞,過濾同步的資料範圍

2.去除了壓縮格式:

"compress": "gzip"

3.更換了分隔符,由原

\t

變成不可見分隔字元

\u0001

[[email protected] datax]$ vim job/mysql2hdfs_2.json
           

mysql2hdfs_2.json腳本内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb"],
                "table": ["test"]
              }
            ],
            "password": "123456",
            "username": "root",
            "where": "id>=2",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"}
            ],
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "fieldDelimiter": "\u0001",
            "fileName": "test",
            "fileType": "text",
            "path": "/test",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
           

執行任務:

[[email protected] datax]$ python bin/datax.py job/mysql2hdfs_2.json
...
...
2023-02-16 10:37:05.503 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-16 10:37:05.504 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 16 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-16 10:37:05.505 [job-0] INFO  JobContainer -
任務啟動時刻                    : 2023-02-16 10:36:54
任務結束時刻                    : 2023-02-16 10:37:05
任務總計耗時                    :                 11s
任務平均流量                    :                1B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   2
讀寫失敗總數                    :                   0
           

驗證:

[[email protected] datax]$ hdfs dfs -ls /test
2023-02-16 10:37:45,122 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 2 items
-rw-r--r--   3 wangting supergroup         20 2023-02-16 10:36 /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[[email protected] datax]$ hdfs dfs -cat /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
2023-02-16 10:38:07,714 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 10:38:08,111 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2wang222
3wang333
           

可以看到隻有滿足

id>=2

的2條資料被寫入到了HDFS,直接看似乎沒有分隔符,字元相連了,把檔案下載下傳到本地再次驗證

[[email protected] datax]$ hdfs dfs -get /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224 ~/
2023-02-16 10:38:39,679 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 10:38:40,079 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
[[email protected] datax]$ vim ~/test__00adf34b_234b_49d8_8ba6_bd3b94881224
           
DataX簡介、部署、原理和使用介紹1.DataX簡介2.DataX原理3.DataX安裝部署4.DataX使用介紹5.DataX常見的參數設定
注意,

cat

檔案是看不到特殊分隔符的

4-3.同步MySQl需求資料到HDFS案例(傳條件參數)

​ 在生産環境中,離線資料同步任務需要在任務排程平台每日定時重複執行去拉取某個時間視窗的資料,例如每日同步T-1的資料到HDFS,但腳本中如果寫了固定日期,每日任務都需要修改日期條件,顯然不合理。是以為實作這個業務需求,需要使用DataX的傳參功能。

建立測試表:

[[email protected] datax]$ mysql -uroot -p123456
mysql> use wangtingdb;
mysql> create table test_2(id int(11),name varchar(20),updated datetime);
insert into test_2 value(1,"wangting111","2023-02-13 15:13:42");
insert into test_2 value(2,"wangting222","2023-02-13 21:22:12");
insert into test_2 value(3,"wangting333","2023-02-14 09:15:04");
insert into test_2 value(4,"wangting444","2023-02-14 18:00:32");
insert into test_2 value(5,"wangting555","2023-02-15 13:44:30");
insert into test_2 value(6,"wangting666","2023-02-15 22:13:41");
insert into test_2 value(7,"wangting777","2023-02-16 12:22:30");
insert into test_2 value(8,"wangting888","2023-02-16 23:14:52");

mysql> select * from test_2;
+------+-------------+---------------------+
| id   | name        | updated             |
+------+-------------+---------------------+
|    1 | wangting111 | 2023-02-13 15:13:42 |
|    2 | wangting222 | 2023-02-13 21:22:12 |
|    3 | wangting333 | 2023-02-14 09:15:04 |
|    4 | wangting444 | 2023-02-14 18:00:32 |
|    5 | wangting555 | 2023-02-15 13:44:30 |
|    6 | wangting666 | 2023-02-15 22:13:41 |
|    7 | wangting777 | 2023-02-16 12:22:30 |
|    8 | wangting888 | 2023-02-16 23:14:52 |
+------+-------------+---------------------+
8 rows in set (0.01 sec)
           
目前時間為20230216,

拟定2個變量:

START_FLAG=

date -d"1 day ago" +%Y%m%d

END_FLAG=

date +%Y%m%d

[[email protected] datax]$ date -d"1 day ago" +%Y%m%d
20230215
[[email protected] datax]$ date +%Y%m%d
20230216
           

編寫同步腳本任務

[[email protected] datax]$ vim job/mysql2hdfs_3.json
           

mysql2hdfs_3.json腳本任務内容:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["id","name","updated"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb"],
                "table": ["test_2"]
              }
            ],
            "password": "123456",
            "username": "root",
            "where": "updated>=${START_FLAG} AND updated<${END_FLAG}",
            "splitPk": ""
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id","type": "bigint"},
              {"name": "name","type": "string"}
              {"name": "updated","type": "string"}
            ],
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "fieldDelimiter": "\t",
            "fileName": "test",
            "fileType": "text",
            "path": "/test",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
           

"where": "updated>=${START_FLAG} AND updated<${END_FLAG}"

相當于updated大于等于2023-02-15 00:00:00,小于2023-02-16 00:00:00的資料

目前日期為2月16日,則意為着資料是前一天日内的全量資料

執行任務:

[[email protected] datax]$ python bin/datax.py -p "-DSTART_FLAG=`date -d"1 day ago" +%Y%m%d` -DEND_FLAG=`date +%Y%m%d`" job/mysql2hdfs_3.json
2023-02-16 16:58:22.742 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-16 16:58:22.742 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 40 bytes | Speed 4B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-16 16:58:22.743 [job-0] INFO  JobContainer -
任務啟動時刻                    : 2023-02-16 16:58:11
任務結束時刻                    : 2023-02-16 16:58:22
任務總計耗時                    :                 11s
任務平均流量                    :                4B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   2
讀寫失敗總數                    :                   0
           

驗證資料:

[[email protected] datax]$ hdfs dfs -ls /test
2023-02-16 16:58:37,819 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 3 items
-rw-r--r--   3 wangting supergroup         20 2023-02-16 10:36 /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
-rw-r--r--   3 wangting supergroup         68 2023-02-16 16:58 /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
[[email protected] datax]$ hdfs dfs -cat /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
2023-02-16 16:58:49,450 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 16:58:49,845 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
5       wangting555     2023-02-15 13:44:30
6       wangting666     2023-02-15 22:13:41
           

可以看到資料隻收取到了T-1日的2條資料

4-4.同步HDFS資料到MySQL案例

準備HDFS檔案目錄

[[email protected] datax]$ hdfs dfs -ls /test
2023-02-16 17:17:50,778 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 3 items
-rw-r--r--   3 wangting supergroup         20 2023-02-16 10:36 /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
-rw-r--r--   3 wangting supergroup         43 2023-02-15 19:25 /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
-rw-r--r--   3 wangting supergroup         68 2023-02-16 16:58 /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
[[email protected] datax]$ hdfs dfs -rm /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
2023-02-16 17:18:15,296 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Deleted /test/test__00adf34b_234b_49d8_8ba6_bd3b94881224
[[email protected] datax]$ hdfs dfs -rm /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
2023-02-16 17:18:31,868 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Deleted /test/test__34c4b765_ecd9_426f_b64e_96ef6f9dbba4.gz
[[email protected] datax]$ hdfs dfs -ls /test
2023-02-16 17:18:45,837 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 1 items
-rw-r--r--   3 wangting supergroup         68 2023-02-16 16:58 /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
[[email protected] datax]$ hdfs dfs -cat /test/test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4
2023-02-16 17:18:56,387 INFO Configuration.deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2023-02-16 17:18:56,791 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
5       wangting555     2023-02-15 13:44:30
6       wangting666     2023-02-15 22:13:41

# /test目錄下有一個test__4ffd9840_bc1f_4370_88ab_75d9ee97f1f4檔案
# 檔案中有2條資料
           

建立MySQL被導入的測試表

[[email protected] datax]$ mysql -uroot -p123456 -Dwangtingdb;

mysql> create table test_666 like test_2;
Query OK, 0 rows affected (0.01 sec)

mysql> desc test_666;
+---------+-------------+------+-----+---------+-------+
| Field   | Type        | Null | Key | Default | Extra |
+---------+-------------+------+-----+---------+-------+
| id      | int(11)     | YES  |     | NULL    |       |
| name    | varchar(20) | YES  |     | NULL    |       |
| updated | datetime    | YES  |     | NULL    |       |
+---------+-------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

mysql> select * from test_666;
Empty set (0.00 sec)
# 目前test_666表為空,沒有資料
           
[[email protected] datax]$ vim job/hdfs2mysql.json
           

hdfs2mysql.json任務内容:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/test",
            "defaultFS": "hdfs://hdt-dmcp-ops01:8020",
            "column": [
			{"index":0,"type":"string"},
			{"index":1,"type":"string"},
			{"index":2,"type":"string"}
			],
            "fileType": "text",
            "encoding": "UTF-8",
            "nullFormat": "\\N",
            "fieldDelimiter": "\t"
          }

        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "replace",
            "username": "root",
            "password": "123456",
            "column": [
              "id",
              "name",
              "updated"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://hdt-dmcp-ops05:3306/wangtingdb?useUnicode=true&characterEncoding=utf-8",
                "table": [
                  "test_666"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}
           

執行任務:

[[email protected] datax]$ python bin/datax.py job/hdfs2mysql.json
2023-02-16 17:19:56.672 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-16 17:19:56.673 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s|  All Task WaitReaderTime 0.004s | Percentage 100.00%
2023-02-16 17:19:56.674 [job-0] INFO  JobContainer -
任務啟動時刻                    : 2023-02-16 17:19:45
任務結束時刻                    : 2023-02-16 17:19:56
任務總計耗時                    :                 11s
任務平均流量                    :                6B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   2
讀寫失敗總數                    :                   0
           

驗證檢視MySQL

[[email protected] datax]$ mysql -uroot -p123456 -Dwangtingdb -e "select * from test_666;"
mysql: [Warning] Using a password on the command line interface can be insecure.
+------+-------------+---------------------+
| id   | name        | updated             |
+------+-------------+---------------------+
|    5 | wangting555 | 2023-02-15 13:44:30 |
|    6 | wangting666 | 2023-02-15 22:13:41 |
+------+-------------+---------------------+
           

4-5.同步CSV檔案資料倒MySQL案例

  • 準備一個csv檔案用于同步資料
[[email protected] ~]$ cd /opt/module/datax/job/huatong_data/
[[email protected] huatong_data]$ vim prefecture_level_city_quarter.csv
"dbcode","code","cname","ayearmon","regcode","regname","cunit",data,"updatetime"
"djsjd","A0302","社會消費品零售總額_累計增長","2021D","511000","内江市","%",18.2,"2022-02-20 09:29:00"
"djsjd","A0302","社會消費品零售總額_累計增長","2021C","511000","内江市","%",21.1,"2021-11-08 08:14:05"
"djsjd","A0302","社會消費品零售總額_累計增長","2021B","511000","内江市","%",25.1,"2021-07-29 07:23:33"
"djsjd","A0302","社會消費品零售總額_累計增長","2021A","511000","内江市","%",29.9,"2021-07-29 07:23:30"
"djsjd","A0302","社會消費品零售總額_累計增長","2020D","511000","内江市","%",-3.2,"2021-08-02 11:42:00"
"djsjd","A0302","社會消費品零售總額_累計增長","2020B","511000","内江市","%",-7.9,"2021-08-02 11:41:56"
"djsjd","A0302","社會消費品零售總額_累計增長","2020A","511000","内江市","%",-11.9,"2021-08-02 11:41:55"
"djsjd","A0301","社會消費品零售總額_累計值","2021C","511000","内江市","億元",446.38,"2021-11-08 08:14:05"
"djsjd","A0301","社會消費品零售總額_累計值","2020B","511000","内江市","億元",232.43,"2021-08-02 11:41:56"
"djsjd","A0301","社會消費品零售總額_累計值","2020A","511000","内江市","億元",106.81,"2021-08-02 11:41:55"
"djsjd","A0202","城鎮常住居民人均可支配收入_累計增長","2016D","511000","内江市","%",8.53,"2018-12-13 05:21:45"
"djsjd","A0202","城鎮常住居民人均可支配收入_累計增長","2022A","511000","内江市","%",6.4,"2022-05-11 02:11:47"
"djsjd","A0202","城鎮常住居民人均可支配收入_累計增長","2021D","511000","内江市","%",8.9,"2022-05-27 02:14:21"
"djsjd","A0202","城鎮常住居民人均可支配收入_累計增長","2021D","511000","内江市","%",8.9,"2022-02-23 08:23:30"
"djsjd","A0202","城鎮常住居民人均可支配收入_累計增長","2021C","511000","内江市","%",9.8,"2022-02-23 09:49:27"
           
這裡隻提供部分樣例資料用于調試
  • 建立MySQL庫表用于寫入資料
-- 建立db
CREATE DATABASE `huatongdata`;

use huatongdata;

-- 建立地級市季度表
create table prefecture_level_city_quarter(
dbcode varchar(50) comment "次元碼",
code varchar(200) comment "名額編碼",
cname varchar(200) comment "名額名稱",
ayearmon varchar(50) comment "時間期",
regcode varchar(50) comment "地區編碼",
regname varchar(200) comment "地區名稱",
cunit varchar(80) comment "計量機關",
`data` decimal(38,8) comment "數值",
updatetime datetime comment "更新時間"
) comment "地級市季度表";

-- 剛建立的表目前為空
mysql> select * from prefecture_level_city_quarter;
Empty set (0.00 sec)
           
  • 編寫任務腳本
[[email protected] datax]$ vim job/csv2mysql.json
           
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "txtfilereader", 
                    "parameter": {
                    "path": ["/opt/module/datax/job/huatong_data/prefecture_level_city_quarter.csv"], 
                    "encoding":"utf-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
							{
                                "index": 3,
                                "type": "string"
                            },
							{
                                "index": 4,
                                "type": "string"
                            },
							{
                                "index": 5,
                                "type": "string"
                            },
							{
                                "index": 6,
                                "type": "string"
                            },
							{
                                "index": 7,
                                "type": "string"
                            },
							{
                                "index": 8,
                                "type": "string"
                            }
                        ],
                    "skipHeader": "true"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "dbcode",
                            "code",
                            "cname",
                            "ayearmon",
                            "regcode",
                            "regname",
                            "cunit",
                            "data",
                            "updatetime"
                        ], 
                        "connection": [
                            {
                               "jdbcUrl": "jdbc:mysql://hdt-dmcp-ops05:3306/huatongdata?useUnicode=true&characterEncoding=utf8", 
                               "table": ["prefecture_level_city_quarter"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root",
                        "preSql":[""],
                        "session":["set session sql_mode='ANSI'"],
                        "writeMode":"insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
           
  • 執行同步任務
[[email protected] datax]$ python bin/datax.py job/csv2mysql.json
...
...
2023-02-17 10:36:17.020 [job-0] INFO  JobContainer - PerfTrace not enable!
2023-02-17 10:36:17.021 [job-0] INFO  StandAloneJobContainerCommunicator - Total 173559 records, 10806586 bytes | Speed 1.03MB/s, 17355 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 3.118s |  All Task WaitReaderTime 0.268s | Percentage 100.00%
2023-02-17 10:36:17.022 [job-0] INFO  JobContainer -
任務啟動時刻                    : 2023-02-17 10:36:06
任務結束時刻                    : 2023-02-17 10:36:17
任務總計耗時                    :                 10s
任務平均流量                    :            1.03MB/s
記錄寫入速度                    :          17355rec/s
讀出記錄總數                    :              173559
讀寫失敗總數                    :                   0
           

讀出記錄總數 : 173559

說明本次任務同步到MySQL涉及到173559行

  • 驗證
# 登入MySQL查詢剛建立的prefecture_level_city_quarter地級市季度表檢視資料量
mysql> select count(*) from prefecture_level_city_quarter;
+----------+
| count(*) |
+----------+
|   173559 |
+----------+
1 row in set (0.05 sec)
           

和datax記錄總數可以對上,說明CSV檔案全部都同步到MySQL

5.DataX常見的參數設定

5-1.加速相關配置

參數 說明 注意事項
job.setting.speed.channel 設定并發數
job.setting.speed.record 總record限速 配置此參數,則必須配置單個channel的record限速參數
job.setting.speed.byte 總byte限速 配置此參數,則必須配置單個channel的byte限速參數
core.transport.channel.speed.record 單個channel的record限速,預設10000條/s
【注意】:如果配置了總record限速和總byte限速,channel并發數就會失效。因為配置了這兩個參數後,實際的channel并發數是通過計算得到的

5-2.運作記憶體調整

當提升DataX Job内的Channel并發數時,記憶體的占用會明顯增加,因為DataX作為資料交換通道,在記憶體中會緩存較多的資料。

例如:channel中會有一個Buffer,作為臨時的資料交換緩沖區,而在Reader和Write中,也會有一些buffer,為了防止OOM等錯誤,需要适當調大JVM堆記憶體
  • 永久修改

修改datax.py

# 找到DEFAULT_JVM相關内容更改:-Xms1g -Xmx1g
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
           
  • 目前任務修改

啟動時使用參數:

python bin/datax.py --jvm = "-Xms8G -Xmx8G” job.json