天天看点

Spark笔记三之RDD,算子

RDD核心概念

Resilientdistributed DataSet,弹性分布式数据集

1是只读的,分区记录的集合对象

2分区(partition)是RDD的基本组成单位,其决定了并行计算的粒度。应用程序对RDD的转换最终都是对其分区的转换。

3用户可以指定RDD的分区个数,如果不指定则默认程序分配到的CPU的core数

4每个分区被影射为一个block,在调用hdfs底层时此block对应于hdfs的block(默认128M),spark通过blockManager来管理block是一个block管理器。

RDD的创建

sc.textFile("hdfs://shb01:9000/word").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).collect

1文件系统加载

通过textFile从文件系统(hive,hdfs)加载数据创建一个RDD

2RDD之间的转换

例子中flatMap方法会产生RDD,之后在此RDD的基础上每调用一个方法就会创建一个RDD,如果前面的父RDD如果不调用cache方法保存则会消失。

3通过函数转换

通过parallelize,makeRDD方法也可以转换生成RDD

val rdd1 =sc.makeRDD(Array(("A",1),("B",2),("C",3)))

val rdd1 =sc.parallelize(Array(10,12,3,5,23))//sc.parallelize(seq, numSlices)

算子

在spark中函数又称为算子,算子分为两大类转换算子(Transformations)和行动(Action)算子.

转换型算子:不会立即执行,不会触发计算通常使RDD之间互相转换,转换型算子又分为value型算子和key-value型算子

行动行算子:立即执行触发DAG计算

Value型算子:

Key-value型算子:使用key-value型算子必须引入SparkContext._

importorg.apache.spark.SparkContext._

算子作用于rdd上,但由于rdd是由partition(分区)组成,所以算子最终还是作用于分区上。

例:

val file =sc.textFile("hdfs://shb01:9000/word")

val errors = file.filter(line =>line.contains("ERROR")).count()

这是一个过滤日志的代码,filter会在file基础上再产生一个rdd,会作用于每个分区上然后得到一个新分区,这些新分区的总和组成一个rdd。另外一个分区对应一个task。

spark-core_包下的rdd类可以查看算子的定义

package org.apache.spark.rdd

分区依赖关系

算子操作父rdd中的分区并产生子rdd和分区,父rdd如果不保存就会被丢弃,一旦子rdd计算失败就需要重新计算父rdd。Spark中通过rdd之间的依赖关系来确定需要重新计算那些父rdd。

依赖关系分为两种窄依赖(NarrowDependencies)和宽依赖(Wide Dependencies)

窄依赖:子rdd的一个分区依赖一个或多个父rdd中的一个分区

Spark笔记三之RDD,算子

宽依赖:子RDD中的一个分区依赖父RDD的两个或多个或全部分区

Spark笔记三之RDD,算子

窄依赖:重新计算时代价小,只需要计算一个partition在一台机器上就能完成

宽依赖:重新计算时代价大,可能需要计算多个partition,而partition对应的是集群中的block,而这些block很有可能会存储在集群的多个节点上。

所以一般需要将宽依赖的RDD进行缓存

判断宽依赖:

        1一般情况下value型的算子产生的RDD是窄依赖,key-value型的算子产生的RDD是宽依赖。

         2通过调用dependencies来判断是那种分区依赖关系

显示oneToOne是窄依赖,反之如果显示shuffledRDD则是宽依赖

Spark笔记三之RDD,算子

继续阅读