天天看点

Spark Core源码分析: RDD基础RDDDependencyPartitionPartitionerPersistCheckpointAPISubRDDs

rdd初始参数:上下文和一组依赖

以下需要仔细理清:

a list of partitions

function to compute split (sub rdd impl)

a list of dependencies

partitioner for k-v rdds (optional)

preferred locations to compute each spliton (optional)

dependency代表了rdd之间的依赖关系,即血缘

rdd给子类提供了getdependencies方法来制定如何依赖父类rdd

事实上,在获取first parent的时候,子类经常会使用下面这个方法

可以看到,seq里的第一个dependency应该是直接的parent,从而从第一个dependency类里获得了rdd,这个rdd就是父rdd。

一般的rdd子类都会这么实现compute和getpartition方法,以schemardd举例:

compute()方法调用了第一个父类的compute,把结果rdd copy返回

getpartitions返回的就是第一个父类的partitions

下面看一下dependency类及其子类的实现。

dependency里传入的rdd,就是父rdd本身。

继承结构如下:

Spark Core源码分析: RDD基础RDDDependencyPartitionPartitionerPersistCheckpointAPISubRDDs

narrowdependency代表窄依赖,即父rdd的分区,最多被子rdd的一个分区使用。所以支持并行计算。

子类需要实现方法:

onetoonedependency表示父rdd和子rdd的分区依赖是一对一的。

rangedependency表示在一个range范围内,依赖关系是一对一的,所以初始化的时候会有一个范围,范围外的partitionid,传进去之后返回的是nil。

下面介绍宽依赖。

宽依赖针对的rdd是kv形式的,需要一个partitioner指定分区方式(下一节介绍),需要一个序列化工具类,序列化工具目前的实现如下:

Spark Core源码分析: RDD基础RDDDependencyPartitionPartitionerPersistCheckpointAPISubRDDs

宽依赖和窄依赖对失败恢复时候的recompute有不同程度的影响,宽依赖可能是要全部计算的。

partition具体表示rdd每个数据分区。

partition提供trait类,内含一个index和hashcode()方法,具体子类实现与rdd子类有关,种类如下:

Spark Core源码分析: RDD基础RDDDependencyPartitionPartitionerPersistCheckpointAPISubRDDs

在分析每个rdd子类的时候再涉及。

partitioner决定kv形式的rdd如何根据key进行partition

Spark Core源码分析: RDD基础RDDDependencyPartitionPartitionerPersistCheckpointAPISubRDDs

在shuffledependency里对应一个partitioner,来完成宽依赖下,子rdd如何获取父rdd。

partitioner的伴生对象提供defaultpartitioner方法,逻辑为:

传入的rdd(至少两个)中,遍历(顺序是partition数目从大到小)rdd,如果已经有partitioner了,就使用。如果rdd们都没有partitioner,则使用默认的hashpartitioner。而hashpartitioner的初始化partition数目,取决于是否设置了spark.default.parallelism,如果没有的话就取rdd中partition数目最大的值。

如果上面这段文字看起来费解,代码如下:

hashpartitioner基于java的object.hashcode。会有个问题是java的array有自己的hashcode,不基于array里的内容,所以rdd[array[_]]或rdd[(array[_], _)]使用hashpartitioner会有问题。

顾名思义,getpartition方法实现如下

rangepartitioner处理的kv rdd要求key是可排序的,即满足scala的ordered[k]类型。所以它的构造如下:

内部会计算一个rangbounds(上界),在getpartition的时候,如果rangboundssize小于1000,则逐个遍历获得;否则二分查找获得partitionid。

默认cache()过程是将rdd persist在内存里,persist()操作可以为rdd重新指定storagelevel,

rdd的persist()和unpersist()操作,都是由sparkcontext执行的(sparkcontext的persistrdd和unpersistrdd方法)。

persist过程是把该rdd存在上下文的timestampedweakvaluehashmap里维护起来。也就是说,其实persist并不是action,并不会触发任何计算。

unpersist过程如下,会交给sparkenv里的blockmanager处理。

rdd actions api里提供了checkpoint()方法,会把本rdd save到sparkcontext checkpointdir

目录下。建议该rdd已经persist在内存中,否则需要recomputation。

如果该rdd没有被checkpoint过,则会生成新的rddcheckpointdata。rddcheckpointdata类与一个rdd关联,记录了checkpoint相关的信息,并且记录checkpointrdd的一个状态,

[ initialized --> marked for checkpointing-->

checkpointing in progress --> checkpointed ]

内部有一个docheckpoint()方法(会被下面调用)。

真正的checkpoint触发,在rdd私有方法docheckpoint()里。docheckpoint()会被dagscheduler调用,且是在此次job里使用这个rdd完毕之后,此时这个rdd就已经被计算或者物化过了。可以看到,会对rdd的父rdd进行递归。

rddcheckpointdata的docheckpoint()方法关键代码如下:

runjob最终调的是dagscheduler的runjob。做完后,生成一个checkpointrdd。

具体checkpointrdd相关内容可以参考其他章节。

子类需要实现的方法

略。

部分rdd子类的实现分析,包括以下几个部分:

1)  子类本身构造参数

2)  子类的特殊私有变量

3)  子类的partitioner实现

4)  子类的父类函数实现

checkpointrddpartition继承自partition,没有什么增加。

有一个被广播的hadoop conf变量,在compute方法里使用(readfromfile的时候用)

getpartitions: array[partition]方法:

根据checkpointpath去查看path下有多少个partitionfile,file个数为partition数目。getpartitions方法返回的array[partition]内容为new checkpointrddpartition(i),i为[0, 1, …, partitionnum]

getpreferredlocations(split:partition): seq[string]方法:

文件位置信息,借助hadoop core包,获得block location,把得到的结果按照host打散(flatmap)并过滤掉localhost,返回。

compute(split: partition, context:taskcontext): iterator[t]方法:

调用checkpointrdd.readfromfile(file,

broadcastedconf,context)方法,其中file为hadoopfile path,conf为广播过的hadoop conf。

伴生对象提供writetofile方法和readfromfile方法,主要用于读写hadoop文件,并且利用env下的serializer进行序列化和反序列化工作。两个方法具体实现如下:

创建hadoop文件的时候会若存在会抛异常。把hadoop的outputstream放入serializer的stream里,serializestream.writeall(iterator)写入。

writetofile的调用在rddcheckpointdata类的docheckpoint方法里,如下:

打开hadoop的inutstream,读取的时候使用env下的serializer得到反序列化之后的流。返回的时候,deserializationstream这个trait提供了asiterator方法,每次next操作可以进行一次readobject。

在返回之前,调用了taskcontext提供的addoncompletecallback回调,用于关闭hadoop的inputstream。

getpartitions操作:

根据inputformatclass和conf,通过hadoop inputformat实现类的getsplits(jobcontext)方法得到inputsplits。(orcfile在此处的优化)

这样获得的split同rdd的partition直接对应。

compute操作:

针对本次split(partition),调用inputformat的createrecordreader(split)方法,

得到recordreader<k,v>。这个recordreader包装在iterator[(k,v)]类内,复写iterator的next()和hasnext方法,让compute返回的interruptibleiterator[(k,v)]能够被迭代获得recordreader取到的数据。

getpreferredlocations(split: partition)操作:

在newhadooppartition里serializablewritable将split序列化,然后调用inputsplit本身的getlocations接口,得到有数据分布节点的nodes name列表。

newhadooprdd的子类

复写了getpartitions方法:

newhadooprdd有自己的inputformat实现类和recordreader实现类。在spark/input package下专门写了这两个类的实现。感觉是种参考。

wholetextfilerdd在spark里实现了自己的inputformat。读取的file以k,v的结构获取,k为path,v为整个file的content。

复写createrecordreader以使用wholetextfilerecordreader

复写setmaxsplitsize方法,由于用户可以传入minsplits数目,计算平均大小(splits files总大小除以split数目)的时候就变了。

复写nextkeyvalue方法,会读出指定path下的file的内容,生成new text()给value,结果是string。如果文件正在被别的进行打开着,会返回false。否则把file内容读进value里。

在sparkcontext下提供wholetextfile方法,

用于读取一个路径下的所有text文件,以k,v的形式返回,k为一个文件的path,v为文件内容。比较适合小文件。

全文完  :)

继续阅读