天天看点

Presto查询优化拾遗

作者:小虾好望角

Grouped Execution

为了方便大家理解 Grouped Execution 的原理,我们先来介绍两个概念:分桶 和 Hash Join。

1.1 分桶

  • 其实 Hive 表中桶的概念就是 MapReduce 的分区的概念,两者完全相同。物理上每个桶就是目录里的一个文件,一个作业产生的桶(输出文件)数量和reduce任务个数相同。
  • 而分区表的概念,则是新的概念。分区代表了数据的仓库,也就是文件夹目录。每个文件夹下面可以放不同的数据文件,通过文件夹可以查询里面存放的文件,但文件夹本身和数据的内容毫无关系。
  • 桶则是按照数据内容的某个值进行分桶,把一个大文件散列称为一个个小文件。
Presto查询优化拾遗

1.2 Hash Join

主要分为两个阶段:建立阶段(build phase)和探测阶段(probe phase)

  • Bulid Phase:选择一个表(一般情况下是较小的那个表,以减少建立哈希表的时间和空间),对其中每个元组上的连接属性(join attribute)采用哈希函数得到哈希值,从而建立一个哈希表。
  • Probe Phase:对另一个表,扫描它的每一行并计算连接属性的哈希值,与bulid phase建立的哈希表对比,若有落在同一个 bucket 的,如果满足连接谓词(predicate)则连接成新的表。
Presto查询优化拾遗

1.3 Grouped Execution 原理

所谓的 Grouped Execution 针对的是那种数据摆放上进行了 bucketed 存储的数据查询的一种优化手段。

  • 比如两个表 JOIN,在没有 Grouped Execution 的时候做 hash join,一般会把小表作为 build 表发到每个 worker 上,然后针对大表的数据做 probe,这样内存占用由整个小表数据量的大小决定
Presto查询优化拾遗
  • 而如果这两个表是 bucketed 表,而且 bucket 的字段就是这个 JOIN 的字段,那么不同 bucket 之间的数据天然就 JOIN 不上,那么两个表之间的 JOIN 可以转变为相对应的 bucket 之间的 JOIN,最后再做个汇总即可,这样每个 worker 上的内存量会大大降低。

Recoverable Grouped Execution

单个 bucket 的 JOIN 如果失败是可以单独重试的,这也就引出了 Facebook 做的第二个优化:Lifespan 重试。通过 Lifespan 级别的重试可以提高大查询的成功率。

Presto查询优化拾遗

Exchange Materialization

上面两种优化手段的应用场景是很受限的,必须要 bucketed table,而且要用 bucketed 字段 JOIN 才能有用。确实,如果故事只到这里就结束就很没意思了,因此 Facebook 提出了第三个优化手段: Exchange Materialization。

我们知道 Presto 的 Exchange 本来是流式的,上游把数据通过 HTTP 发给下游,下游如果处理不过来会反压上游,中间是没有数据落盘的,如下图所示:

Presto查询优化拾遗

Exchange Materialization 则是要把数据落到盘上,并且按照 JOIN 的 key 组织成 bucketed table,那么从这个 Exchange 节点开始往后就可以应用上面的优化了,如下图所示:

Presto查询优化拾遗

Exchange Materialization 启用参数:

SET SESSION exchange_materialization_strategy='ALL';
SET SESSION partitioning_provider_catalog='hive';
SET SESSION hash_partition_count = 4096;           

Spill to Disk

默认情况下,如果查询执行所请求的内存超过会话属性 query_max_memory 或 query_max_memory_per_node,Presto 就会终止查询。这种机制确保了查询分配内存的公平性,防止了内存分配导致的死锁。当集群中有很多小查询时,它是有效的,但会导致杀死不符合限制的大型查询。

4.1 原理简介

为了克服这种低效率,Presto 引入了可撤销内存的概念。Query 可以请求不计入限制的内存,但内存管理器可以在任何时候撤销这些内存。当内存被撤销时,查询运行程序将中间数据从内存溢出到磁盘,然后继续处理它。

在实践中,当集群空闲且所有内存都可用时,内存密集型查询可能会使用集群中的所有内存。另一方面,当集群没有太多空闲内存时,同一个查询可能被迫使用磁盘作为中间数据的存储。与完全在内存中运行的查询相比,强制溢出到磁盘的查询的执行时间可能要长几个数量级。

请注意,启用 spill-to-disk 并不能保证能够执行所有内存密集型的查询。查询运行程序仍然有可能无法将中间数据划分为足够小的块,使每个块都能装入内存,从而导致从磁盘加载数据时出现内存不足错误。

4.2 支持的操作

1、Joins

  • 当任务并发大于 1 时,将对 build 表进行分区,分区的数量等于 task.concurrency 配置值
  • 在对 build 表进行分区时,spill-to-disk 机制可以减少连接操作所需的峰值内存使用。当查询接近内存限制时,build 表分区的一个子集将溢出到磁盘,join 另一侧表中的记录也会落到相同的分区。
  • 然后,就可以 one-by-one 读取溢出的分区数据以完成连接操作。
  • 需要注意的是 join_spill_enabled 默认值是 false,可通过 set session join_spill_enabled=true; 启用

2、Aggregations

  • 如果正在聚合的 group 中数量很大,则可能需要大量的内存。
  • 当启用 spill-to-disk 时,如果没有足够的内存,则将中间累积聚合结果写入磁盘。
  • 当内存可用时,它们被加载回来进行合并。
  • 开启 spill_enabled 参数后无需单独设置其他参数

Dynamic Filtering

早在2005年,Oracle 数据库就支持比较丰富的 dynamic filtering 功能,而 Spark 和 Presto 在最近版本才开始支持这个功能。为了让大家快速理解这个功能,我们先来看一个 Spark 的栗子:

5.1 Spark 动态分区裁减

SELECT * FROM fact_iteblog
JOIN dim_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol) 
WHERE dim_iteblog.othercol > 10           

通过 Spark 的动态分区裁减,可以将执行计划修改成如下形式:

可见,在扫描 fact_iteblog 表时,如果能自动加上了类似于 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 的过滤条件,那么当 fact_iteblog 表有大量的分区,而 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 查询语句只返回少量的分区,这样可以大大提升查询性能。

5.2 Presto 动态过滤

Presto 也支持上面的功能,这个功能称为动态过滤(Dynamic Filtering)。事实上,Presto 的动态过滤比 Spark 的动态分区裁减要丰富。因为 Spark 动态分区裁减只能用于分区表,而 Presto 的动态过滤支持分区表和非分区表,Presto 的动态分区包含 Partition Pruning(分区表) 以及 Row filtering(非分区表)。直到 Presto 0.241,这个功能正式加入到 master 分支。

注意事项:

  • 目前 PrestoDB 的实现中只有 Hive 数据源支持动态过滤
  • 而且非分区表动态过滤只支持 ORC 数据格式
  • 另外,Presto 的 dynamic filtering 和 grouped_execution 不能同时使用
  • 并且需要设置以下参数
set session enable_dynamic_filtering=true;
set session hive.pushdown_filter_enabled=true;           

Alluxio Cache Service

提高 Presto 查询延迟的一个常见优化是缓存工作集,以避免来自远程数据源或通过慢速网络的不必要的 I/O。Presto 利用 Alluxio 作为缓存层,主要有两种使用方式:Alluxio File System 和 Alluxio Structured Data Service。

注意:目前我们公司 Presto 集群尚未加入 Alluxio 缓存服务,这个优化手段可以作为一个知识储备。

6.1 Alluxio File System

Alluxio File System 将 Presto Hive Connector 作为一个独立的分布式缓存文件系统服务于 HDFS 或 AWS S3、GCP、Azure blob store 等对象存储之上。用户可以通过文件系统接口明确地了解缓存的使用情况并控制缓存。例如,可以预加载 Alluxio 目录中的所有文件,为 Presto 查询预热缓存,并设置缓存数据的 TTL(生存时间),以回收缓存容量。举个栗子:

Presto查询优化拾遗
Presto查询优化拾遗

注意对比 Hive 表的 location,前缀换成了 “alluxio://”

6.2 Alluxio Structured Data Service

Alluxio Structured Data Service 通过基于 Alluxio File System 的目录和缓存文件系统与 Presto 进行交互。这种方式有额外的优势,在不用修改 Hive 表的 location 的前提下,就能无缝访问现有的 Hive 表,并通过合并小文件或转换输入文件的格式进一步性能优化。具体做法如下:

Presto查询优化拾遗

继续阅读