天天看点

《Spark大数据分析:核心概念、技术及实践》一3.5 API

 本节书摘来自华章出版社《spark大数据分析:核心概念、技术及实践》一书中的第3章,第3.5节,作者[美] 穆罕默德·古勒(mohammed guller),更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.5 api

应用可以通过使用spark提供的库获得spark集群计算的能力。这些库都是用scala编写的。但是spark提供了各种语言的api。在本书编写之际,spark api提供了如下语言的支持:scala、java、python和r。可以使用上面的任何语言来开发spark应用。也有其他语言(比如clojure)的非官方支持。

spark api主要由两个抽象部件sparkcontext和弹性分布式数据集(rdd)构成。应用程序通过这两个部件和spark进行交互。应用程序可以连接到spark集群并使用相关资源。接下来会介绍这两个抽象部件,然后详细介绍rdd。

3.5.1 sparkcontext

sparkcontext是一个在spark库中定义的类。它是spark库的入口点。它表示与spark集群的一个连接。使用spark api创建的其他一些重要对象都依赖于它。

每个spark应用程序都必须创建一个sparkcontext类实例。目前,每个spark应用程序只能拥有一个激活的sparkcontext类实例。如果要创建一个新的实例,那么在此之前必须让当前激活的类实例失活。

sparkcontext有多个构造函数。最简单的一个不需要任何参数。一个sparkcontext类实例可以用如下代码创建。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

在这种情况下,sparkcontext的配置信息都从系统属性中获取,比如spark master的地址、应用名称等。也可以创建一个sparkconf类实例,然后把它作为sparkcontext的参数从而设定配置信息。sparkconf 是spark库中定义的一个类。通过这种方式可以像下面这样设置各种spark配置信息。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

sparkconf为设置诸如spark master这样的常用配置信息都提供了对应的显式方法。此外,它还提供了一个通用的方法用于设置配置信息,它使用键-值对进行设置。sparkcontext和sparkconf可以使用的参数将在第4章进行详细介绍。

在本章接下来的例子中会继续使用上面创建的变量sc。

3.5.2 rdd

弹性分布式数据集(rdd)表示一个关于分区数据元素的集合,可以在其上进行并行操作。它是spark的主要数据抽象概念。它是spark库中定义的一个抽象类。

从概念上看,除了可以用于表示分布式数据集和支持惰性操作的特性外,rdd类似于spark的集合。惰性操作将在本章稍后部分详细介绍。

下面分别简要描述rdd的特点。

不可变性

rdd是一种不可变的数据结构。一旦创建,它将不可以在原地修改。基本上,一个修改rdd的操作都会返回一个新的rdd。

分片

rdd表示的是一组数据的分区。这些分区分布在多个集群节点上。然而,当spark在单个节点运行时,所有的分区数据都会在当前节点上。

spark存储rdd的分区和数据集物理分区之间关系的映射关系。rdd是各个分布式数据源之中数据的一个抽象,它通常表示分布在多个集群节点上的分区数据。比如hdfs将数据分片或分块分散存储在集群中。默认情况下,一个rdd分区对应一个hdfs文件分片。其他的分布式数据源(比如cassandra)同样也将数据分片分散存储在集群多个节点上。然而,一个rdd对应多个cassandra分片。

容错性

rdd为可容错的。rdd代表了分散在集群中多个节点的数据,但是任何一个节点都有可能出故障。诚如之前所说的,一个节点出故障的可能性和集群节点数量成正比。集群越大,在任何一个节点它出故障的可能性就越高。

rdd会自动处理节点出故障的情况。当一个节点出故障时,该节点上存储的数据将无法被访问。此时,spark会在其他节点上重建丢失的rdd分区数据。spark存储每一个rdd的血统信息。通过这些血统信息,spark可以恢复rdd的部分信息,当节点出故障的时候,它甚至可以恢复整个rdd。

接口

需要着重指出的是,rdd是一个处理数据的接口。在spark库中它定义为一个抽象类。rdd为多种数据源提供了一个处理数据的统一接口,包括hdfs、hbase、cassandra等。这个接口同样可以用于处理存储于多个节点内存中的数据。

spark为不同数据源提供了各自具体的实现类,比如hadooprdd、parallelcollection-rdd、jdbcrdd和cassandrardd。它们都支持基础的rdd接口。

强类型

rdd类有一个参数用于表示类型,这使得rdd可以表示不同类型的数据。rdd可以表示同一类型数据的分布式集合,包括integer、long、float、string或者应用开发者自己定义的类型。而且,一个应用总会使用某种类型的rdd,包括integer、long、float、double、string或自定义类型。

驻留在内存中

之前已经提及了spark的内存集群计算特性。rdd类提供一套支持内存计算的api。spark允许rdd在内存中缓存或长期驻留。就像之前所说的,对一个缓存在内存中的rdd进行操作比操作没缓存的rdd要快很多。

3.5.3 创建rdd

由于rdd是一个抽象类,因此无法直接创建一个rdd的类实例。sparkcontext类提供了一个工厂方法用来创建rdd实现类的类实例。rdd也可以通过由其他rdd执行转换操作得到。就像之前所说的,rdd是不可变的。任何一个对rdd的修改操作都将返回一个代表修改后数据的新rdd。

本节总结了几种创建rdd的常见方法。在下面的示例代码中,sc是一个sparkcontext的类实例。之前的章节已经介绍了怎么创建它。

parallelize

这个方法用于从本地scala集合创建rdd实例。它会对scala集合中的数据重新分区、重新分布,然后返回一个代表这些数据的rdd。这个方法很少用在生产上,但是使用它有助于学习spark。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

textfile

textfile方法用于从文本文件创建rdd实例。它可以从多种来源读取数据,包括单个文件、本地同一目录下的多个文件、hdfs、amazon s3,或其他hadoop支持的存储系统。这个方法返回一个rdd,这个rdd代表的数据集每个元素都是一个字符串,每一个字符串代表输入文件中的一行。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

上面的代码表示从存储于hdfs上的一个文件或者目录创建rdd实例。

textfile方法也可以读取压缩文件中的数据。而且,它的参数中可以存在通配符,用于从一个目录中读取多个文件。下面是一个例子。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

textfile的第二个参数是一个可选参数,它用于指定分区的个数。默认情况下,spark为每一个文件分块创建一个分区。可以设置成一个更大的数字从而提高并行化程度,但是设置成一个小于文件分块数的数字是不可以的。

wholetextfiles

这个方法读取目录下的所有文本文件,然后返回一个由键值型rdd。返回rdd中的每一个键值对对应一个文件。键为文件路径,对应的值为该文件的内容。这个方法可以从多种来源读取数据,包括本地文件系统、hdfs、amazon s3,或者其他hadoop支持的存储系统。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

sequencefile

sequencefile方法从sequencefile文件中获取键值对数据,这些sequencefile文件可以存储于本地文件系统、hdfs或者其他hadoop支持的存储系统。这个方法返回一个键值对型rdd实例。当使用这个方法的时候,不仅需要提供文件名,还需要提供文件中数据键和值各自的类型。

3.5.4 rdd操作

spark应用使用rdd类或其继承类中定义的方法来处理数据。这些方法也称为操作。既然scala中可以把一个方法当成操作符使用,那么rdd中的方法有时也称为操作符。

spark的美好之处就在于同样一个rdd方法既可以处理几字节的数据也可以处理pb级的数据。而且spark应用可以使用同样的方法去处理数据,无论它是存储于本地还是存储于一个分布式存储系统。这样的灵活性使得开发者可以在单机上开发、调试、测试spark应用,然后不用改动任何代码就可以将它部署到一个大集群上。

rdd操作可以归为两类:转换和行动。转换将会创建一个新的rdd实例。行动则会将结果返回给驱动程序。

转换

转换指的是在原rdd实例上进行计算,而后创建一个新的rdd实例。本节将介绍一些常见的转换操作。

从概念上看,rdd转换操作的类似于scala集合上的方法。主要的区别在于scala集合方法操作的数据是在单机内存中的,而rdd的转换操作可以处理分布在集群各个节点上的数据。另外一个重要的区别是,rdd转换操作是惰性的,而scala集合方法不是。本章余下部分会详细介绍这些内容。

map

map方法是一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原rdd的每个元素上,从而创建一个新rdd实例。这个作为参数的函数拥有一个参数并返回一个值。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

filter

filter方法是一个高阶方法,它把一个布尔函数作为它的参数,并把这个函数作用在原rdd的每个元素上,从而创建一个新rdd实例。一个布尔函数只有一个参数作为输入,返回true或false。filter方法返回一个新的rdd实例,这个rdd实例代表的数据集由布尔函数返回true的元素构成。因此,新rdd实例代表的数据集是原rdd的子集。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

flatmap

flatmap方法是一个高阶方法,它把一个函数作为它的参数,这个函数处理原rdd中每个元素返回一个序列。扁平化这个序列的集合得到一个数据集,flatmap方法返回的rdd就代表这个数据集。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

mappartitions

mappartitions是一个高阶方法,它使你可以以分区的粒度来处理数据。相比于一次处理一个元素,mappartitions一次处理处理一个分区,每个分区被当成一个迭代器。mappartitions方法的函数参数把迭代器作为输入,返回另外一个迭代器作为输出。map-partitions将自定义函数参数作用于每一个分区上,从而返回一个新rdd实例。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

union

union方法把一个rdd实例作为输入,返回一个新rdd实例,这个新rdd实例的数据集是原rdd和输入rdd的合集。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

intersection

intersection方法把一个rdd实例作为输入,返回一个新rdd实例,这个新rdd实例代表的数据集是原rdd和输入rdd的交集。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

这是另外一个例子。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

subtract

subtract方法把一个rdd实例作为输入,返回一个新rdd实例,这个新rdd实例代表的数据集由那些存在于原rdd实例中但不在输入rdd实例中的元素构成。

《Spark大数据分析:核心概念、技术及实践》一3.5 API
《Spark大数据分析:核心概念、技术及实践》一3.5 API

distinct

rdd实例上的distinct方法返回一个新rdd实例,这个新rdd实例的数据集由原rdd的数据集去重后得到。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

cartesian

cartesian方法把一个rdd实例作为输入,返回一个新rdd实例,这个新rdd实例的数据集由原rdd和输入rdd的所有元素的笛卡儿积构成。返回的rdd实例的每一个元素都是一个有序二元组,每一个有序二元组的第一个元素来自原rdd,第二个元素来自输入rdd。元素的个数等于原rdd的元素个数乘以输入rdd的元素个数。

这个方法类似于sql中的join操作。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

zip

zip方法把一个rdd实例作为输入,返回一个新rdd实例,这个新rdd实例的每一个元素是一个二元组,二元组的第一个元素来自原rdd,第二个元素来自输入rdd。和cartesian方法不同的是,zip方法返回的rdd的元素个数于原rdd的元素个数。原rdd的元素个数和输入rdd的相同。进一步地说,原rdd和输入rdd不仅有相同的分区数,每个分区还有相同的元素个数。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

zipwithindex

zipwithindex方法返回一个新rdd实例,这个新rdd实例的每个元素都是由原rdd元素及其下标构成的二元组。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

groupby

groupby是一个高阶方法,它将原rdd中的元素按照用户定义的标准分组从而组成一个rdd。它把一个函数作为它的参数,这个函数为原rdd中的每一个元素生成一个键。groupby把这个函数作用在原rdd的每一个元素上,然后返回一个由二元组构成的新rdd实例,每个二元组的第一个元素是函数生成的键,第二个元素是对应这个键的所有原rdd元素的集合。其中,键和原rdd元素的对应关系由那个作为参数的函数决定。

需要注意的是,groupby是一个费时操作,因为它可能需要对数据做shuffle操作。

假设有一个csv文件,文件的内容为公司客户的姓名、年龄、性别和邮编。下面的示例代码演示了按照邮编将客户分组。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

keyby

keyby方法与groupby方法相类似。它是一个高阶方法,把一个函数作为参数,这个函数为原rdd中的每一个元素生成一个键。keyby方法把这个函数作用在原rdd的每一个元素上,然后返回一个由二元组构成的新rdd实例,每个二元组的第一个元素是函数生成的键,第二个元素是对应这个键的原rdd元素。其中,键和原rdd元素的对应关系由那个作为参数的函数决定。返回的rdd实例的元素个数和原rdd的相同。

groupby和keyby的区别在于返回rdd实例的元素上。虽然都是二元组,但是

groupby返回的二元组中的第二个元素是一个集合,而keyby的是单个值。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

sortby

sortby是一个高阶方法,它将原rdd中的元素进行排序后组成一个新的rdd实例返回。它拥有两个参数。第一个参数是一个函数,这个函数将为原rdd的每一个元素生成一个键。第二个参数用来指明是升序还是降序排列。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

下面是另一个示例。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

pipe

pipe方法可以让你创建子进程来运行一段外部程序,然后捕获它的输出作为字符串,用这些字符串构成rdd实例返回。

randomsplit

randomsplit方法将原rdd分解成一个rdd数组。它的参数是分解的权重。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

coalesce

coalesce方法用于减少rdd的分区数量。它把分区数作为参数,返回分区数等于这个参数的rdd实例。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

使用coalesce方法时需要小心,因为减少了rdd的分区数也就意味着降低了spark的并行能力。它通常用于合并小分区。举例来说,在执行filter操作之后,rdd可能会有很多小分区。在这种情况下,减少分区数能提升性能。

repartition

repartition方法把一个整数作为参数,返回分区数等于这个参数的rdd实例。它有助于提高spark的并行能力。它会重新分布数据,因此它是一个耗时操作。

coalesce和repartition方法看起来一样,但是前者用于减少rdd中的分区,后者用于增加rdd中的分区。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

sample

sample方法返回原rdd数据集的一个抽样子集。它拥有三个参数。第一个参数指定是有放回抽样还是无放回抽样。第二个参数指定抽样比例。第三个参数是可选的,指定抽样的随机数种子。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

键值对型rdd的转换

除了上面介绍的rdd转换之外,针对键值对型rdd还支持其他的一些转换。下面将介绍只能作用于键值对型rdd的常用转换操作。

keys

keys方法返回只由原rdd中的键构成的rdd。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

values

values方法返回只由原rdd中的值构成的rdd。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

mapvalues

mapvalues是一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原rdd的每个值上。它返回一个由键值对构成的rdd。它和map方法类似,不同点在于它把作为参数的函数作用在原rdd的值上,所以原rdd的键都没有变。返回的rdd和原rdd拥有相同的键。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

join

join方法把一个键值对型rdd作为参数输入,而后在原rdd和输入rdd上做内连接操作。它返回一个由二元组构成的rdd。二元组的第一个元素是原rdd和输入rdd都有的键,第二个元素是一个元组,这个元组由原rdd和输入rdd中键对应的值构成。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

leftouterjoin

leftouterjoin方法把一个键值对型rdd作为参数输入,而后在原rdd和输入rdd之间做左连接操作。它返回一个由键值对构成的rdd。键值对的第一个元素是原rdd中的键,第二个元素是一个元组,这个元组由原rdd中键对应的值和输入rdd中的可选值构成。可选值用option类型表示。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

rightouterjoin

rightouterjoin方法把一个键值对型rdd作为参数输入,而后在原rdd和输入rdd之间做右连接操作。它返回一个由键值对构成的rdd。键值对的第一个元素是输入rdd中的键,第二个元素是一个元组,这个元组由原rdd中的可选值和输入rdd中键对应的值构成。可选值用option类型表示。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

fullouterjoin

fullouterjoin方法把一个键值对型rdd作为参数输入,而后在原rdd和输入rdd之间做全连接操作。它返回一个由键值对构成的rdd。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

samplebykey

samplebykey通过在键上抽样返回原rdd的一个子集。它把对每个键的抽样比例作为输入参数,返回原rdd的一个抽样。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

subtractbykey

subtractbykey方法把一个键值对型rdd作为输入参数,返回一个键值对rdd,这个键值对rdd的键都是只存在原rdd中但是不存在于输入rdd中。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

groupbykey

groupbykey方法返回一个由二元组构成的rdd,二元组的第一个元素是原rdd的键,第二个元素是一个集合,集合由该键对应的所有值构成。它类似于上面介绍过的group-by方法。二者的区别在于groupby是一个高阶方法,它的参数是一个函数,这个函数为原rdd的每一个元素生成一个键。groupbykey方法作用于rdd的每一个键值对上,故不需要一个生成键的函数作为输入参数。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

应当尽量避免使用groupbykey。它是一个耗时操作,因为它可能会对数据进行shuffle操作。在大多数情况下,都有不使用groupbykey的更好的替代方案。

reducebykey

reducebykey是一个高阶方法,它把一个满足结合律的二元操作符当作输入参数。它把这个操作符作用于有相同键的值上。

一个二元操作符把两个值当作输入参数,返回一个值。一个满足结合律的二元操作符返回同样的结果,但是它不关心操作数的分组情况。

reducebykey方法可以用于对同一键对应的值进行汇总操作。比如它可以用于对同一键对应的值进行求和,求乘积,求最小值,求最大值。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

对于基于键的汇总操作、合并操作,reducebykey比groupbykey更合适。

操作

操作指的是那些返回值给驱动程序的rdd方法。本节介绍一些rdd中常用的操作。

collect

collect方法返回一个数组,这个数组由原rdd中的元素构成。在使用这个方法的时候需要小心,因为它把在worker节点的数据移给了驱动程序。如果操作一个有大数据集的rdd,它有可能会导致驱动程序崩溃。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

count

count方法返回原rdd中元素的个数。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

countbyvalue

countbyvalue方法返回原rdd中每个元素的个数。它返回是一个map类实例,其中,键为元素的值,值为该元素的个数。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

first

first方法返回原rdd中的第一个元素。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

max

max方法返回rdd中最大的元素。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

min

min方法返回rdd中最小的元素。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

take

take方法的输入参数为一个整数n,它返回一个由原rdd中前n个元素构成的rdd。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

takeordered

takeordered方法的输入参数为一个整数n,它返回一个由原rdd中前n小的元素构成的rdd。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

top

top方法的输入参数为一个整数n,它返回一个由原rdd中前n大的元素构成的rdd。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

fold

fold是一个高阶方法,用于对原rdd的元素做汇总操作,汇总的时候使用一个自定义的初值和一个满足结合律的二元操作符。它首先在每一个rdd的分区中进行汇总,然后再汇总这些结果。

初值的取值取决于rdd中的元素类型和汇总操作的目的。比如,给定一个元素为整数的rdd,为了计算这个rdd中所有元素的和,初值取为0。相反,给定一个元素为整数的rdd,为了计算这个rdd中所有元素的乘积,初值则应取为1。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

reduce

reduce是一个高阶方法,用于对原rdd的元素做汇总操作,汇总的时候使用一个满足结合律和交换律的二元操作符。它类似于fold方法,然而,它并不需要初值。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

键值对型rdd上的操作

键值对rdd上有一些额外的操作,我们在下面进行介绍。

countbykey

countbykey方法用于统计原rdd每个键的个数。它返回一个map类实例,其中,键为原rdd中的键,值为个数。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

lookup

lookup方法的输入参数为一个键,返回一个序列,这个序列的元素为原rdd中这个键对应的值。

数值型rdd上的操作

如果rdd的元素类型为integer、long、float或double,则这样的rdd为数值型rdd。这类rdd还有一些对于统计分析十分有用的额外操作,下面将介绍一些常用的行动。

mean

mean方法返回原rdd中元素的平均值。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

stdev

stdev方法返回原rdd中元素的标准差。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

sum

sum方法返回原rdd中所有元素的和。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

variance

variance方法返回原rdd中元素的方差。

3.5.5 保存rdd

一般来说,数据处理完毕后,结果会保存在硬盘上。spark允许开发者将rdd保存在任何hadoop支持的存储系统中。保存在硬盘上的rdd可以被其他spark应用或hadoop应用使用。

本节介绍将rdd保存成文件的常用方法。

saveastextfile

saveastextfile方法将原rdd中的元素保存在指定目录中,这个目录位于任何hadoop支持的存储系统中。每一个rdd中的元素都用字符串表示并另存为文本中的一行。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

saveasobjectfile

saveasobjectfile方法将原rdd中的元素序列化成java对象,存储在指定目录中。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

saveassequencefile

saveassequencefile方法将键值对型rdd以sequencefile的格式保存。键值对型rdd也可以以文本的格式保存,只须使用saveastextfile方法即可。

《Spark大数据分析:核心概念、技术及实践》一3.5 API

需要注意的是,上面的方法都把一个目录的名字作为输入参数,然后在这个目录为每个rdd分区创建一个文件。这种设计不仅高效而且可容错。因为每一个分区被存成一个文件,所以spark在保存rdd的时候可以启动多个任务,并行执行,将数据写入文件系统中。这样也保证了写入数据的过程是可容错的。一旦有一个将分区写入文件的任务失败了,spark可以再启动一个任务,重写刚才失败任务创建的文件。

继续阅读