天天看点

HIVE查询优化

所有的调优都离不开对CPU、内存、IO这三样资源的权衡及调整

Hive QL的执行本质上是MR任务的运行,因此优化主要考虑到两个方面:Mapreduce任务优化、SQL语句优化

一、Mapreduce任务优化 1、设置合理的task数量(map task、reduce task)

这里有几个考虑的点,一方面Hadoop MR task的启动及初始化时间较长,如果task过多,可能会导致任务启动和初始化时间远超逻辑处理时间,这种情况白白浪费了计算资源。另一方面,如果任务复杂,task过少又会导致任务迟迟不能完成,这种情况又使计算资源没有充分利用。

因为其读取输入使用Hadoop API,所以其map数量由以下参数共同决定:

minSize = Math.max(job.getLong(“mapred.min.split.size”, 1), minSplitSize);

mapred.min.split.size:设置每个map处理的最小数据量

minSplitSize:一般默认为都为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。

blockSize:默认的HDFS文件块大小

goalSize=totalSize/numSplits 期望的每个Map处理的split大小,仅仅是期望的

numSplits :是在 job 启动时通过JobConf.setNumMapTasks(int n) 设置的值,是给框架的map数量的提示 totalSize :整个job所有输入的总大小

splitSize的计算方式如下:

max( minSize, min(blockSize, goalSize) )

最终map数量由以下方式计算得出:

map数量=totalSize/splitSize

可以看出在调整map数量时,可通过调整blockSize和mapred.min.split.size的方式实现,但是调整blockSize可能并不现实,所以程序执行时通过设置mapred.min.split.size参数来设定。

当然,需要特别注意的是,如果文件特别大,需要支持分割才能进行分片,产生多个map,否则单个文件不可分割那就一个map。如果文件都特别小(比blockSize都小),可以使用CombineFileInputFormat将input path中的小文件合并成再送给mapper处理。

reduce task个数确定:

1)设置set mapred.reduce.tasks=?

2)若1)没有设置,hive通过下面两个参数来计算

set hive.exec.reducers.maxs=? (每个job最大的task数量)

set hive.exec.reducers.bytes.per.reducer = ? (每个reduce任务处理的数据量,默认为1G)

2、对小文件合并

过多的小文件对于执行引擎起map reduce任务进行处理非常不利,一个是每个小文件起一个task去处理,非常浪费资源,另一个是过多的小文件也对NameNode造成很大压力(每个小文件都要记录一个元数据 150 byte)。所以,减少小文件的数量也是一个优化措施。

尽可能避免产生:

1)在执行hive查询时合理设置reduce的数量

2)可以使用hadoop archive命令将多个小文件进行归档

3)动态分区插入数据可能会产生大量小文件,所以在使用时尽量避免动态分区

4)对map和reduce的输出进行merge

set hive.merge.mapfiles = true //设置map端输出进行合并,默认为true

set hive.merge.mapredfiles = true //设置reduce端输出进行合并,默认为false

set hive.merge.size.per.task = 25610001000 //设置合并文件的大小

set hive.merge.smallfiles.avgsize=16000000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge

若已产生小文件:

1)set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 用于设置在执行map前对小文件进行合并,以下参数设置决定了合并文件的大小,并最终减小map任务数

set mapred.max.split.size=256000000 //每个Map最大输入大小(这个值决定了合并后文件的数量)

set mapred.min.split.size.per.node=100000000 //一个节点上split的最小大小(这个值决定了多个DataNode上的文件是否需要合并)

set mapred.min.split.size.per.rack=100000000 //一个机架下split的最小大小(这个值决定了多个机架上的文件是否需要合并)

3、避免数据倾斜

不管task数量是多少,发生数据倾斜就会大大影响查询效率。

1)设置hive.map.aggr=true

该配置可将顶层的聚合操作放在map端进行,以减轻reduce端的压力。

4、并行执行

Hive会将查询转化成一个或者多个阶段,这些阶段可能包括:MR阶段、抽样阶段、合并阶段、limit阶段等。默认单次执行一个阶段,可进行如下配置,使得两个并不冲突的阶段可以并行执行,提高查询效率。

set hive.exec.parallel=true // 可以开启并行执行

set hive.exec.parallel.thread.number=16 // 同一个sql允许最大并行度,默认为8

5、本地模式

这一情况适用于数据集小的情况,并且启动和运行MR任务消耗的时间可能超过逻辑处理的时候,可配置hive在适当情况下自动进行此项优化

条件如下:

输入数据量大小必须小于 hive.exec.mode.local.auto.inputbytes.max(默认128MB)

map任务数量必须小于 hive.exec.mode.local.auto.tasks.max(默认4)

需开启设置:

set hive.exec.mode.local.auto=true // 默认为false

6、JVM重用

hadoop也通过JVM来执行map或者reduce任务,当任务非常多时,启动并初始化任务的耗时会大大增加,有时甚至比逻辑执行时间都要长,所以可通过JVM重用使得JVM实例在同一个job时可以由多task重用,避免增加过多的启动和初始化时间。可通过如下参数设置:

set mapred.job.reuse.jvm.num.tasks=10; // 10为重用个数

二、SQL语句优化

SQL优化有些原则是通用的,部分可参考前一篇总结:

这里针对HIVE来进行一些说明:

1、列裁剪分区裁剪这些基本缩小查找范围的自不必说。 2、join优化

1)在查询语句中如果小表在join的左边,那么hive可能将小表放入分布式缓存,实现map-side join。我们不用考虑哪张表是小表而调整join顺序,新版hive提供了 hive.auto.convert.join 可以来优化,默认是启动的。

hive.mapjoin.smalltable.filesize=25000000(默认值) // 小于该值的表在join时被hive认为是小表

2)在特定情况下,大表也可以使用map-side join,需要满足以下条件

  • 表数据必须按照ON语句中的键进行分桶
  • 其中一张表的分桶数必须是另一张表的分桶数的倍数

    hive这种情况下在map阶段可以进行分桶连接,这一功能需要通过如下设置进行开启

    set hive.enforce.sortmergebucketmapjoin=true

如果分桶表数据是按照连接键或者桶的键进行排序的,hive可以进行一个更快的合并-连接(sort-merge join),需要如下配置

hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat

hive.auto.convert.sortmerge.join=true

hive.enforce.sortmergebucketmapjoin=true

3、group by操作

可先进行map端部分聚合,然后在reduce端最终聚合,进行如下设置

set hive.map.aggr=true; // 开启Map端聚合参数设置

set hive.grouby.mapaggr.checkinterval=100000; // 聚合的键对应的记录条数超过这个值则会进行分拆

如下设置可在出现数据倾斜时分两个MR作业来完成,而不是在一个job中完成

set hive.groupby.skewindata = true (默认为false) // 在第一步mapreduce中map的结果随机分布于到reduce,于是这一步中每个reduce只做部分聚合,在下一个mapreduce中再进行最终聚合。

4、使用LEFT SEMI JOIN (左半连接)替代 IN/EXISTS 子查询

hive中前者 LEFT SEMI JOIN 是后者 IN/EXISTS 更高效的实现

例如:

select a.id, a.name from a where a.id in (select b.id from b);

应该改为 select a.id, a.name from a left semi job b on a.id=b.id;

5、选择合适的分区、分桶、排序方式

distribute by : 用来在map端按键对数据进行拆分,根据reduce的个数进行数据分发,默认是采用hash算法

cluster by :除了有distribute by的功能外,还能对查询结果进行排序,等于distribute by + sort by

sort by :数据在进入reducer之前就排好序,根据指定值对进入到同一reducer的所有行进行排序

order by :对所有数据进行排序(全局有序),若数据量很大,可能需要很多时间

注:严格模式( hive.mapred.mode=strict 默认为 nonstrict ) 用于 hive 在特定情况下阻止任务的提交,针对以下三种情况

(1)对于分区表,不加分区字段过滤条件,不能执行

(2)对于order by语句,必须使用limit语句

(3)限制笛卡尔积的查询(join的时候不使用on,而使用where的)

三、其它 1、存储格式

优先选择列式存储格式如orc、parquet,这将使hive查询时避免全部扫描,列数据存储在一起,只需拿相应列即可,能够大大缩短响应时间

2、压缩方式

在mapreduce的世界中,性能瓶颈主要来自于磁盘IO和网络IO,CPU基本不构成压力,根据需要选择合适的压缩方式,如下图可参考

HIVE查询优化
3、vectorized query( 向量化查询 Hive 0.13中引入 )

通过一次性批量执行1024行而不是每次单行执行,提供扫描、聚合、筛选器和连接等操作的性能。通过如下方式启用:

set hive.vectorized.execution.enabled = true;

set hive.vectorized.execution.reduce.enabled = true;

4、CBO(cost based query optimization)基于成本的优化

hive 0.14.0 加入,hive1.1.0 后默认开启,hive底层自动优化多个join的顺序并选择合适的join算法,优化每个查询的执行逻辑和物理执行计划。使用时需进行如下设置

set hive.cbo.enable = true;

set hive.compute.query.using.stats = true;

set hive.stats.fetch.column.stats = true;

set hive.stats.fetch.partition.stats = true;

5、推测执行

侦测执行缓慢的任务(通常由于负载不均衡或者资源分布原因导致),通过执行其备份任务对相同数据进行重算,加速获取单个task的结果,以此来提高整体的执行效率,可由如下参数控制:

set mapred.map.tasks.speculative.execution=true

set mapred.reduce.tasks.speculative.execution=true

以上内容,如有错误,请看到的小伙伴帮忙指正,多谢。

继续阅读