天天看点

MaxCompute基础与MaxCompute SQL优化MaxCompute SQL基础:MaxCompute SQL优化与大数据开发套件:

总论:

maxcompute 主要服务于批量结构化数据的存储和计算,可以提供海量数据仓库的解决方案以及针对大数据的分析建模服务 。随着社会数据收集手段的不断丰富及完善,越来越多的行业数据被积累下来 。数据规模已经增长到了传统软件行业无法承载的海量数据(百 gb、tb、乃至 pb)级别 。

在分析海量数据场景下,由于单台服务器的处理能力限制,数据分析者通常采用分布式计算模式 。但分布式的计算模型对数据分析人员提出了较高的要求,且不易维护 。使用分布式模型,数据分析人员不仅需要了解业务需求,同时还需要熟悉底层计算模型 。maxcompute 的目的是为用户提供一种便捷的分析处理海量数据的手段 。用户可以不必关心分布式计算细节,从而达到分析大数据的目的 。

   首先maxcompute不同于普通的mysql,oracle这样的关系型数据库,它其实是一个综合性的数据服务平台,它并不能在毫秒级甚至秒级返回查询结果,一条odps命令的执行通常需要经过如下流程:

提交一个sql语句,发送 restful 请求给http服务器

http 服务器做用户认证。认证通过后,请求就会以 kuafu通信协议方式发送给 worker。

worker判断该请求作业是否需要启动fuxi job。如果不需要,本地执行并返回结果。如果需要,则生成一个 instance, 发送给 scheduler。

scheduler把instance信息注册到 ots,将其状态置成 running。scheduler 把 instance 添加到 instance 队列。

worker把 instance id返回给客户端。

scheduler会把instance拆成多个task,并生成任务流dag图。

把可运行的task 放入到优先级队列taskpool中。

scheduler 有一个后台线程定时对taskpool 中的任务进行排序。scheduler 有一个后台线程定时查询计算集群的资源状况。executor在资源未满的情况下,轮询taskpool,请求task。scheduler判断计算资源。若集群有资源,就将该task发给executor。

executor调用sql parse planner,生成sql plan。executor 将 sql plan 转换成计算层的 fuxi job 描述文件。executor 将该描述文件提交给计算层运行,并查询 task 执行状态。task 执行完成后,executor更新 ots 中的 task信息,并汇报给scheudler。

schduler 判断 instance 结束,更新 ots 中 instance 信息,置为 terminated。

客户端接收到返回的 instance id 后,可以通过 instance id 来查询作业状态:

客户端会发送另一个 rest 的请求,查询作业状态。

http 服务器根据配置信息做用户认证。用户认证通过后,把查询的请求发送给 worker。

worker 根据 instanceid 去 ots 中查询该作业的执行状态。worker 将查询到的执行状态返回给客户端。

    其实maxcompute是一个透明的数据服务平台,用户不需要了解分布式数据处理的细节,就可以在client上比较方便的处理pb级别的数据了。所以,在了解了以上内容之后,对于maxcompute只能在分钟级别返回结果就有一个比较清楚的理解了。

ps:以上这些内容在大数据开发套件中都是透明的。

    maxcompute sql与普通关系型数据库的sql大体类似,不同在于maxcompute不支持如事务、主键约束、索引等,可以看成标准sql的子集。

    maxcompute 操作以表为基础,ddl中涉及到对表的一系列操作,包括create,drop,alter

    我们以大数据开发套件上的一张表为例:

在工作流中我们希望任务能够顺利的执行,所以不管是ddl和dml中我们都尽量希望语句返回成功(if not exist,overwrite)

comment包括对应字段的注释和对应表的注释,这些都可以alter

与传统的sql不同,maxcompute面向全域数据,所以即使是用create xxxx select  xxxx from xxx的方式也需要as加上列的名称。

分区字段注明,由于maxcompute操作的数据量很大,通常来说分区字段需要特别关注

生命周期:非常方便的属性,便于用户释放存储空间,简化回收数据的流程,不需要传统的繁杂的空间维护。灵活运用lastdatamodifiedtime与touch(修改为当前时间),关注分区表和非分区表的区别。

对于大表结构的复制,odps提供非常灵活的create like语句。

alter几乎可以对表的所有属性进行更改,包括列,注释,分区,分区属性,生命周期等等。

    常见的比如insert,select,join

     静态分区,分区字段常量;动态分区,可以不指定值,适用select字句中的分区列值

     multi-insert 单次读入,多次写入,减少数据读取。 

     与传统的sql不同的是,distinct作用所有select字段

     编译过程group > select > order/sort/distribute,理解了编译顺序也就理解了各个字句间别名的使用规范。

     distribute by:对数据按照某几列的值做hash分片。

     sort by:局部排序,语句前必须加distribute by。实际上sort by是对distribute by的结果进行局部排序。

     从功能的理解可知:order by不和distribute by/sort by共用,同时group by也不和distribute by/sort by共用。

     join和传统sql的表现较为一致,odps支持left outer join,right outer join,full outer join,inner join。

     mapjoin hint:当大表和小表join的情况下利用mapjoin将用户指定的小表全部加载到内存中,从而加快join的执行速度,同时支持非等值连接,full join 不可用,连接的主表需为大表

    主要包括数学与统计函数,字符串操作函数,时间函数,窗口函数,聚合函数,转置函数等

    就不一一列举了,功能强大。

   包括udf,udtf,udaf

   udf:用户自定义标量函数

   udtf:用户自定义表值函数(返回多个字段)

   udaf:用户自定义聚合函数

   udf:

 继承udf类,实现evaluate方法即可。evaluate方法可以有多个,满足多态特性。

 udaf:

继承com.aliyun.odps.udf.aggregator,主要实现iterate,merge和terminate三个接口,udaf的主要逻辑依赖于这三个接口的实现。此外,还需要用户实现自定义的writable buffer,因为udaf的主要逻辑是将数据进行分片后遍历,处理完之后进行merge。

udtf:

继承com.aliyun.odps.udf.udtf类,主要实现process和forward两个接口,sql中每一条记录都会对应调用一次process,process的参数为udtf的输入参数。输入参数以object[]的形式传入,输出结果通过调用forward函数输出。

udf统一添加方法:

add jar xxx

create function xxx as packagename.classname using 'jarname'

与传统sql类型,区别在于变量引用时前面加$

     explain,show instance,merge smallfile,添加/移除/显示统计信息(add/remove/show statistc 统计值或者符合某个表达式的值)

选择满足需求的小表,比如汇总表。维表尽量选择全量表,事实表尽量选择增量表;

选择产出早的表;

选择可回滚的表,比如使用加购事件表代替加购全流程表;

依赖的n个上游表,尽量保证上游产出时间要均匀,如果有差异,考虑换依赖表;

行数小于100万的表认为是小表,这个时候使用mapjoin性能会提高很多;

读取数据的时候要加上分区等过滤条件,大表变小表。常用过滤条件字段,做成动态分区,方便下游过滤;

不得不读取n天大表的时候,使用unionall方式合并多天数据;

join关联要尽可能是主键关联。关联字段类型要一致;

多天汇总,先生成1天轻度汇总表,多天使用1天数据再汇总;

multiinsert,实现一次读取多次写入;

使用系统udf代替自己的写的udf;

依赖max_pt的,要排除当天依赖;

上游是小时任务,使用max_pt要慎重;

执行超过1个小时任务要关注;

大数据开发套件:

大数据开发套件提供了直观的数据操作入口,数据研发过程代码的编写,调试,优化,发布都可以在大数据开发套件中进行。

拿一个任务耗时过长作例子,看看在大数据开发套件上我们是怎么处理碰到的问题的。

一个task执行时间过长,除掉本身代码的性能问题,那么有两种比较大的可能:

一种是等待问题,一种是数据倾斜问题

等待问题可能是由于系统资源不足,系统繁忙,优先级不够,数据量太大,碰到了坏盘等原因导致的

我们可以通过调整优先级,重跑,过滤初始数据等方法来处理。

倾斜问题则一般是数据本身的问题,常见的数据倾斜是怎么造成的?

shuffle的时候,将各个节点上相同的key拉取到某个节点的一个task进行处理,比如按照key进行聚合或join等操作,如果某个key对应的数据量特别大的话,就会发生数据倾斜现象。数据倾斜就成为了整个task运行时间的短板。

触发shuffle的常见算子:distinct、groupby、join等。

要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在d2 ui上看就可以,查看数据是否倾斜了

logview--odps task--detail--stage--longtail

根据stage日志,判断出数据倾斜发生在哪个算子上。

根据倾斜发生的阶段,我们又可以把它们分为map倾斜,reduce倾斜,join倾斜

通常来说,对于倾斜现象,我们首先查看导致数据倾斜的key的数据分布情况,接下来大概有几种处理方案:

1:过滤数据

过滤掉某些脏数据,比如说是否可以去掉null,去掉某些条件对应的值

2:加大并行度

给任务添加处理资源,加大instance的数量,暴力

3:对数据进行拆分,分而治之

如果大表join小表,我们可以用mapjoin,将小表cache进内存

二次分发,加上随机前缀(数据膨胀),拆分数据集为热点+非热点再进一步处理

大表join超大表,还可以考虑bloomfilter

4:组合使用

上述方法,组合使用

5:修改业务

实在没有进步空间,从业务上过滤数据