天天看点

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

在spark源码走读系列之2中曾经提到spark能以standalone的方式来运行cluster,但没有对application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现ha进行讲解。

先从比较简单的说起,所谓的没有ha是指master节点没有ha。

组成cluster的两大元素即master和worker。slave worker可以有1到多个,这些worker都处于active状态。

driver application可以运行在cluster之内,也可以在cluster之外运行,先从简单的讲起即driver application独立于cluster。那么这样的整体框架如下图所示,由driver,master和多个slave worker来共同组成整个的运行环境。

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

在start_master.sh中最关键的一句就是

检测master的jvm进程

master的日志在$spark_home/logs目录下

worker运行时,需要注册到指定的master url,这里就是spark://localhost:7077.

master侧收到registerworker通知,其处理代码如下

spark-shell属于application,有关appliation的运行日志存储在$spark_home/works目录下

spark-shell作为application,在master侧其处理的分支是registerapplication,具体处理代码如下。

每当有新的application注册到master,master都要调度schedule函数将application发送到相应的worker,在对应的worker启动相应的executorbackend. 具体代码请参考master.scala中的schedule函数,代码就不再列出。

从运行的进程之间的关系可以看出,worker和master之间的连接建立完毕之后,如果有新的driver application连接上master,master会要求worker启动相应的executorbackend进程。此后若有什么task需要运行,则会运行在这些executor之上。可以从以下的日志信息得出此结论,当然看源码亦可。

worker中启动exectuor的相关源码见worker中的receive函数,相关代码如下

关于standalone的部署,需要详细研究的源码文件如下所列。

deploy/master/master.scala

deploy/worker/worker.scala

executor/coarsegrainedexecutorbackend.scala

查看进程之间的父子关系,请用"pstree"

使用下图来小结单master的部署情况。

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

在谈部署driver到cluster上之前,我们先回顾一下java的一大特性“类的动态加载和反射机制”。本人不是一直写java代码出身,所以好多东西都是边用边学,难免挂一漏万。

所谓的反射,其实就是要解决在运行期实现类的动态加载。

来个简单的例子

谈到这里,就自然想到了一个面试题,“谈一谈class.forname和classloader.loadclass的区别"。说到面试,我总是很没有信心,面试官都很屌的, :)。

上一节之所以写到类的动态加载与反射都是为了谈这一节的内容奠定基础。

将driver application部署到cluster中,启动的时序大体如下图所示。

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

 首先启动master,然后启动worker

使用”deploy.client"将driver application提交到cluster中

master在收到registerdriver的请求之后,会发送launchdriver给worker,要求worker启动一个driver的jvm process

driver application在新生成的jvm进程中运行开始时会注册到master中,发送registerapplication给master

master发送launchexecutor给worker,要求worker启动执行executorbackend的jvm process

一当executorbackend启动完毕,driver application就可以将任务提交到executorbackend上面执行,即launchtask指令

从deploy.client发送出来的消息被谁接收呢?答案比较明显,那就是master。 master.scala中的receive函数有专门针对requestsubmitdriver的处理,具体代码如下

sparkenv对于整个spark的任务来说非常关键,不同的role在创建sparkenv时传入的参数是不相同的,如driver和executor则存在重要区别。

在executor.scala中,创建sparkenv的代码如下所示:

driver application则会创建sparkcontext,在sparkcontext创建过程中,比较重要的一步就是生成sparkenv,其代码如下:

spark在standalone模式下利用zookeeper来实现了ha机制,这里所说的ha是专门针对master节点的,因为上面所有的分析可以看出master是整个cluster中唯一可能出现单点失效的节点。

采用zookeeper之后,整个cluster的组成如下图所示。

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

为了使用zookeeper,master在启动的时候需要指定如下的参数,修改conf/spark-env.sh, spark_daemon_java_opts中添加如下选项。

system property

meaning

spark.deploy.recoverymode

set to zookeeper to enable standby master recovery mode (default: none).

spark.deploy.zookeeper.url

the zookeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).

spark.deploy.zookeeper.dir

the directory in zookeeper to store recovery state (default: /spark).

在spark中没有直接使用zookeeper的api,而是使用了curator,curator对zookeeper做了相应的封装,在使用上更为友好。

步步演进讲到在standalone模式下,如何利用zookeeper来实现ha。从中可以看出standalone master一个最主要的任务就是resource management和job scheduling,看到这两个主要功能的时候,您也许会想到这不就是yarn要解决的问题。对了,从本质上来说standalone是yarn的一个简化版本。

本系列下篇内容就要仔细讲讲spark部署到yarn上的实现细节。

在即将发布的spark 1.0中有一个新增的功能,即对sql的支持,也就是说可以用sql来对数据进行查询,这对于dba来说无疑是一大福音,因为以前的知识继续生效,而无须去学什么scala或其它script.

一般来说任意一个sql子系统都需要有parser,optimizer,execution三大功能模块,在spark中这些又都是如何实现的呢,这些实现又有哪些亮点和问题?带着这些疑问,本文准备做一些比较深入的分析。

<b>sql模块分析有几大难点</b>,分别为:

sql分析和执行的通用过程,这个与是否用spark无关,应该是非常general的问题

spark sql中具体实现时的整体架构

源码阅读时碰到的scala特殊语法,也就是常说的语法糖问题

sql是一种标准,一种用来进行数据分析的标准,已经存在多年。

在大数据的背景下,随着数据规模的日渐增大,原有的分析技巧是否就过时了呢?答案显然是否定的,原来的分析技巧在既有的分析维度上依然保持有效,当然对于新的数据我们想挖掘出更多有意思有价值的内容,这个目标可以交给数据挖掘或者机器学习去完成。

那么原有的数据分析人员如何快速的转换到big data的平台上来呢,去重新学一种脚本吗,直接用scala或python去编写rdd。显然这样的代价太高,学习成本大。数据分析人员希望底层存储机制和分析引擎的变换不要对上层分析的应用有直接的影响,需求用一句话来表达就是,“直接使用sql语句来对数据进行分析”。

这也是为什么hive兴起的原因了。hive的流行直接证明这种设计迎合了市场的需求。由于hive是采用了hadoop的mapreduce作为分析执行引擎,其处理速度上不是尽如人意。spark以快著称,很快有好事者写出了shark,shark取得了非常不俗的成绩,迎得了极好的口碑。

毕竟shark是游离于spark之外的一个项目,不受spark节制,那么spark开发团队的目标是将对sql支持作用spark的核心功能里面。以上分析就是spark中的sql功能的由来。

上述代码的逻辑非常清晰,就是将存在于person.txt中年龄界于13到19岁的年轻人名字打印出来。

sql语句大家都很熟悉,那么有没有仔细想过其有几大部分组成呢?可能你会说,”这还用问,不就是“select * from tablex where f1=?”,有什么好想吗?“

还是先来看看再说吧,说不定有些新的思维在里面呢?

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

上图是对最简单的sql语句的重新标注,select表示是一种具体的操作,即查询数据,”f1,f2,f3"表示返回的结果,tablex是数据源,condition部分是查询条件。有没有发觉sql表达式中的顺序与常见的rdd处理逻辑其在表达的顺序上有差异。还是继续用图来表示不同吧。

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

sql语句在分析执行过程中会经历下图所示的几个步骤

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

语法解析

操作绑定

优化执行策略

交付执行

语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。

以大家熟悉的join操作为例,下图给出一个join优化的示例。a join b等同于b join a,但是顺序的调整可能给执行的性能带来极大的影响,下图就是调整前后的对比图。

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

再举一例,一般来说尽可能的先实施聚合操作(aggregate)然后再join

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

上述一大通分析,希望达到的目的就两个。

语法解析之后生成一个执行策略树

执行策略树可以优化,优化的过程就是对树中节点进行合并或者顺序调整

有了上述内容的铺垫,想必你已经意识到spark如果要很好的支持sql,势必也要完成,解析,优化,执行的三大过程。

整个sql部分的代码,其大致分类如下图所示:

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

sqlparser生成logicplan tree

analyzer和optimizer将各种rule作用于logicalplan tree

最终优化生成的logicalplan生成spark rdd

最后将生成的rdd交由spark执行

在sql中引入了一种新的rdd,即schemardd,且看schemardd的构造函数:

 构造函数中总共两入参一为sparkcontext,另一个logicalplan。logicalplan又是如何生成的呢?

要回答这个问题,不得不回到整个问题的入口点sql函数,sql函数的定义如下

parsesql(sqltext)负责生成logicalplan,parsesql就是sqlparser的一个实例。

sqlparser这一部分的代码要理解起来关键是要搞清楚standardtokenparsers的调用规则,里面有一大堆的符号,如果不理解是什么意思,估计很难理清头绪。

由于apply函数可以不被显示调用,所以parsesql(sqltext)一句其实会隐式的调用sqlparser中的apply函数。

最最最让人蛋疼的一行代码就是phrase(query)(new lexical.scanner(input))这里了,翻译过来就是如果输入的input字符串符合lexical中定义的规则,则继续使用query处理。

<b>看一下query的定义是什么:</b>

到了这里终于看到有logicalplan了,也就是说将普通的string转换成logicalplan在这里发生了。

query这段代码同时说明,在目前的spark sql中仅支持select和insert两种操作,至于delete, update暂不支持。

注:即便是到现在,估计你和当初一样对于sqlparser的使用还是一头雾水,不要紧,请参考ref[3]和[4]中的内容,至于那些稀奇古怪的符号到底是什么意思,请参考ref[5].

第一阶段,将string转换成为logicalplan tree,第二阶段将各种规则作用于logicalplan。

在第一阶段中展示的代码,哪一句会触发优化规则呢?是sql函数中的"result.queryexecution.tordd",此处的queryexecution就是queryexecution。这里又涉及到scala的一个语法糖问题。queryexecution是一个抽象类,但却看到了下述的代码

怎么可以创建抽象类的实例?我的世界坍塌了,呵呵。不要紧张,这在scala的世界是允许的,只不过scala是隐含的创建了一个queryexecution的子类并初始化而已,java里的原则还是对的,人家背后有猫腻。

ok,轮到阶段2中最重要的角色queryexecution闪亮登场了

<b>三大步:</b>

lazy val analyzed = analyzer(logical)

lazy val optimizedplan = optimizer(analyzed)

lazy val sparkplan = planner(optimizedplan).next()

无论analyzer还是optimizer,它们都是ruleexecutor的子类,

ruleexecutor的默认处理函数是apply,对所有的子类都是一样的,ruleexecutor的apply函数定义如下,

对于ruleexecutor的子类来说,最主要的是定义自己的batches,来看analyzer中的batches是如何定义的

batch中定义了一系列的规则,这里再次出现语法糖问题。“如何理解::这个操作符”? ::表示cons的意思,即连接生成一个list.

batch构造函数中需要指定一系列的rule,像resolvereferences就是rule,有关rule的代码就不一一分析了。

在阶段3最主要的代码就两行:

lazy val executeplan: sparkplan = prepareforexecution(sparkplan)

lazy val tordd: rdd[row] = executedplan.execute()

与logicalplan不同,sparkplan最重要的区别就是有execute函数

针对sparkplan的具体实现,又要分成unarynode, leafnode和binarynode,简要来说即单目运算符操作,叶子结点,双目运算符操作。每个子类的具体实现可以自行参考源码。

rdd被触发真正执行的过程在看了前面几篇文章之后想来难不住你来,所有的所有都在这一行代码。

如果真的不明白,建议回头再读一下spark job的执行过程分析。

行为至此,可以收笔了。应该说sql部分的代码涉及到的知识点还是比较多的,最重要的是理清两点,即sql语句的通用处理过程。另一个是spark sql子系统中具体实现机制。

spark sql子模块的具体实现紧紧围绕logicalplan tree展开,一是用sqlparser来生成logicalplan,二是用ruleexecutor将各种rule作用于logicalplan。最后生成普通的rdd将会给spark core处理。

<a href="http://jgsj.iteye.com/blog/2050696">spark catalyst 源码分析</a>

<a href="http://sqlblog.com/blogs/paul_white/archive/2012/04/28/query-optimizer-deep-dive-part-1.aspx">query optimizer deep diver</a>

<a href="http://kerflyn.wordpress.com/2012/08/25/playing-with-scala-parser-combinator/">playing with scala parser combinator</a>

<a href="http://blog.xebia.com/2009/10/21/parsing-text-with-scala/">parsing text with scala</a>

<a href="http://www.scala-lang.org/api/2.10.4/index.html#scala.util.parsing.combinator.parsers%24parser">parser api</a>