天天看点

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

DataX篇—阿里开源ETL同步工具

  • DataX
    • 插件简介
    • DataX架构
    • 核心模块介绍
    • DataX 执行流程
    • 编程接口模板:
  • dataX-web 可视化管理工具
    • DataX-web架构图
    • Data-web 功能说明
    • 日志分析

DataX

插件简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

DataX架构

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  1. Reader: Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  2. Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  3. Framework: Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

核心模块介绍

接下来我们以DataX执行一次数据同步作业流程的角度介绍下其核心模块:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务。
  5. DataX作业运行起来之后,Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0.
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

DataX 执行流程

  1. 解析配置,包括job.json、core.json、plugin.json三个配置
  2. 设置jobId到configuration当中
  3. 启动Engine,通过Engine.start()进入启动程序
  4. 设置RUNTIME_MODE到configuration当中
  5. 通过JobContainer的start()方法启动
  6. 依次执行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
  7. init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
  8. prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
  9. split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
  10. channel的计数主要是根据byte和record的限速来实现的(如果自己没有设置了channel的个数),在split()的函数中第一步就是计算channel的大小
  11. split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
  12. split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
  13. schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量
  14. schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。
  15. taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务。
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

理论上现有数据源插件已经非常齐全了,但不乏我们需要为我们的业务二次开发、改造,所以我们需要清楚了解插件的执行流程:

其中,我们需要对其进行业务定制化时,主要集中在prepare、Start、post这三个方法中进行改造。如果我们需要扩展数据源时,也是按照这个执行过程进行开发的。

编程接口模板:

public class SomeReader extends Reader {
    public static class Job extends Reader.Job {

        @Override
        public void init() {
        }
		
		@Override
		public void prepare() {
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return null;
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }

    }

    public static class Task extends Reader.Task {

        @Override
        public void init() {
        }
		
		@Override
		public void prepare() {
        }

        @Override
        public void startRead(RecordSender recordSender) {
        }

        @Override
        public void post() {
        }

        @Override
        public void destroy() {
        }
    }
}
           

Job接口功能如下:

  1. init: Job对象初始化工作,此时可以通过super.getPluginJobConf()获取与本插件相关的配置。读插件获得配置中reader部分,写插件获得writer部分。
  2. prepare: 全局准备工作,比如odpswriter清空目标表。
  3. split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。
  4. post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。
  5. destroy: Job对象自身的销毁工作。

Task接口功能如下:

6. init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Job的split方法返回的配置列表中的其中一个。

7. prepare:局部的准备工作。

8. startRead: 从数据源读数据,写入到RecordSender中。RecordSender会把数据写入连接Reader和Writer的缓存队列。

9. startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列。

10. post: 局部的后置工作。

11. destroy: Task象自身的销毁工作。

需要注意的是:

Job和Task之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。

prepare和post在Job和Task中都存在,插件需要根据实际情况确定在什么地方执行操作。

dataX-web 可视化管理工具

DataX Web是在DataX之上开发的分布式数据同步工具,提供简单易用的 操作界面,降低用户使用DataX的学习成本,缩短任务配置时间,避免配置过程中出错。用户可通过页面选择数据源即可创建数据同步任务,RDBMS数据源可批量创建数据同步任务,支持实时查看数据同步进度及日志并提供终止同步功能,集成并二次开发xxl-job可根据时间、自增主键增量同步数据。

任务"执行器"支持集群部署,支持执行器多节点路由策略选择,支持超时控制、失败重试、失败告警、任务依赖,执行器CPU.内存.负载的监控等等。后续还将提供更多的数据源支持、数据转换UDF、表结构同步、数据同步血缘等更为复杂的业务场景。

DataX-web架构图

我们采用dataX-Web来进行数据同步可视化操作并执行DataX

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

Data-web 功能说明

  1. 执行器配置(使用开源项目xxl-job)
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
  2. 创建数据源
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
  3. 创建任务模版
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
  4. 构建JSON脚本
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
  5. 任务创建介绍
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
  6. 任务列表
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
  7. 日志管理

    可以点击查看日志,实时获取日志信息,终止正在执行的datax进程

    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具
    DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

日志分析

我们可以在DataX-Web 日志管理功能中查看DataX单次执行任务的详细信息,包括构建的JSON,同步过程,资源消耗情况(JVM,GC),网速控制,同步数据量,成功与否等等。

传输过程中打印传输速度、精度信息等:

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

传输过程中打印进程相关的CPU、JVM等:

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具

在任务结束之后,打印总体运行情况:

DataX篇—阿里开源ETL同步工具DataXdataX-web 可视化管理工具