天天看点

presto 基础和执行过程

  • 背景
  • 在数据湖日益发展的阶段,也暴露出很多问题,其中有一点就是数据之间的隔离

    如果想要计算的数据分散在Hdfs、Hive、ES、Hbase、MySql、Kafka中,应该怎么做?

    Facebook科学家们发现目前并没有一款合适的计算引擎,最终决定开发一款MPP交互式计算引擎

    2012年秋天进行研发,2013年开源出来并成功用其对300PB的数据进行运算,奠定了Presto的地位

    1. 特点

    Presto是面向SQL的跨数据源并基于内存计算的分析引擎,且拥有丰富的插件接口

    使用插件可以轻松实现以下问题

    1. 对接自己的存储系统
    2. 添加自定义数据类型
    3. 添加自定义处理函数
    4. 自定义权限控制
    5. 自定义资源控制
    6. 添加query事件处理逻辑
    7. 角色
    presto是典型的master/slave架构
    • cn
      • 管理元数据,work节点
      • 解析查询,拆分查询并进行子任务调度
      • 提供discovery server用于监听任务执行状态
    • work
      • 负责读写和计算

    二、查询流程

    查询组成因素

    一个查询是由Stage、Task、Driver、Split、Operator和DataSource组成

    每个查询分解为多个stage

    每个 stage拆分多个task(每个PlanFragment都是一个Stage)

    每个task处理一个或多个split(Driver)

    每个Driver处理一个split,每个Driver都是作用于一个split的一系列Operator集合

    每个split对应一个或多个page对象 :一个大的数据集中的一个小的子集

    每个page包含多个block对象,每个block对象是个字节数据 (presto中最小处理单位)

    Operator:一个operator代表对一个split的一种操作(如过滤、转化等)

    operator每次只会读取一个page对象 并且每次也只会产生一个page对象

    Cli —> parser—>analysis —> 优化 —> 拆分为子plan —> 调度

    1. 根据Sql构建语法树
    2. 将语法树转化为逻辑执行计划

      通过执行计划编译器将语法树层层展开,把树转成树状的执行结构,每个节点都是一个操作

    3. 对逻辑执行计划做优化 优化规则在 planOptimizers 对象中

      对树状执行结构进行性能优化、转写和分布式处理(此时已包含mr操作)

      性能优化包括将一些能提前加速计算的节点进行下推、动态编译、使用Slice和GC控制等

    4. 生成PlanFragment/Stage并进行任务下发
    Stage之间是树状的结构
    • RootStage 将结果返回给coordinator
    • SourceStage接收coordinator数据
    • 其他stage都有上下游
    • stage分为四种 single(root)、Fixed、source、coordinator_only(DML or DDL)
    • stage 并不会被执行,只是对执行计划进行管理
    Exchange 两个stage数据的交换通过Exchange 两种Exchange ;
    • Output Buffer (生产数据的stage通过此传给下游stage)Exchange Client (下游消费)
    • 如果stage 是source 直接通过connector 读数据,则改stage通过Operator与connector交互

    1、词法和语法分析

    解析SQL语句得到Statement对象,并将Statement对象封装成一个QueryStarter对象放入线程池中等待处理

    Satement对象其实就是一棵树,树中的每个节点都是一段SQL表达式

    2、语义分析

    通过SQL语法解析器把Statement对象解析成一个抽象的语法树AST,只是进行语法解析如果有错误此环节暴露

    语法符合SQL语法,会经过一个逻辑查询计划器组件,通过connector 查询metadata中schema 列名 列类型等,将之与抽象语法树对应起来,生成一个物理的语法树节点 如果有类型错误会在此步报错

    3、逻辑执行计划生成和优化

    执行计划编译器将语法树层层展开,把树转成树状的执行结构,每个节点都是一个操作

    对树状执行结构进行性能优化、转写和分布式处理(此时已包含mr操作)

    性能优化包括将一些能提前加速计算的节点进行下推、动态编译、使用Slice和GC控制等

    动态编译:将执行计划中的ScanFilterAndProjectOperator和FilterAndProjectOperator动态编译为Byte Code,并交给JIT去编译为native代码

    4、逻辑执行计划分段

    将执行计划分发到分布式的逻辑计划器里,进行分布式解析,最后转化为一个个task

    在每个task里面,会将位置信息解析出来,交给执行的plan,由plan将task分给worker执行

    样例SQL

    select c1.rank, count(*) from dim.city c1 join dim.city c2 on c1.id = c2.id where c1.id > 10 group by c1.rank limit 10;
               

    cn会将这条SQL Plan划分为四个SubPlan

    SubPlan的属性

    PlanDistribution :表示Stage的分发方式 SubPlan是否等于Stage

    ① Source类型任务会按照数据源大小确定分配多少个节点进行执行

    ② Fixed会分配固定的节点数进行执行(query.initial-hash-partitions参数,默认是8)

    ③ None表示这个SubPlan只分配到一个节点进行执行

    OutputPartitioning:SubPlan的输出是否按照partitionBy的key值对数据进行Shuffle

    属性只有两个值HASH和NONE

    5、生成物理执行计划并任务调度

    scheduler从meta获取数据分布,然后通过connector获取表的split列表,

    再将这些split配合逻辑执行计划进行task的下发。同时将拆分后的任务通过RESTful下发给Worker节点

    初始化获取Split(HiveSplit)的各种异步对象(SplitSource)

    本地化计算

    Presto在选择Source任务计算节点的时候,对于每一个Split,按下面的策略选择一些minCandidates

    1. 优先选择与Split同一个Host的Worker节点
    2. 如果节点不够优先选择与Split同一个Rack的Worker节点
    3. 如果节点还不够随机选择其他Rack的节点

    对于所有Candidate节点,选择assignedSplits最少的节点

    6、任务执行

    Worker节点将最细粒度的任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中

    三、特性

    1、流水线

    1. 数据模型

    Page:Presto中处理数据的最小单元。一个Page包含多个Block,每个Page不超过1MB且不超过16*1024行数据

    Block:以字节数组的形式存储某个字段的若干行。一个Block包含多个Slice

    Slice:Slice是存储数据的最小单位。其使用Unsafe#copyMemory实现了高效的内存拷贝

    1. 节点内部流水线计算
    2. Worker节点将小任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中
    3. Worker节点启动一定数目的线程进行计算,线程数task.shard.max-threads=availableProcessors() * 4
    4. 每个空闲线程从队列中取出PrioritizedSplitRunner对象执行,每隔1秒,判断任务是否执行完成,如果完成,从allSplits队列中删除,如果没有,则放回pendingSplits队列
    5. 每个任务的执行流程如下图右侧,依次遍历所有Operator,尝试从上一个Operator取一个Page对象,如果取得的Page不为空,交给下一个Operator执行
    6. 节点间流水线计算

    ExchangeOperator的执行流程图,ExchangeOperator为每一个Split启动一个HttpPageBufferClient对象,主动向上一个Stage的Worker节点拉数据,拉取的最小单位也是一个Page对象,取到数据后放入Pages队列

    2、完全基于内存的并行计算

    Presto SQL的执行流程

    1. Cli通过HTTP协议提交SQL查询,被封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行
    2. 每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行
    3. 每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行
    4. Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行
    5. Coordinator通过HTTP协议调用Worker节点的 /v1/task 接口将执行计划分配给所有Worker节点
    6. SubPlan1的每个节点读取一个Split的数据并过滤后将数据分发给每个SubPlan0节点进行Join操作和Partial Aggr操作
    7. SubPlan1的每个节点计算完成后按GroupBy Key的Hash值将数据分发到不同的SubPlan2节点
    8. 所有SubPlan2节点计算完成后将数据分发到SubPlan3节点
    9. SubPlan3节点计算完成后通知Coordinator结束查询,并将数据发送给Coordinator
    10. 源数据的并行读取
    在上面的执行计划中SubPlan1和SubPlan0都是Source节点,其实它们读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行,每个Worker节点分配的InputSplit数目上限是参数可配置的,Config中的query.max-pending-splits-per-node参数配置,默认是100。
    1. 分布式的Hash聚合

    上面的执行计划在SubPlan0中会进行一次Partial的聚合计算,计算每个Worker节点读取的部分数据的部分聚合结果,然后SubPlan0的输出会按照group by字段的Hash值分配不同的计算节点,最后SubPlan3合并所有结果并输出

    四、插件

    1. ConnectorMetadata: 管理表的元数据,表的元数据,partition等信息。在处理请求时,需要获取元信息,以便确认读取的数据的位置。Presto会传入filter条件,以便减少读取的数据的范围。元信息可以从磁盘上读取,也可以缓存在内存中。
    2. ConnectorSplit: 一个IO Task处理的数据的集合,是调度的单元。一个split可以对应一个partition,或多个partition。
    3. SplitManager : 根据表的meta,构造split。
    4. SlsPageSource : 根据split的信息以及要读取的列信息,从磁盘上读取0个或多个page,供计算引擎计算。
    hive 的connector中实现了三种类型
    • parquet connector(Impala支持最好)
    • orc connector(Presto支持最好)
    • rc file connector

    五、小结

    本篇文章主要是介绍Presto的基础知识,后面会逐渐带大家阅读Presto的源码,敬请期待

    本人于2020年初开始接触Presto,喜欢Presto的设计(灵活、可扩展性好),同时发现Presto的相关资料很有限

    因此希望学习的同时能够留下更多的资料方便后面接触Presto的朋友更好学习

    国内使用Presto的知名公司有:美团、京东、携程、有赞

    参考文章

    1. https://tech.meituan.com/2014/06/16/presto.html
    2. https://www.jianshu.com/p/938d2a3a055c
    3. https://zhuanlan.zhihu.com/p/101366898
    4. https://www.cnblogs.com/lin-/articles/13616608.html

继续阅读