一、插件開發
子產品命名規範
Reader插件名稱格式: Xreader-Y
Writer插件名稱格式: Xwriter-Y
其中:
X:資料源類型英⽂文名稱小寫 oraclereader、mysqlwriter
Y:廠商(版本) mysqlreader-5.7 hivewriter-cdh5.12.0
QuickStart
使用私服上的項目骨架進行開發
- DgroupId:新項⽬目的groupId 【保持與示例一緻即可】
- Dpackage:新項⽬目的package 【保持與示例一緻即可】
- Dversion:新項⽬目的version 【保持與示例一緻即可】
- DarchetypeGroupId:模闆⼯工程的GroupId 【保持與示例一緻即可】
- DarchetypeVersion:模闆⼯工程的Version 【保持與示例一緻即可】
- DarchetypeArtifactId:模闆⼯工程的ArtifactId
- DartifactId:新項⽬目的artifactId 建議命名方式為插件名稱
jdbc類骨架
writer
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxwriterplugin-archetype -DartifactId=
reader
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxreaderplugin-archetype -DartifactId=
Hadoop類骨架
需要指定廠商和版本,添加參數-Dvendor,例如:
cdh5.12.0 -Dvendor=cdh5.12.0
cdh6.1.0 -Dvendor=cdh6.1.0
apache -Dvendor=apache
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxhadoopwriterplugin-archetype -DartifactId= -Dvendor=
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxhadoopreaderplugin-archetype -DartifactId= -Dvendor=
其他類骨架
其他的一些資料源,例如es,需要使用自身的api,通用實作無法适用,則使用common骨架
主要的差別在于mvn的依賴,隻依賴了datax-common
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxcommonwriterplugin-archetype -DartifactId=
mvn archetype:generate -DarchetypeGroupId=com.dtwave.dipper.datax.plugin -DarchetypeVersion=1.0.0-SNAPSHOT -X -DarchetypeCatalog=local -DgroupId=com.dtwave.datax.plugin -Dpackage=com.dtwave.dipper.datax.plugin -Dversion=1.0.0-SNAPSHOT -DarchetypeArtifactId=dataxcommonreaderplugin-archetype -DartifactId=
其中骨架生成的目錄結構如下
.
├── doc
│ └── README.md 插件文檔
├── pom.xml
├── src
│ └── main
│ ├── assembly
│ │ └── package.xml 打包配置
│ ├── java
│ └── resources
│ └── plugin.json 插件描述
0.mvn依賴
- rdbms
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--這裡的connector依賴要改成相應資料源的-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
</dependency>
- hadoop
<properties>
<!--替換成對應叢集廠商的hadoop和hive的pom版本-->
<hadoop.version>2.6.0-cdh5.12.0</hadoop.version>
<hive.version>1.1.0-cdh5.12.0</hive.version>
</properties>
1.實作接口
插件的入口類必須擴充
Reader
或
Writer
抽象類,并且實作分别實作
Job
和
Task
兩個内部抽象類,
Job
Task
的實作必須是 内部類 的形式。
骨架生成項目是一個mysql插件的實作
我們已經在CommonRdbmsReader和CommonRdbmsWriter做了通用實作
可以看到,MysqlReader通過CommonRdbmsReader隻需要很少的代碼就可以适配了
rdbms類型的資料源适配過程和mysql基本一緻
大部分情況下,隻要把DBType中的typeName和driverClassName改一下就好了
public class MysqlReader extends Reader {
/**
* 資料源類型
*/
private static final DBType DB_TYPE;
static {
/*
typeName:資料源類型名稱(全小寫)
driverClassName:資料源jdbc驅動類
*/
DB_TYPE = new DBType("mysql", "com.mysql.jdbc.Driver");
}
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration originalConfig = null;
private CommonRdbmsReader.Job commonRdbmsReaderJob;
/**
* Job對象初始化工作,測試可以通過super.getPluginJobConf()擷取與本插件相關的配置。
* 讀插件獲得配置中reader部分,寫插件獲得writer部分。
*/
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
if (userConfigedFetchSize != null) {
LOG.warn("對 mysqlreader 不需要配置 fetchSize, mysqlreader 将會忽略這項配置. 如果您不想再看到此警告,請去除fetchSize 配置.");
}
this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DB_TYPE);
this.commonRdbmsReaderJob.init(this.originalConfig);
}
/**
* 全局準備工作,比如odpswriter清空目标表。
*/
@Override
public void prepare() {
super.prepare();
}
/**
* 校驗
*/
@Override
public void preCheck() {
this.commonRdbmsReaderJob.preCheck(this.originalConfig, DB_TYPE);
}
/**
* 拆分Task。
* 參數adviceNumber架構建議的拆分數,一般是運作時所配置的并發度。
* 值傳回的是`Task`的配置清單。
*
* @param adviceNumber
* @return
*/
@Override
public List<Configuration> split(int adviceNumber) {
return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
}
/**
* 全局的後置工作,比如mysqlwriter同步完影子表後的rename操作。
*/
@Override
public void post() {
this.commonRdbmsReaderJob.post(this.originalConfig);
}
/**
* Job對象自身的銷毀工作。
*/
@Override
public void destroy() {
this.commonRdbmsReaderJob.destroy(this.originalConfig);
}
}
public static class Task extends Reader.Task {
private Configuration readerSliceConfig;
private CommonRdbmsReader.Task commonRdbmsReaderTask;
/**
* Task對象的初始化。此時可以通過super.getPluginJobConf()擷取與本Task相關的配置。
* 這裡的配置是Job的split方法傳回的配置清單中的其中一個。
*/
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DB_TYPE, super.getTaskGroupId(), super.getTaskId());
this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}
/**
* 局部的準備工作。
*/
@Override
public void prepare() {
super.prepare();
}
/**
* 從資料源讀資料,寫入到RecordSender中。
* RecordSender會把資料寫入連接配接Reader和Writer的緩存隊列。
*
* @param recordSender
*/
@Override
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
super.getTaskPluginCollector(), fetchSize);
}
/**
* 局部的後置工作。
*/
@Override
public void post() {
this.commonRdbmsReaderTask.post(this.readerSliceConfig);
}
/**
* Task對象自身的銷毀工作。
*/
@Override
public void destroy() {
this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
}
}
}
2.修改插件描述 plugin.json
plugin.json
在每個插件的項目中,都有一個
plugin.json
檔案,這個檔案定義了插件的相關資訊,包括入口類。例如:
{
"name": "oraclereader",
"class": "com.alibaba.datax.plugin.writer.oraclereader.OracleReader",
"description": "插件用途",
"developer": "插件開發者",
"engineVersion": ""
}
-
: 插件名稱,大小寫敏感。架構根據使用者在配置檔案中指定的名稱來搜尋插件。 十分重要 。name
-
: 入口類的全限定名稱,架構通過反射穿件入口類的執行個體。十分重要 。class
-
: 描述資訊。description
-
: 開發人員。developer
-
: 資料源版本,目前先不管engineVersion
注意:插件的目錄名字必須和
plugin.json
中定義的插件名稱一緻。
3.打包釋出
- 子產品單獨打包指令:
mvn clean package -DskipTests
編譯好的内容存放在target/datax 下面
.
├── plugin
│ └── reader
│ └── common
│ └── mysqlreader
│ ├── libs
│ │ ├── mysqlreader-plugin-dependencies.jar
│ ├── mysqlreader-0.0.1-SNAPSHOT.jar
│ └── plugin.json
- 為了運維友善部署區分廠商版本,整體打包形成壓縮包目錄格式根據廠商版本區分、通用元件存放在common 檔案目錄下
-- plugin
-- reader
-- common(通用插件)
-- mysqlreader
-- sqlserverreader
-- ...
-- hdp (hdp廠商的插件)
-- hivereader
-- ...
-- cdh5.14.2(cdh5.14.2 的插件)
-- hivereader
-- ...
-- cdh6.1.0
-- hivereader
-- ...
-- writer
-- common(通用插件)
-- mysqlwriter
-- sqlserverwriter
-- ...
-- hdp (hdp廠商的插件)
-- hivewriter
-- ...
-- cdh5.14.2(cdh5.14.2 的插件)
-- hivewriter
-- ...
-- cdh6.1.0
-- hivewriter
-- ...
4.如何調試
0. python準備
如果沒有python,需要先安裝
python2.7
,一定要2.7版本
1.copy插件
在
datax-plugin-test
檔案夾中已經準好了下面的目錄結構
.
├── bin 可執行程式目錄
├── conf 架構配置目錄
├── job 任務配置檔案,裡面有stream.json和rdbms.json的兩個配置檔案樣例
├── lib 架構依賴庫目錄
├── log 運作日志
└── plugin 插件目錄
plugin目錄分為`reader`和`writer`子目錄,讀寫插件分别存放。插件目錄規範如下:
${PLUGIN_HOME}/libs: 插件的依賴庫。
${PLUGIN_HOME}/plugin-name.jar: 插件本身的jar。
${PLUGIN_HOME}/plugin.json: 插件描述檔案。
可以在終端執行python datax.py ../job/stream.json
感受一下執行過程
把打包好的插件放到plugin對應目錄下
在調試時推薦另一端使用mysql。比如要寫一個reader的插件,那目的端使用mysqlwriter,要寫一個writer的插件,源端使用mysqlreader
2.修改配置檔案
修改job路徑下rdbms.json,填寫正确的jdbcUrl、column以及table資訊
不要忘記修改使用的插件名
reader.name
、
writer.name
配置詳細資訊請看第三章
配置檔案

3.執行debug腳本
cd到bin目錄下,執行
python datax.py ../job/mysql2mysql_config.json -d
執行腳本後會列印出遠端調試端口
4.idea啟動調試
設定ip和端口,本地調試的話ip就用localhost就行了,端口設定成終端列印出來的
打上斷點,點選啟動調試
5.寫文檔
一定要記得把你的成果用文檔記錄下來哦!!!
可以參照doc/README.md
二、如何使用 Configuration
類
Configuration
為了簡化對json的操作,
DataX
提供了簡單的DSL配合
Configuration
類使用。
Configuration
提供了常見的
get
,
帶類型get
,
帶預設值get
set
等讀寫配置項的操作,以及
clone
toJSON
等方法。配置項讀寫操作都需要傳入一個
path
做為參數,這個
path
就是
DataX
定義的DSL。文法有兩條:
- 子map用
表示,.key
的第一個點省略。path
- 數組元素用
表示。[index]
比如操作如下json:
{
"a": {
"b": {
"c": 2
},
"f": [
1,
2,
{
"g": true,
"h": false
},
4
]
},
"x": 4
}
比如調用
configuration.get(path)
方法,當path為如下值的時候得到的結果為:
-
:x
4
-
a.b.c
2
-
a.b.c.d
null
-
a.b.f[0]
1
-
a.b.f[2].g
true
注意,因為插件看到的配置隻是整個配置的一部分。使用
Configuration
對象時,需要注意目前的根路徑是什麼。
三、配置檔案
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test"
],
"table": [
"`test`"
]
}
],
"modifyUserName": "shulan_admin",
"password": "root",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"cleanRule": 1,
"column": [
"id",
"name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
"table": [
"`test`"
]
}
],
"modifyUserName": "shulan_admin",
"password": "root",
"username": "root",
"writeMode": "replace"
}
}
}
],
"setting": {
"errorLimit": {
"record": 0
},
"speed": {
"channel": 5,
"throttle": false
}
}
}
}
- jdbcUrl
資料庫的 JDBC 連接配接資訊。作業運作時,DataX 會在你提供的 jdbcUrl 後面追加如下屬性:
- username
資料庫的使用者名
- password
資料庫的密碼
- table
表名稱,支援寫入一個或者多個表;當配置為多張表時,必須確定所有表結構保持一緻。
注意:table 和 jdbcUrl 必須包含在 connection 配置單元中
- column
表需要寫入/讀取資料的字段,字段之間用英文逗号分隔。例如: "column": ["id","name","age"]。
下面是一些非必填的選項
- session
DataX在擷取Mysql連接配接時,執行session指定的SQL語句,修改目前connection session屬性
- preSql
寫入資料到目的表前,會先執行這裡的标準語句。比如你希望導入資料前,先對表中資料進行删除操作,那麼你可以這樣配置: "preSql":["delete from 表名"]
,效果是:在執行到每個表寫入資料前,會先執行對應的 delete from 對應表名稱
- postSql
寫入資料到目的表後,會執行這裡的标準語句。(原理同 preSql )
- writeMode
控制寫入資料到目标表采用或者
insert into
replace into
ON DUPLICATE KEY UPDATE
語句
所有選項:insert/replace/update。需要注意的是一些庫可能不支援replace
預設值:insert
- batchSize
一次性批量送出的記錄數大小,該值可以極大減少DataX與Mysql的網絡互動次數,并提升整體吞吐量。但是該值設定過大可能會造成DataX運作程序OOM情況。
預設值:1024
四、髒資料處理
-
什麼是髒資料?
目前主要有三類髒資料:
- Reader讀到不支援的類型、不合法的值。
- 不支援的類型轉換,比如:
轉換為Bytes
。Date
- 寫入目标端失敗,比如:寫mysql整型長度超長。
-
如何處理髒資料
Reader.Task
Writer.Task
中,通過
AbstractTaskPlugin.getPluginCollector()
可以拿到一個
TaskPluginCollector
,它提供了一系列
collectDirtyRecord
的方法。當髒資料出現時,隻需要調用合适的
collectDirtyRecord
方法,把被認為是髒資料的
Record
傳入即可。
使用者可以在任務的配置中指定髒資料限制條數或者百分比限制,當髒資料超出限制時,架構會結束同步任務,退出。插件需要保證髒資料都被收集到,其他工作交給架構就好。
五、整體架構
邏輯執行模型
插件開發者不用關心太多,基本隻需要關注特定系統讀和寫,以及自己的代碼在邏輯上是怎樣被執行的,哪一個方法是在什麼時候被調用的。在此之前,需要明确以下概念:
-
:Job
是DataX用以描述從一個源頭到一個目的端的同步作業,是DataX資料同步的最小業務單元。比如:從一張mysql的表同步到odps的一個表的特定分區。Job
-
Task
是為最大化而把Task
拆分得到的最小執行單元。比如:讀一張有1024個分表的mysql分庫分表的Job
,拆分成1024個讀Job
,用若幹個并發執行。Task
-
: 描述的是一組TaskGroup
集合。在同一個Task
執行下的TaskGroupContainer
集合稱之為Task
TaskGroup
-
:JobContainer
執行器,負責Job
全局拆分、排程、前置語句和後置語句等工作的工作單元。類似Yarn中的JobTrackerJob
-
TaskGroupContainer
執行器,負責執行一組TaskGroup
的工作單元,類似Yarn中的TaskTracker。Task
簡而言之,
Job
拆分成
Task
,在分别在架構提供的容器中執行,插件隻需要實作
Job
Task
兩部分邏輯。
實體執行模型
架構為插件提供實體上的執行能力(線程)。
DataX
架構有三種運作模式:
-
: 單程序運作,沒有外部依賴。Standalone
-
: 單程序運作,統計資訊、錯誤資訊彙報到集中存儲。Local
-
: 分布式多程序運作,依賴Distrubuted
服務。DataX Service
當然,上述三種模式對插件的編寫而言沒有什麼差別,你隻需要避開一些小錯誤,插件就能夠在單機/分布式之間無縫切換了。
當
JobContainer
TaskGroupContainer
運作在同一個程序内時,就是單機模式(
Standalone
Local
);當它們分布在不同的程序中執行時,就是分布式(
Distributed
)模式。
是不是很簡單?
程式設計接口
那麼,
Job
Task
的邏輯應是怎麼對應到具體的代碼中的?
首先,插件的入口類必須擴充
Reader
Writer
Job
Task
Job
Task
的實作必須是 内部類 的形式,原因見 加載原理 一節。以Reader為例:
public class SomeReader extends Reader {
public static class Job extends Reader.Job {
@Override
public void init() {
}
@Override
public void prepare() {
}
@Override
public List<Configuration> split(int adviceNumber) {
return null;
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
@Override
public void init() {
}
@Override
public void prepare() {
}
@Override
public void startRead(RecordSender recordSender) {
}
@Override
public void post() {
}
@Override
public void destroy() {
}
}
}
Job
接口功能如下:
-
: Job對象初始化工作,測試可以通過init
擷取與本插件相關的配置。讀插件獲得配置中super.getPluginJobConf()
部分,寫插件獲得reader
部分。writer
-
: 全局準備工作,比如odpswriter清空目标表。prepare
-
: 拆分split
。參數Task
架構建議的拆分數,一般是運作時所配置的并發度。值傳回的是adviceNumber
的配置清單。Task
-
: 全局的後置工作,比如mysqlwriter同步完影子表後的rename操作。post
-
: Job對象自身的銷毀工作。destroy
Task
-
:Task對象的初始化。此時可以通過init
擷取與本super.getPluginJobConf()
相關的配置。這裡的配置是Task
的Job
方法傳回的配置清單中的其中一個。split
-
:局部的準備工作。prepare
-
: 從資料源讀資料,寫入到startRead
中。RecordSender
會把資料寫入連接配接Reader和Writer的緩存隊列。RecordSender
-
:從startWrite
中讀取資料,寫入目标資料源。RecordReceiver
中的資料來自Reader和Writer之間的緩存隊列。RecordReceiver
-
: 局部的後置工作。post
-
: Task象自身的銷毀工作。destroy
需要注意的是:
-
Job
之間一定不能有共享變量,因為分布式運作時不能保證共享變量會被正确初始化。兩者之間隻能通過配置檔案進行依賴。Task
-
prepare
post
Job
中都存在,插件需要根據實際情況确定在什麼地方執行操作。Task
架構按照如下的順序執行
Job
Task
的接口:
上圖中,黃色表示
Job
部分的執行階段,藍色表示
Task
部分的執行階段,綠色表示架構執行階段。
相關類關系如下:
配置檔案
DataX
使用
json
作為配置檔案的格式。一個典型的
DataX
任務配置如下:
{
"job": {
"content": [
{
"reader": {
"name": "odpsreader",
"parameter": {
"accessKey": "",
"accessId": "",
"column": [""],
"isCompress": "",
"odpsServer": "",
"partition": [
""
],
"project": "",
"table": "",
"tunnelServer": ""
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"username": "",
"password": "",
"column": ["*"],
"connection": [
{
"jdbcUrl": "",
"table": [
""
]
}
]
}
}
}
]
}
}
DataX
架構有
core.json
配置檔案,指定了架構的預設行為。任務的配置裡頭可以指定架構中已經存在的配置項,而且具有更高的優先級,會覆寫
core.json
中的預設值。
配置中
job.content.reader.parameter
的value部分會傳給
Reader.Job
;
job.content.writer.parameter
Writer.Job
Reader.Job
Writer.Job
可以通過
super.getPluginJobConf()
來擷取。
DataX
架構支援對特定的配置項進行RSA加密,例子中以
*
開頭的項目便是加密後的值。 配置項加密解密過程對插件是透明,插件仍然以不帶
*
的key來查詢配置和操作配置項 。
如何設計配置參數
配置檔案的設計是插件開發的第一步!
任務配置中
reader
writer
下
parameter
部分是插件的配置參數,插件的配置參數應當遵循以下原則:
- 駝峰命名:所有配置項采用駝峰命名法,首字母小寫,單詞首字母大寫。
- 正交原則:配置項必須正交,功能沒有重複,沒有潛規則。
- 富類型:合理使用json的類型,減少無謂的處理邏輯,減少出錯的可能。
- 使用正确的資料類型。比如,bool類型的值使用
/true
,而非false
"yes"
等。"true"
- 合理使用集合類型,比如,用數組替代有分隔符的字元串。
- 使用正确的資料類型。比如,bool類型的值使用
- 類似通用:遵守同一類型的插件的習慣,比如關系型資料庫的
參數都是如下結構:connection
{
"connection": [
{
"table": [
"table_1",
"table_2"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database_1",
"jdbc:mysql://127.0.0.2:3306/database_1_slave"
]
},
{
"table": [
"table_3",
"table_4"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.3:3306/database_2",
"jdbc:mysql://127.0.0.4:3306/database_2_slave"
]
}
]
}
插件資料傳輸
跟一般的
生産者-消費者
模式一樣,
Reader
插件和
Writer
插件之間也是通過
channel
來實作資料的傳輸的。
channel
可以是記憶體的,也可能是持久化的,插件不必關心。插件通過
RecordSender
往
channel
寫入資料,通過
RecordReceiver
從
channel
讀取資料。
channel
中的一條資料為一個
Record
的對象,
Record
中可以放多個
Column
對象,這可以簡單了解為資料庫中的記錄和列。
Record
有如下方法:
public interface Record {
// 加入一個列,放在最後的位置
void addColumn(Column column);
// 在指定下标處放置一個列
void setColumn(int i, final Column column);
// 擷取一個列
Column getColumn(int i);
// 轉換為json String
String toString();
// 擷取總列數
int getColumnNumber();
// 計算整條記錄在記憶體中占用的位元組數
int getByteSize();
}
因為
Record
是一個接口,
Reader
插件首先調用
RecordSender.createRecord()
建立一個
Record
執行個體,然後把
Column
一個個添加到
Record
Writer
插件調用
RecordReceiver.getFromReader()
方法擷取
Record
,然後把
Column
周遊出來,寫入目标存儲中。當
Reader
尚未退出,傳輸還在進行時,如果暫時沒有資料
RecordReceiver.getFromReader()
方法會阻塞直到有資料。如果傳輸已經結束,會傳回
null
Writer
插件可以據此判斷是否結束
startWrite
方法。
Column
的構造和操作,我們在《類型轉換》一節介紹。
類型轉換
為了規範源端和目的端類型轉換操作,保證資料不失真,DataX支援六種内部資料類型:
-
:定點數(Int、Short、Long、BigInteger等)。Long
-
:浮點數(Float、Double、BigDecimal(無限精度)等)。Double
-
:字元串類型,底層不限長,使用通用字元集(Unicode)。String
-
:日期類型。Date
-
:布爾值。Bool
-
:二進制,可以存放諸如MP3等非結構化資料。Bytes
對應地,有
DateColumn
LongColumn
DoubleColumn
BytesColumn
StringColumn
BoolColumn
六種
Column
的實作。
Column
除了提供資料相關的方法外,還提供一系列以
as
開頭的資料類型轉換轉換方法。
DataX的内部類型在實作上會選用不同的java類型:
内部類型 | 實作類型 | 備注 |
---|---|---|
Date | java.util.Date | |
Long | java.math.BigInteger | 使用無限精度的大整數,保證不失真 |
Double | java.lang.String | 用String表示,保證不失真 |
Bytes | byte[] | |
String | ||
Bool | java.lang.Boolean |
類型之間互相轉換的關系如下:
from\to | ||||||
---|---|---|---|---|---|---|
- | 使用毫秒時間戳 | 不支援 | 使用系統配置的date/time/datetime格式轉換 | |||
作為毫秒時間戳構造Date | BigInteger轉為BigDecimal,然後BigDecimal.doubleValue() | BigInteger.toString() | 0為false,否則true | |||
内部String構造BigDecimal,然後BigDecimal.longValue() | 直接傳回内部String | |||||
按照 配置的編碼轉換為String,預設 | ||||||
按照配置的date/time/datetime/extra格式解析 | 用String構造BigDecimal,然後取longValue() | 用String構造BigDecimal,然後取doubleValue(),會正确處理 | 配置的編碼轉換為byte[],預設 | "true"為 , "false"為 ,大小寫不敏感。其他字元串不支援 | ||
為 ,否則 | |
加載原理
- 架構掃描
plugin/reader
目錄,加載每個插件的plugin/writer
檔案。plugin.json
- 以
檔案中plugin.json
為key,索引所有的插件配置。如果發現重名的插件,架構會異常退出。name
- 使用者在插件中在
reader
配置的writer
字段指定插件名字。架構根據插件的類型(name
reader
)和插件名稱去插件的路徑下掃描所有的jar,加入writer
classpath
- 根據插件配置中定義的入口類,架構通過反射執行個體化對應的
Job
對象。Task