天天看点

使用DataX同步MaxCompute数据到TableStore(原OTS)优化指南

现在越来越多的技术架构下会组合使用maxcompute和tablestore,用maxcompute作大数据分析,计算的结果会导出到tablestore提供在线访问。maxcompute提供海量数据计算的能力,而tablestore提供海量数据高并发低延迟读写的能力。

将maxcompute内数据导出至tablestore,目前可选的几种主要途径包括:

自己编写工具:使用maxcompute sdk通过tunnel读取表数据,再通过tablestore sdk再写入数据。

其中第二种是我们最常推荐给用户做临时的数据导出使用的,如果没有需要对数据做特殊处理的需求,我们一般不推荐第一种途径。

datax在阿里集团内部已经应用了很多年,经历了多次双十一的考验,是一个稳定、易用、高效的工具。随着maxcompute上结果数据越来越庞大,数据导出的速率越来越被看重,海量的数据需要在基线内完成导出。本篇文章,主要会介绍几种优化手段,以提高使用datax来进行maxcompute向tablestore数据导出的吞吐量。

优化过程

我们会以实际的场景,来演示如何通过一步步的优化,提升数据导出的速度。在数据导出的整个链路上,主要有三个环节,一是maxcompute数据通道的读,二是datax的数据交换,三是tablestore的在线写,这三个环节任意一个成为瓶颈,都会影响导出的速度。

maxcompute数据通道的读的性能比较高,一般不会成为瓶颈,本文主要是针对后两个环节来优化。优化的核心指导方针就是:1. 提高并发,2. 降低写入延迟。接下来列举的几种优化手段,也是围绕这两点,来不断进行优化。

实验选择使用tablestore的测试环境,在maxcompute上,我们会创建一张表并准备1亿行数据。tablestore的测试环境规模以及datax job宿主机的规格都较小,所以整个实验最终达到的速率是比较小的,主要为了演示速率如何提升。而在真实的tablestore生产环境上,规模足够的情况下,我们帮助过应用优化到每秒上百m甚至上g的速度,优化手段相同。

首先在maxcompute内创建如下表:

其次在表内倒入1亿行数据,每行数据约200个字节,其中userid列采用随机值,计算出的md5值取4个字节作为md5列,数据样例如下:

md5

userid

name

comments

attr0

attr1

attr2

attr3

create_time

update_time

028f

108217721721

john

0123456789....

0123456789...

20170201

20170206

01d2

192871726121

bill

f01d

284671281623

jura

测试数据导入使用的是maxcompute tunnel,速度还是比较可观的。

数据准备完毕后,在tablestore上创建一张表,使用md5和userid作为主键列:

表和数据均准备完毕后,使用如下datax job配置类进行一次数据导出:

启动datax任务,从标准输出中可以看到当前数据导出的速度:

可以看到,当前的速度大约是1mb/s,接下来会演示如何进行优化,一步一步将速度给提升上去。

第一步是对datax的几个基础参数进行调优,先大致了解下一个datax job内部,任务的运行结构:

使用DataX同步MaxCompute数据到TableStore(原OTS)优化指南

一个datax job会切分成多个task,每个task会按taskgroup进行分组,一个task内部会有一组reader->channel->writer。channel是连接reader和writer的数据交换通道,所有的数据都会经由channel进行传输。

在datax内部对每个channel会有严格的速度控制,默认的速度限制是1mb/s,这也是为何我们使用默认配置,速度为1mb/s的原因。所以第一个需要优化的基础参数就是单个channel的速度限制,更改配置如下:

我们把单个channel的速度上限配置为5mb。这个值需要针对不同的场景进行不同的配置,例如对于maxcompute,单个channel的速度可以达到几十mb,对于tablestore,在列较小较多的场景下,单个channel的速度是几mb,而在列较大的场景下,可能速度就会上到几十mb。

我们当前默认配置中配置启动的job内channel数为1,要提高速度,并发必须提高,这个是第二步要做的优化。但是在做第二个优化之前,还需要调整一个基础参数,那就是datax job启动的jvm的内存大小配置。

目前datax启动的jvm默认的配置是"-xms1g -xmx1g",当一个job内channel数变多后,内存的占用会显著增加,因为datax作为数据交换通道,在内存中会缓存较多的数据,例如channel中会有一个buffer,作为临时的数据交换的缓冲区,而在部分reader和writer的中,也会存在一些buffer。

调整jvm参数的方式有两种,一种是直接更改datax.py,另一种是在启动的时候,加上对应的参数,如下:

通常我们建议将内存设置为4g或者8g,这个也可以根据实际情况来调整。

在优化完单channel的限速和jvm的内存参数之后,我们重新跑一下任务:

当前数据导出的速度已经从1mb提升到2mb。

在上一点中指出,当前job内部,只有单个channel在执行导出任务,而要提升速率,要做的就是提升channel的并发数。

datax内部对每个channel会做限速,可以限制每秒byte数,也可以限制每秒record数。除了对每个channel限速,在全局还会有一个速度限制的配置,默认是不限。

提升channel并发数有三种途径:

1, 配置全局byte限速以及单channel byte限速,channel个数 = 全局byte限速 / 单channel byte限速。(下面示例中最终channel个数为10)

2,配置全局record限速以及单channel record限速,channel个数 = 全局record限速 / 单channel record限速。(下面示例中最终channel个数为3)

3, 全局不限速,直接配置channel个数。(下面示例中最终channel个数为5)

第三种方式最简单直接,但是这样就缺少了全局的限速。在选择channel个数时,同样需要注意,channel个数并不是越多越好。channel个数的增加,带来的是更多的cpu消耗以及内存消耗。如果channel并发配置过高导致jvm内存不够用,会出现的情况是发生频繁的full gc,导出速度会骤降,适得其反。

可以在datax的输出日志中,找到本次任务的channel的数:

在我们这次实验中,我们把channel数直接配置为10,再进行一次导出:

可以看到在channel数从1提升到10之后,速度从2mb/s提升到了9mb/s。此时若再提高channel数到15,速度已经不见涨,而从服务端监控看,每批次导入的写入延迟确在涨,说明当前瓶颈在tablestore写入端。

在上面几个优化做完后,datax数据交换这一环节已经不是瓶颈,当前瓶颈在tablestore端的写入能力上。tablestore是分布式的存储,一张大表会被切分成很多的分区,分区会分散到后端的各个物理机上提供服务。一张新创建的表,默认分区数为1,当这张表越来越大,tablestore会将其分裂,此时分裂是自动完成的。分区的个数,一定程度上与能提供的服务能力相关。某些业务场景,新建表后,就需要对表进行大规模的数据导入,此时默认的单个分区肯定是不够用的,当然可以等数据量慢慢涨上来后等表自动分裂,但是这个周期会比较长。此时,我们推荐的做法是在创建表的时候进行预分区。

不过目前我们还没有对外开放通过sdk来进行预分区的功能,所以如果需要对表进行预分区,可以先通过工单来联系我们帮助进行预分区。

我们新建一张表,并将表预分4个分区,partition key为md5列,采用md5列的主要原因是在其上数据的分区基本是均匀的。如果数据在partition key分布不均匀,则即使做了预分区,导入性能也不会得到明显的提升。以相同的job配置,再跑一下导出任务:

此时速度从9mb/s提升到18mb/s左右,在tablestore服务端能够提高更多的服务能力后,我们尝试再将channel的并发从10提高到15:

此时速度又进一步提升,从18mb/s提升到22mb/s左右。

我们构建的场景,每行大约是200字节左右大小。datax的otswriter写入插件底层是使用的tablestore sdk提供的batchwrite接口进行数据写入,默认一次请求写入100行数据,也就是说一次请求只会导入约20kb大小的数据。每次写过来的数据包都比较小,非常的不经济。

当前tablestore的batchwrite的限制比较不灵活,会限制行数和数据大小,其中行数默认上限是200行。在每行都比较小的场景下,200行一次批量写入是非常不经济的,在我们的这次实验中,我们将上限改为1000行,并将datax tablestore写入插件内部一次批量写入的行数也改为1000行,来验证将每次写入的包变大后,对写入效率的提升。任务配置更改如下(配置项为job.content.writer.parameter.batchwritecount):

再次执行任务,速度如下:

速度再次提升,从22mb/s提升到29mb/s。tablestore后续会优化对batchwrite的行数限制,对于行比较小的场景采用一个比较友好的策略。

以上优化策略都是在单个datax job的场景下进行的优化,单个datax job只能够运行在单台服务器上,没有办法分布式的执行。d2上的托管服务器,一般是千兆网卡,也就是说最多提供100mb/s的速度。若想要进一步的速度提升,则必须采用多个datax job分布在多台服务器上执行才行。

datax内的odpsreader,可以通过配置一次导出整张表或者表的某个partition。我们可以利用partition,来将一张表拆分成多个job分散导出,但是要求表必须是多分区的。

在我们的实验中,创建的maxcompute表并不是多分区的,我们重新创建一张多分区的表:

增加一列为partid,作为分区,我们通过一个sql将原表的数据导入到新表,并自动均匀的分散到partid:

以上sql会将partid的值取自md5列的第一个字符,md5是一个十六进制的值,字符的取值范围是:0-f,这样我们就将原表切成了一个带16个分区的表。我们希望在每个分区内,数据都是均匀的,为了避免长尾,这也是为什么要设计一个md5列的原因。

在将一张表拆成多个分区后,我们就可以选择在不同的服务器上,为每个分区启动一个任务,配置如下(job.content.reader.parameter.partition):

由于测试集群规模的原因,我们不演示多个job并发后的速度提升。在tablestore服务端能力不是瓶颈的情况下,通过扩展datax job的并发,速度是能线性提升的。

总结下上面的几个优化点:

对datax的几个基本参数进行调整,包括:channel数、单个channel的限速以及jvm的内存参数。

创建tablestore表的时候尽量采取预分区,在设计partition key的时候尽量保证在每个partition key上导入数据的分布均匀。

如果导入tablestore的数据行都比较小,则需要考虑提高单批次的导入行数。

若单个datax job已成瓶颈,则需要考虑将任务拆成多个datax job并行执行。

希望以上经验对各位有用,欢迎交流。