本篇主要阐述在taskrunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。
spark已经安装完毕
spark运行在local mode或local-cluster mode
local-cluster模式也称为伪分布式,可以使用如下指令运行
[1,2,1024] 分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512m
sparkcontext.scala 整个初始化过程的入口
sparkenv.scala 创建blockmanager, mapoutputtrackermaster, connectionmanager, cachemanager
dagscheduler.scala 任务提交的入口,即将job划分成各个stage的关键
taskschedulerimpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
schedulerbackend
最简单的单机运行模式的话,看localbackend.scala
如果是集群模式,看源文件sparkdeployschedulerbackend
<b>步骤1</b>: 根据初始化入参生成sparkconf,再根据sparkconf来创建sparkenv, sparkenv中主要包含以下关键性组件 1. blockmanager 2. mapoutputtracker 3. shufflefetcher 4. connectionmanager
<b>步骤2</b>:创建taskscheduler,根据spark的运行模式来选择相应的schedulerbackend,同时启动taskscheduler,这一步至为关键
taskscheduler.start目的是启动相应的schedulerbackend,并启动定时器进行检测
<b>步骤3</b>:以上一步中创建的taskscheduler实例为入参创建dagscheduler并启动运行
<b>步骤4</b>:启动web ui
还是以最简单的wordcount为例说明rdd的转换过程
上述一行简短的代码其实发生了很复杂的rdd转换,下面仔细解释每一步的转换过程和转换结果
textfile先是生成hadooprdd,然后再通过map操作生成mappedrdd,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析
flatmap将原来的mappedrdd转换成为flatmappedrdd
利用word生成相应的键值对,上一步的flatmappedrdd被转换成为mappedrdd
步骤2,3中使用到的operation全部定义在rdd.scala中,而这里使用到的reducebykey却在rdd.scala中见不到踪迹。reducebykey的定义出现在源文件pairrddfunctions.scala
细心的你一定会问reducebykey不是mappedrdd的属性和方法啊,怎么能被mappedrdd调用呢?其实这背后发生了一个隐式的转换,该转换将mappedrdd转换成为pairrddfunctions
这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。
接下来再看一看reducebykey的定义:
reducebykey最终会调用combinebykey, 在这个函数中pairedrddfunctions会被转换成为shufflerdd,当调用mappartitionswithcontext之后,shufflerdd被转换成为mappartitionsrdd
log输出能证明我们的分析
小结一下整个rdd转换过程
hadooprdd->mappedrdd->flatmappedrdd->mappedrdd->pairrddfunctions->shufflerdd->mappartitionsrdd
整个转换过程好长啊,这一切的转换都发生在任务提交之前。
在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于rdd之上的transformantion为什么会是这个样子?
对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.
在此基础上作一下简单的分类
one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatmap是size增大的操作,而subtract是size变小的操作
many-one 多个dataset合并为一个dataset,如combine, join
one-many 一个dataset分裂为多个dataset, 如groupby
task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于rdd上的各个operation
taskrunner.run
task.run
task.runtask (task是一个基类,有两个子类,分别为shufflemaptask和resulttask)
rdd.iterator
rdd.computeorreadcheckpoint
rdd.compute
或许当看到rdd.compute函数定义时,还是觉着f没有被调用,以mappedrdd的compute定义为例
注意,这里最容易产生错觉的地方就是map函数,这里的map不是rdd中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.iterator
compute的计算过程对于shufflemaptask比较复杂,绕的圈圈比较多,对于resulttask就直接许多。
上面的分析知道,wordcount这个job在最终提交之后,被dagscheduler分为两个stage,第一个stage是shufflemaptask,第二个stage是resulttask.
那么shufflemaptask的计算结果是如何被resulttask取得的呢?这个过程简述如下
shffulemaptask将计算的状态(注意不是具体的数据)包装为mapstatus返回给dagscheduler
dagscheduler将mapstatus保存到mapoutputtrackermaster中
resulttask在执行到shufflerdd时会调用blockstoreshufflefetcher的fetch方法去获取数据
第一件事就是咨询mapoutputtrackermaster所要取的数据的location
根据返回的结果调用blockmanager.getmultiple获取真正的数据
blockstoreshufflefetcher的fetch函数伪码
注意上述代码中的getserverstatuses及getmultiple,一个是询问数据的位置,一个是去获取真正的数据。
spark计算速度远胜于hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系。

上图是spark存储子系统中几个主要模块的关系示意图,现简要说明如下
cachemanager rdd在进行计算的时候,通过cachemanager来获取数据,并通过cachemanager来存储计算结果
blockmanager cachemanager在进行数据读取和存取的时候主要是依赖blockmanager接口来操作,blockmanager决定数据是从内存(memorystore)还是从磁盘(diskstore)中获取
memorystore 负责将数据保存在内存或从内存读取
diskstore 负责将数据写入磁盘或从磁盘读入
blockmanagerworker 数据写入本地的memorystore或diskstore是一个同步操作,为了容错还需要将数据复制到别的计算结点,以防止数据丢失的时候还能够恢复,数据复制的操作是异步完成,由blockmanagerworker来处理这一部分事情
connectionmanager 负责与其它计算结点建立连接,并负责数据的发送和接收
blockmanagermaster 注意该模块只运行在driver application所在的executor,功能是负责记录下所有blockids存储在哪个slaveworker上,比如rdd task运行在机器a,所需要的blockid为3,但在机器a上没有blockid为3的数值,这个时候slave worker需要通过blockmanager向blockmanagermaster询问数据存储的位置,然后再通过connectionmanager去获取,具体参看“数据远程获取一节”
由于blockmanager起到实际的存储管控作用,所以在讲支持的操作的时候,以blockmanager中的public api为例
put 数据写入
get 数据读取
remoterdd 数据删除,一旦整个job完成,所有的中间计算结果都可以删除
上述的各个模块由sparkenv来创建,创建过程在sparkenv.create中完成
这段代码容易让人疑惑,看起来像是在所有的cluster node上都创建了blockmanagermasteractor,其实不然,仔细看registerorlookup函数的实现。如果当前节点是driver则创建这个actor,否则建立到driver的连接。
初始化过程中一个主要的动作就是blockmanager需要向blockmanagermaster发起注册
<b>数据写入的简要流程:</b>
rdd.iterator是与storage子系统交互的入口
cachemanager.getorcompute调用blockmanager的put接口来写入数据
数据优先写入到memorystore即内存,如果memorystore中的数据已满则将最近使用次数不频繁的数据写入到磁盘
通知blockmanagermaster有新的数据写入,在blockmanagermaster中保存元数据
将写入的数据与其它slave worker进行同步,一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即replicanumber=1
写入的具体内容可以是序列化之后的bytes也可以是没有序列化的value. 此处有一个对scala的语法中either, left, right关键字的理解。
首先在查询本机的memorystore和diskstore中是否有所需要的block数据存在,如果没有则发起远程数据获取。
远程获取调用路径, getremote->dogetremote, 在dogetremote中最主要的就是调用blockmanagerworker.syncgetblock来从远程获得数据
上述这段代码中最有意思的莫过于sendmessagereliablysync,远程数据读取毫无疑问是一个异步i/o操作,这里的代码怎么写起来就像是在进行同步的操作一样呢。也就是说如何知道对方发送回来响应的呢?
别急,继续去看看sendmessagereliablysync的定义
要是我说秘密在这里,你肯定会说我在扯淡,但确实在此处。注意到关键字promise和future没。
如果这个future执行完毕,返回s.ackmessage。我们再看看这个ackmessage是在什么地方被写入的呢。看一看connectionmanager.handlemessage中的代码片段
注意,此处的所调用的sentmessagestatus.markdone就会调用在sendmessagereliablysync中定义的promise.success. 不妨看看messagestatus的定义。
我想至此调用关系搞清楚了,scala中的future和promise理解起来还有有点费劲。
在spark的最新源码中,storage子系统引入了tachyonstore. tachyonstore是在内存中实现了hdfs文件系统的接口,主要目的就是尽可能的利用内存来作为数据持久层,避免过多的磁盘读写操作。
一点点疑问,目前在spark的存储子系统中,通信模块里传递的数据即有“心跳检测消息”,“数据同步的消息”又有“数据获取之类的信息流”。如果可能的话,要将心跳检测与数据同步即数据获取所使用的网卡分离以提高可靠性。