天天看点

Spark技术内幕:究竟什么是RDD从一个例子开始

rdd是spark最基本,也是最根本的数据抽象。http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 是关于rdd的论文。如果觉得英文阅读太费时间,可以看这篇译文:http://shiyanjun.cn/archives/744.html 

本文也是基于这篇论文和源码,分析rdd的实现。

第一个问题,rdd是什么?resilient distributed datasets(rdd,) 弹性分布式数据集。rdd是只读的、分区记录的集合。rdd只能基于在稳定物理存储中的数据集和其他已有的rdd上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupby、join(转换不是程开发人员在rdd上执行的操作)。

rdd不需要物化。rdd含有如何从其他rdd衍生(即计算)出本rdd的相关信息(即lineage),据此可以从物理存储的数据计算出相应的rdd分区。

看一下内部实现对于rdd的概述:

internally, each rdd is characterized by five main properties:

 *

 *  - a list of partitions

 *  - a function for computing each split

 *  - a list of dependencies on other rdds

 *  - optionally, a partitioner for key-value rdds (e.g. to say that the rdd is hash-partitioned)

 *  - optionally, a list of preferred locations to compute each split on (e.g. block locations for

 *    an hdfs file)

每个rdd有5个主要的属性:

一组分片(partition),即数据集的基本组成单位

一个计算每个分片的函数

对parent rdd的依赖,这个依赖描述了rdd之间的lineage

对于key-value的rdd,一个partitioner

一个列表,存储存取每个partition的preferred位置。对于一个hdfs文件来说,存储每个partition所在的块的位置。

org.apache.spark.rdd.rdd是一个抽象类,定义了rdd的基本操作和属性。这些基本操作包括map,filter和persist。另外,org.apache.spark.rdd.pairrddfunctions定义了key-value类型的rdd的操作,包括groupbykey,join,reducebykey,countbykey,saveashadoopfile等。org.apache.spark.rdd.sequencefilerddfunctions包含了所有的rdd都适用的saveassequencefile。

rdd支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;而动作(actions)在数据集上运行计算后,返回一个值给驱动程序。 例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给driver程序。(不过还有一个并行的reducebykey,能返回一个分布式数据集)

spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给driver的动作时,这些转换才会真正运行。这个设计让spark更加有效率的运行。例如,我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。

默认情况下,每一个转换过的rdd都会在你在它之上执行一个动作时被重新计算。不过,你也可以使用persist(或者cache)方法,持久化一个rdd在内存中。在这种情况下,spark将会在集群中,保存相关元素,下次你查询这个rdd时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的。

下表列出了spark中的rdd转换和动作。每个操作都给出了标识,其中方括号表示类型参数。前面说过转换是延迟操作,用于定义新的rdd;而动作启动计算操作,并向用户程序返回值或向外部存储写数据。

表1 spark中支持的rdd转换和动作

转换

map(f : t ) u) : rdd[t] ) rdd[u]

filter(f : t ) bool) : rdd[t] ) rdd[t]

flatmap(f : t ) seq[u]) : rdd[t] ) rdd[u]

sample(fraction : float) : rdd[t] ) rdd[t] (deterministic sampling)

groupbykey() : rdd[(k, v)] ) rdd[(k, seq[v])]

reducebykey(f : (v; v) ) v) : rdd[(k, v)] ) rdd[(k, v)]

union() : (rdd[t]; rdd[t]) ) rdd[t]

join() : (rdd[(k, v)]; rdd[(k, w)]) ) rdd[(k, (v, w))]

cogroup() : (rdd[(k, v)]; rdd[(k, w)]) ) rdd[(k, (seq[v], seq[w]))]

crossproduct() : (rdd[t]; rdd[u]) ) rdd[(t, u)]

mapvalues(f : v ) w) : rdd[(k, v)] ) rdd[(k, w)] (preserves partitioning)

sort(c : comparator[k]) : rdd[(k, v)] ) rdd[(k, v)]

partitionby(p : partitioner[k]) : rdd[(k, v)] ) rdd[(k, v)]

动作

count() : rdd[t] ) long

collect() : rdd[t] ) seq[t]

reduce(f : (t; t) ) t) : rdd[t] ) t

lookup(k : k) : rdd[(k, v)] ) seq[v] (on hash/range partitioned rdds)

save(path : string) : outputs rdd to a storage system, e.g., hdfs

注意,有些操作只对键值对可用,比如join。另外,函数名与scala及其他函数式语言中的api匹配,例如map是一对一的映射,而flatmap是将每个输入映射为一个或多个输出(与mapreduce中的map类似)。

除了这些操作以外,用户还可以请求将rdd缓存起来。而且,用户还可以通过partitioner类获取rdd的分区顺序,然后将另一个rdd按照同样的方式分区。有些操作会自动产生一个哈希或范围分区的rdd,像groupbykey,reducebykey和sort等。

下面的例子摘自rdd的论文,实现了处理一个hdfs日志文件中错误日志的逻辑。

spark是一个org.apache.spark.sparkcontext的实例,基本上spark的应用都是以定义一个sparkcontext开始的。textfile的定义如下:

hadoopfile创建了一个org.apache.spark.rdd.hadooprdd,而在hadooprdd上调用map则生成了一个mappedrdd:

errors.cache()并不会立即执行,它的作用是在rdd的计算完成后,将结果cache起来,以供以后的计算使用,这样的话可以加快以后运算的速度。

errors.count() 就触发了一个action,这个时候就需要向集群提交job了:

提交后,sparkcontext会将runjob提交到dagscheduler,dagscheduler会将当前的dag划分成stage,然后生成taskset后通过taskscheduler的submittasks提交tasks,而这又会调用schedulerbackend,schedulerbackend会将这些任务发送到executor去执行。

如何划分stage?如何生成tasks?接下来会进行解析。明天要上班了,今天早点休息吧。

继续阅读