rdd初始参数:上下文和一组依赖
以下需要仔细理清:
a list of partitions
function to compute split (sub rdd impl)
a list of dependencies
partitioner for k-v rdds (optional)
preferred locations to compute each spliton (optional)
dependency代表了rdd之间的依赖关系,即血缘
rdd给子类提供了getdependencies方法来制定如何依赖父类rdd
事实上,在获取first parent的时候,子类经常会使用下面这个方法
可以看到,seq里的第一个dependency应该是直接的parent,从而从第一个dependency类里获得了rdd,这个rdd就是父rdd。
一般的rdd子类都会这么实现compute和getpartition方法,以schemardd举例:
compute()方法调用了第一个父类的compute,把结果rdd copy返回
getpartitions返回的就是第一个父类的partitions
下面看一下dependency类及其子类的实现。
dependency里传入的rdd,就是父rdd本身。
继承结构如下:
narrowdependency代表窄依赖,即父rdd的分区,最多被子rdd的一个分区使用。所以支持并行计算。
子类需要实现方法:
onetoonedependency表示父rdd和子rdd的分区依赖是一对一的。
rangedependency表示在一个range范围内,依赖关系是一对一的,所以初始化的时候会有一个范围,范围外的partitionid,传进去之后返回的是nil。
下面介绍宽依赖。
宽依赖针对的rdd是kv形式的,需要一个partitioner指定分区方式(下一节介绍),需要一个序列化工具类,序列化工具目前的实现如下:
宽依赖和窄依赖对失败恢复时候的recompute有不同程度的影响,宽依赖可能是要全部计算的。
partition具体表示rdd每个数据分区。
partition提供trait类,内含一个index和hashcode()方法,具体子类实现与rdd子类有关,种类如下:
在分析每个rdd子类的时候再涉及。
partitioner决定kv形式的rdd如何根据key进行partition
在shuffledependency里对应一个partitioner,来完成宽依赖下,子rdd如何获取父rdd。
partitioner的伴生对象提供defaultpartitioner方法,逻辑为:
传入的rdd(至少两个)中,遍历(顺序是partition数目从大到小)rdd,如果已经有partitioner了,就使用。如果rdd们都没有partitioner,则使用默认的hashpartitioner。而hashpartitioner的初始化partition数目,取决于是否设置了spark.default.parallelism,如果没有的话就取rdd中partition数目最大的值。
如果上面这段文字看起来费解,代码如下:
hashpartitioner基于java的object.hashcode。会有个问题是java的array有自己的hashcode,不基于array里的内容,所以rdd[array[_]]或rdd[(array[_], _)]使用hashpartitioner会有问题。
顾名思义,getpartition方法实现如下
rangepartitioner处理的kv rdd要求key是可排序的,即满足scala的ordered[k]类型。所以它的构造如下:
内部会计算一个rangbounds(上界),在getpartition的时候,如果rangboundssize小于1000,则逐个遍历获得;否则二分查找获得partitionid。
默认cache()过程是将rdd persist在内存里,persist()操作可以为rdd重新指定storagelevel,
rdd的persist()和unpersist()操作,都是由sparkcontext执行的(sparkcontext的persistrdd和unpersistrdd方法)。
persist过程是把该rdd存在上下文的timestampedweakvaluehashmap里维护起来。也就是说,其实persist并不是action,并不会触发任何计算。
unpersist过程如下,会交给sparkenv里的blockmanager处理。
rdd actions api里提供了checkpoint()方法,会把本rdd save到sparkcontext checkpointdir
目录下。建议该rdd已经persist在内存中,否则需要recomputation。
如果该rdd没有被checkpoint过,则会生成新的rddcheckpointdata。rddcheckpointdata类与一个rdd关联,记录了checkpoint相关的信息,并且记录checkpointrdd的一个状态,
[ initialized --> marked for checkpointing-->
checkpointing in progress --> checkpointed ]
内部有一个docheckpoint()方法(会被下面调用)。
真正的checkpoint触发,在rdd私有方法docheckpoint()里。docheckpoint()会被dagscheduler调用,且是在此次job里使用这个rdd完毕之后,此时这个rdd就已经被计算或者物化过了。可以看到,会对rdd的父rdd进行递归。
rddcheckpointdata的docheckpoint()方法关键代码如下:
runjob最终调的是dagscheduler的runjob。做完后,生成一个checkpointrdd。
具体checkpointrdd相关内容可以参考其他章节。
子类需要实现的方法
略。
部分rdd子类的实现分析,包括以下几个部分:
1) 子类本身构造参数
2) 子类的特殊私有变量
3) 子类的partitioner实现
4) 子类的父类函数实现
checkpointrddpartition继承自partition,没有什么增加。
有一个被广播的hadoop conf变量,在compute方法里使用(readfromfile的时候用)
getpartitions: array[partition]方法:
根据checkpointpath去查看path下有多少个partitionfile,file个数为partition数目。getpartitions方法返回的array[partition]内容为new checkpointrddpartition(i),i为[0, 1, …, partitionnum]
getpreferredlocations(split:partition): seq[string]方法:
文件位置信息,借助hadoop core包,获得block location,把得到的结果按照host打散(flatmap)并过滤掉localhost,返回。
compute(split: partition, context:taskcontext): iterator[t]方法:
调用checkpointrdd.readfromfile(file,
broadcastedconf,context)方法,其中file为hadoopfile path,conf为广播过的hadoop conf。
伴生对象提供writetofile方法和readfromfile方法,主要用于读写hadoop文件,并且利用env下的serializer进行序列化和反序列化工作。两个方法具体实现如下:
创建hadoop文件的时候会若存在会抛异常。把hadoop的outputstream放入serializer的stream里,serializestream.writeall(iterator)写入。
writetofile的调用在rddcheckpointdata类的docheckpoint方法里,如下:
打开hadoop的inutstream,读取的时候使用env下的serializer得到反序列化之后的流。返回的时候,deserializationstream这个trait提供了asiterator方法,每次next操作可以进行一次readobject。
在返回之前,调用了taskcontext提供的addoncompletecallback回调,用于关闭hadoop的inputstream。
getpartitions操作:
根据inputformatclass和conf,通过hadoop inputformat实现类的getsplits(jobcontext)方法得到inputsplits。(orcfile在此处的优化)
这样获得的split同rdd的partition直接对应。
compute操作:
针对本次split(partition),调用inputformat的createrecordreader(split)方法,
得到recordreader<k,v>。这个recordreader包装在iterator[(k,v)]类内,复写iterator的next()和hasnext方法,让compute返回的interruptibleiterator[(k,v)]能够被迭代获得recordreader取到的数据。
getpreferredlocations(split: partition)操作:
在newhadooppartition里serializablewritable将split序列化,然后调用inputsplit本身的getlocations接口,得到有数据分布节点的nodes name列表。
newhadooprdd的子类
复写了getpartitions方法:
newhadooprdd有自己的inputformat实现类和recordreader实现类。在spark/input package下专门写了这两个类的实现。感觉是种参考。
wholetextfilerdd在spark里实现了自己的inputformat。读取的file以k,v的结构获取,k为path,v为整个file的content。
复写createrecordreader以使用wholetextfilerecordreader
复写setmaxsplitsize方法,由于用户可以传入minsplits数目,计算平均大小(splits files总大小除以split数目)的时候就变了。
复写nextkeyvalue方法,会读出指定path下的file的内容,生成new text()给value,结果是string。如果文件正在被别的进行打开着,会返回false。否则把file内容读进value里。
在sparkcontext下提供wholetextfile方法,
用于读取一个路径下的所有text文件,以k,v的形式返回,k为一个文件的path,v为文件内容。比较适合小文件。
全文完 :)