天天看点

MaxCompute上如何处理非结构化数据

maxcompute作为阿里云大数据平台的核心计算组件,拥有强大的计算能力,能够调度大量的节点做并行计算,同时对分布式计算中的failover,重试等均有一套行之有效的处理管理机制。 而maxcompute sql能在简明的语义上实现各种数据处理逻辑,在集团内外更是广为应用,在其上实现与各种数据源的互通,对于打通整个阿里云的数据生态具有重要意义。基于这一点,最近maxcompute团队依托maxcompute2.0系统架构,引入了非结构化数据处理框架:通过外部表,为各种数据在maxcompute上的计算处理提供了入口。这里以maxcompute处理存储在oss上的数据为例,介绍这些新功能。

现阶段maxcompute sql面对的主要是以cfile列格式,存储在内部maxcompute表格中的结构化数据。而对于maxcompute表外的各种用户数据(包括文本以及各种非结构化的数据),需要首先通过各种工具导入maxcompute表,才能在其上面进行计算。这个数据导入的过程,具有较大的局限性。以oss为例子,想要在maxcompute里处理oss上的数据,通常有两种做法:

通过oss sdk或者其他工具从oss下载数据,然后再通过maxcompute tunnel将数据导入表里。

写udf,在udf里直接调用oss sdk访问oss数据。

但这两种做法都不够好。#1需要在maxcompute系统外部做一次中转,如果oss数据量太大,还需要考虑如何并发来加速,无法充分利用maxcompute大规模计算的能力;#2通常需要申请udf网络访问权限,还要开发者自己控制作业并发数和数据如何分片的问题。

本文介绍了一种外部表的功能 ,支持旨在提供处理除了maxcompute现有表格以外的其他数据的能力。在这个框架中,通过一条简单的ddl语句,即可在maxcompute上创建一张外部表,建立maxcompute表与外部数据源的关联,提供各种数据的接入和输出能力。创建好的外部表可以像普通的maxcompute表一样使用(大部分场景),充分利用maxcompute sql的强大计算功能。

这里的“各种数据”涵盖两个维度:

多样的数据存储介质

插件式的框架可以对接多种数据存储介质,比如oss、ots

多样的数据格式:maxcompute表是结构化的数据,而外部表可以不限于结构化数据

完全无结构数据;比如图像,音频,视频文件,raw binaries等

半结构化数据;比如csv,tsv等隐含一定schema的文本文件

非cfile的结构化数据; 比如orc/parquet文件,甚至hbase/ots数据

下面通过一个简单例子,来演示如何在maxcompute上轻松访问oss上的数据。

使用maxcompute内置的extractor,可以非常方便的读取按照约定格式存储的oss数据。我们只需要创建一个外部表,就能以这张表为源表做查询。假设有一份csv数据存在oss上,endpoint为<code>oss-cn-hangzhou-zmf.aliyuncs.com</code>,bucket为<code>oss-odps-test</code>,数据文件放在<code>/demo/sampledata/csv/ambulancedata/vehicle.csv</code>。

首先需要在ram中授权maxcompute访问oss的权限。登录ram控制台,创建角色<code>aliyunodpsdefaultrole</code>,并将策略内容设置为:

然后编辑该角色的授权策略,将权限<code>aliyunodpsrolepolicy</code>授权给该角色。

执行一条ddl语句,创建外部表:

注释:

这个功能是maxcompute2.0的一部分,目前处于试用状态。这里需要提前设置一些开关来临时打开这个功能(这个开关设置前需要申请试用maxcompute2.0,文章末尾有介绍),本文后面所有的sql例子语句运行前都需要设置这些开关,为了便于阅读,我只在这里明确写出。

<code>com.aliyun.odps.csvstoragehandler</code>是内置的处理csv格式文件的storagehandler,它定义了如何读写csv文件。我们只需要指明这个名字,相关逻辑已经由系统实现。

location必须指定一个oss目录,默认系统会读取这个目录下所有的文件。

外部表只是在系统中记录了与oss目录的关联,当drop这张表时,对应的location数据不会被删除。

前面我们已经通过ram将账号中的oss资源访问权限授权给了maxcompute。在后续的访问oss过程中,maxcompute将通过sts拿到对oss资源的临时权限。需要注意的是,maxcompute在获取权限时,是以表的创建者的身份去sts申请的,因此,这里创建表的账号和oss必须是同一个云账号,而且必须是主账号,不能是子账号。

外部表创建成功后,我们可以像对普通表一样使用这个外部表。

假设<code>/demo/sampledata/csv/ambulancedata/vehicle.csv</code>数据为:

执行:

这条语句会提交一个作业,调用内置csv extractor,从oss读取数据进行处理。

结果:

当oss中数据格式比较复杂,内置的extractor无法满足需求时,需要自定义extractor来读取oss文件中的数据。

例如有一个text数据文件,并不是csv格式,记录之间的列通过<code>|</code>分割。<code>/demo/sampledata/customtxt/ambulancedata/vehicle.csv</code>数据为:

这个时候可以写一个通用的extractor,将分隔符作为参数传进来,可以处理所有类似格式的text文件。

<code>inputs</code>是一个<code>inputstreamset</code>,每次调用<code>next()</code>返回一个<code>inputstream</code>,这个<code>inputstream</code>可以读取一个oss文件的所有内容。

<code>delimiter</code>通过ddl语句传参

<code>extactor()</code>调用返回一条<code>record</code>,代表外部表中的一条记录

返回<code>null</code>来表示这个表中已经没有记录可读了

storagehandler作为external table自定义逻辑的统一入口。

将自定义代码编译打包,并上传到maxcompute。

与使用内置extractor类似,我们同样需要建立一个外部表,不同的是这次需要指定外部表访问数据的时候,使用自定义的storagehandler。

<code>stored by</code>指定自定义storagehandler的类名

<code>serdeproperites</code>可以指定参数,这些参数会通过<code>dataattributes</code>传递到extractor代码中

同时需要指定类定义所在的jar包

这条语句会提交一个作业,调用自定义的extractor,从oss读取数据进行处理。

在前面我们看到了通过内置与自定义的extractor可以轻松处理存储在oss上的csv等文本数据。接下来我们以语音数据(wav格式文件)为例,来看看怎样通过自定义的extractor来访问处理oss上的非文本文件。

这里我们从最终执行的sql query开始,介绍以maxcompute sql为入口,处理存放在oss上的语音文件的使用方法:

这里我们依然建立的外部表,并且通过外部表的schema定义了我们希望通过外部表从语音文件中抽取出来的信息:

一个语音文件中的语句信噪比(snr):sentence_snr

对应语音文件的名字:id

创建了外部表后,通过标准的select语句进行查询,则会触发extractor运行计算。

从这我们可以更直接的感受到,在读取处理oss数据时,除了之前介绍过的对文本文件做简单的反序列化处理,还可以通过在自定义的extractor中实现更复杂的数据处理抽取逻辑:在这个例子中,我们通过自定义的com.aliyun.odps.udf.example.speech. speechstoragehandler 中封装的extractor, 实现了对语音文件计算平均有效语句信噪比的功能,并将抽取出来的结构化数据直接进行sql运算(where sentence_snr &gt; 10), 最终返回所有信噪比大于10的语音文件以及对应的信噪比值。

在oss地址<code>oss://oss-cn-hangzhou-zmf.aliyuncs.com/oss-odps-test/dev/speechsentencetest/</code>上,存储了原始的多个wav格式的语音文件,maxcompute 框架将读取该地址上的所有文件,并在必要的时候进行文件级别的分片,自动将文件分配给多个计算节点处理。每个计算节点上的extractor则负责处理通过inputstreamset分配给该节点的文件集。具体的处理逻辑则与用户单机程序相仿,用户不用关心分布计算中的种种细节,按照类单机方式实现其用户算法即可。

这里简单介绍一下定制化的speechsentencesnrextractor主体逻辑。首先我们在setup接口中读取参数,进行初始化,并且导入语音处理模型(通过resource引入):

<code>extractor()</code>接口中,实现了对语音文件的具体读取和处理逻辑,对读取的数据根据语音模型进行信噪比的计算,并且将结果填充成[snr, id]格式的record。

这个例子中对实现进行了简化,同时也没有包括涉及语音处理的算法逻辑,具体实现可参见:

<a href="https://github.com/aliyun/aliyun-odps-java-sdk/blob/master/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/speech/speechsentencesnrextractor.java">https://github.com/aliyun/aliyun-odps-java-sdk/blob/master/odps-sdk-impl/odps-udf-example/src/main/java/com/aliyun/odps/udf/example/speech/speechsentencesnrextractor.java</a>

执行一开始的sql语句,可获得计算结果:

可以看到,通过自定义extractor,我们在sql语句上即可分布式地处理多个oss上语音数据文件。同样的,用类似的方法,我们可以方便的利用maxcompute的大规模计算能力,完成对图像,视频等各种类型非结构化数据的处理。

在前面的例子中,一个外部表关联的数据通过location上指定的oss“目录”来实现,而在处理的时候,maxcompute对读取“目录”下面的所有数据,包括子目录中的所有文件。在数据量比较大,尤其是对于随着时间不断积累的数据目录,这样子的全目录扫描,可能带来不必要的额外io以及数据处理时间。 解决这个问题通常有两种做法:

比较直接的减少访问数据量的方法是用户自己负责对数据存放地址做好规划,同时考虑使用多个external table来描述不同部分的数据,让每个externaltable的location指向数据的一个子集。

另一个方面,external table与内部表一样,支持分区表的功能,用户可以利用这个功能来对数据做系统化的管理。这个章节主要介绍一下external table的分区功能。

与maxcompute内部表不同,对于存放在外部存储上(如oss)上面的数据,maxcompute不拥有数据的管理权,所以用户如果希望系统的使用分区表功能的话,在oss上的数据文件的存放路径应该符合一定的格式,具体说来就是路径为如下格式:

<code>partitionkey1=value1\partitionkey2=value2\...</code>

举一个例子,假设用户每天的log文件存放在oss上面,然后希望能在通过maxcompute处理的时候,能够按照粒度为“天”来访问一部分数据。简单假设这些log文件就是csv的格式(复杂自定义格式用法也类似),那么可以使用如下的分区外部表来定义数据:

可以看到,这里和前面的例子不一样的主要就是在定义external table的时候,我们通过<code>partitioned by</code>的语法,来指定该外部表为分区表,这里举了一个三层分区的例子,分区的key分别是 <code>year</code>, <code>month</code> 和 <code>day</code>。为了让这样的分区能生效,在oss上面存储数据的时候需要遵循上面提到的路径格式。这里举一个有效的路径存储layout的例子:

需要再次强调的,因为用户自己离线准备了数据,即通过osscmd或者其他oss工具自行上载到oss存储服务上,所以数据路径格式是由用户决定的,如果想要external table的分区表功能正常工作的话,我们推荐用户上载数据的时候遵循如上路径格式。在这个前提下,通过<code>alter table add partition</code>ddl语句,就可以把这些分区信息引入maxcompute。在这里对应的ddl语句是:

比如如果只想分析2016年6月1号当天,有多少不同的ip出现在log里面,可以通过如下语句实现:

这种情况下, 对log_table_external这个外表对应的目录,将只访问<code>log_data/year=2016/month=06/day=01</code>子目录下的文件(<code>logfile</code>和<code>logfile.1</code>), 而不会对整个<code>log_data/</code>目录作全量数据扫描,避免大量无用的io操作。

同样如果只希望对2016年下半年的数据做分析, 则可以通过

来只访问oss上面存储的下半年的log数据.

如果用户已经有事先存在oss上面的历史数据,但是又不是根据<code>partitionkey1=value1\partitionkey2=value2\...</code>的路径格式来组织存放的,那也是可以通过maxcompute的分区方式来进行访问计算的。虽然通常情况下不这样推荐,但是在非结构化数据处理这方面,maxcompute同样提供了通过自定义路径来引入partition的方法。

比如假设用户的数据路径上只有简单的分区值(而无分区key信息),也就是数据的layout为:

那么要绑定不同的子目录到不同的分区上,可以通过类似如下自定义分区路径的ddl语句实现:

在add partition的时候增加了location信息,从而实现自定义分区数据路径后,即使数据存放不符合推荐的<code>partitionkey1=value1\partitionkey2=value2\...</code>格式,也能正确的实现对子目录数据的分区访问了。

最后在一些特殊情况下,用户可能会有访问某个oss路径上的任意文件子集的需求,而这个文件子集中的文件在路径格式上没有明显的规律性。在这方面maxcompute非结构化数据处理框架也提供了相应的支持,但是在这里不展开做介绍了。如果对于这些高阶的特殊用法,有详细的具体需求和场景描述的话,可以联系maxcompute技术团队。

maxcompute上增添处理非结构化数据的能力,能够有效的利用maxcompute框架上成熟的大规模计算资源,处理来自各种数据源上的数据,从而真正实现数据计算互通。随着maxcompute 2.0框架的逐步上线,这些新功能将为集团内外用户提供更多的计算价值。目前对于oss数据的读取计算功能,在集团内一些急需大规模非结构化数据处理能力的团队中已经使用。maxcompute团队将进一步完善相关功能,并且提供对更多数据源的支持,例如tablestore(ots)等。后继我们也会在ata上做更多的介绍。

MaxCompute上如何处理非结构化数据