天天看点

Spark原理及应用

作者:勇者热情生活家

Apache Spark是通用的分布式大数据计算引擎。Spark是UC Berkeley AMPLab(美国加州大学伯克利分校的AMP实验室)开源的通用并行框架。Spark拥有Hadoop MapReduce所具有的优点,但不同于Hadoop MapReduce的是,Hadoop每次经过Job执行的中间结果都存储到HDFS等磁盘上,而Spark的Job中间输出结果可以保存在内存中,而不再需要读写HDFS。因为内存的读写速度与磁盘的读写速度不在一个数量级上,所以Spark利用内存中的数据能更快速地完成数据的处理。Spark启用了弹性分布式数据集(Resilient Distributed Dataset,RDD),除了能够提高交互式查询效率,还可以优化迭代器的工作负载。由于弹性分布式数据集的存在,使得数据挖掘与机器学习等需要迭代的MapReduce的算法更容易实现。

Spark的原理

Spark的特点

1.计算速度快

Spark将每个任务都构造成一个DAG(Directed Acyclic Graph,有向无环图)来执行,其内部计算过程基于弹性分布式数据集在内存中对数据进行迭代计算,因此其运行效率很高。官方数据表明,如果计算的数据从磁盘上读取,则Spark的速度是Hadoop MapReduce的10倍以上;如果计算的数据从内存中读取,则Spark的计算速度是Hadoop MapReduce的100倍以上。

2.易于使用

Spark提供了80多个高级运算操作,支持丰富的算子,开发人员只需要按照其封装好的API实现即可,不需要关心Spark的底层架构。同时,Spark支持多种语言开发,包括Java、Scala、Python。

3.通用大数据框架

Spark提供了多种类型的开发库,包括Spark Core、Spark SQL(即时查询)、Spark Streaming(实时流处理)、Spark MLlib、GraphX(图计算),使得开发人员可以在同一个应用程序中无缝组合使用这些库,而不用像传统的大数据方案那样将离线任务放在Hadoop MapReduce上运行,将实时流计算任务放在Storm上运行,并维护多个平台。Spark提供了从实时流计算、MapReduce离线计算、SQL计算、机器学习到图计算的一站式整体解决方案。

4.支持多种资源管理器

Spark支持单机、Standalone、Hadoop YARN、Apache Mesos等多种资源管理器,用户可以根据现有的大数据平台灵活地选择运行模式。

5.Spark生态圈丰富

Spark生态圈以Spark Core为核心,支持从HDFS、S3、HBase等多种持久化层读取数据。同时,Spark支持以Hadoop YARN、Apache Mesos和Standalone为资源管理器调度Job,完成Spark应用程序的计算。Spark应用程序可以基于不同的组件实现,如Spark Shell、Spark Submit、Spark Streaming、Spark SQL、BlinkDB(权衡查询)、MLlib/MLbase(机器学习)、GraphX和SparkR(数学计算)等。Spark生态圈已经从大数据计算和数据挖掘扩展到机器学习、自然语言处理和语音识别等领域。

Spark的模块

Spark基于Spark Core建立了Spark SQL、Spark Streaming、MLlib、GraphX、SparkR核心组件,基于不同组件可以实现不同的计算任务,这些计算任务的运行模式有:本地模式、独立模式、Mesos模式、YARN模式。Spark任务的计算可以从HDFS、S3、Hypertable、HBase或Cassandra等多种数据源中存取数据。

Spark原理及应用

Spark Core

Spark的核心功能实现包括基础设施、存储系统、调度系统和计算引擎。

(1)基础设施:Spark中有很多基础设施,这些基础设施被Spark中的各种组件广泛使用,包括SparkConf(配置信息)、SparkContext(Spark上下文)、Spark RPC(远程过程调用)、ListenerBus(事件总线)、MetricsSystem(度量系统)、SparkEnv(环境变量)等。

①SparkConf:SparkConf用于定义Spark应用程序的配置信息。

②SparkContext:SparkContext是Spark应用程序的入口,Spark应用程序的提交与执行离不开SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、文件服务等内容,开发人员只需要使用SparkContext提供的API完成功能开发即可。

③Spark RPC:Spark组件之间的网络通信依赖Spark RPC框架。Spark RPC基于Netty实现,使用中分同步和异步两种方式。

④ListenerBus:ListenerBus即事件总线,主要用于SparkContext内部各组件之间的事件交互。ListenerBus属于监听者模式,采用异步调用的方式实现。

⑤MetricsSystem:MetricsSystem为度量系统,用于整个Spark集群中各个组件运行状态的监控。度量系统由多种度量源和多种度量输出组成。

⑥SparkEnv:SparkEnv为Spark的执行环境,SparkEnv内部封装了RpcEnv(RPC环境)、序列化管理器、BroadcastManager(广播管理器)、MapOutputTracker(Map任务输出跟踪器)、存储系统、MetricsSystem(度量系统)、OutputCommitCoordinator(输出提交协调器)等Spark程序运行所需要的基础环境组件。

(2)存储系统:Spark存储系统用于管理Spark运行过程中依赖的数据的存储方式和存储位置。Spark存储系统首先考虑在各节点的内存中存储数据,当内存不足时会将数据存储到磁盘上,这种内存优先的存储策略使得Spark的计算性能无论在实时流计算还是在批量计算的场景下都表现很好。Spark的内存存储空间和执行存储空间之间的边界可以灵活控制。

(3)调度系统:Spark调度系统主要由DAGScheduler和TaskScheduler组成。DAGScheduler负责创建Job、将DAG中的RDD划分到不同Stage中、为Stage创建对应的Task、批量提交Task等。TaskScheduler负责按照FIFO(First Input First Output,先进先出)或者FAIR(公平调度)等调度算法对Task进行批量调度。

(4)计算引擎:计算引擎由内存管理器、任务管理器、Task、Shuffle管理器等组成。

Spark SQL

Spark SQL提供基于SQL的数据处理方式,使得分布式数据的处理变得更加简单。此外,Spark提供了对Hive SQL的支持。

Spark Streaming

Spark Streaming提供流计算能力,支持Kafka、Flume、Kinesis和TCP等多种流式数据源。此外,Spark Streaming提供了基于时间窗口的批量流操作,用于对一定时间周期内的流数据执行批量处理。

GraphX

GraphX用于分布式图计算。通过Pregel提供的API可以快速解决图计算中的常见问题。

Spark MLlib

Spark MLlib为Spark的机器学习库。Spark MLlib提供了统计、分类、回归等多种机器学习算法的实现。其简单易用的API接口降低了机器学习的门槛。

SparkR

SparkR是一个R语言包,提供了轻量级的基于R语言使用Spark的方式。SparkR实现了分布式的数据框,支持类似查询、过滤及聚合的操作(类似R语言中的数据框包dplyr),使得基于R语言能够更方便地处理大规模的数据集。同时,SparkR支持基于Spark MLlib进行机器学习。

Spark的运行原理

Spark的运行模式

Spark的运行模式主要包括Local模式、Standalone模式、On YARN、On Mesos和运行在AWS等公有云平台上

Spark原理及应用

Spark的集群架构

Spark的集群架构主要由Cluster Manager(管理器)、Worker(工作节点)、Executor(执行器)、Driver(驱动器)、Application(应用程序)5部分组成

Spark原理及应用

(1)Cluster Manager:Spark集群管理器,主要用于整个集群资源的管理和分配。根据部署模式的不同,可以分为Local、Standalone、YARN、Mesos和AWS。

(2)Worker:Spark的工作节点,用于执行提交的任务。Worker的工作职责如下。

①通过注册机制向Cluster Manager汇报自身的CPU和内存等资源使用信息。

②在Master的指示下创建并启动Executor,Executor是真正的计算单元。

③将资源和任务进一步分配给Executor并运行。

④同步资源信息和Executor状态信息给Cluster Manager。

(3)Executor:真正执行计算任务的组件,是某个Application运行在Worker上的一个进程。该进程负责Task的运行并且将运行的结果数据保存到内存或磁盘上。Task是运行在Executor上的任务单元,Spark应用程序最终被划分为经过优化的多个Task的集合。

(4)Driver:Application的驱动程序,可以理解为驱动程序运行中的main()函数,Driver在运行过程中会创建SparkContext。Application通过Driver与Cluster Manager和Executor进行通信。Driver可以运行在Application上,也可以由Application提交给Cluster Manager,再由Cluster Manager安排Worker运行。Driver的主要职责如下。

①运行应用程序的main()函数。

②创建SparkContext。

③划分RDD并生成DAG。

④构建Job并将每个Job都拆分为多个Task,这些Task的集合被称为Stage。各个Stage相互独立,由于Stage由多个Task构成,因此也被称为Task Set。Job是由多个Task构建的并行计算任务,具体为Spark中的Action操作(例如collect、save等)。

⑤与Spark中的其他组件进行资源协调。

⑥生成并发送Task到Executor。

(5)Application:基于Spark API编写的应用程序,其中包括实现Driver功能的代码和在集群中多个节点上运行的Executor代码。Application通过Spark API创建RDD、对RDD进行转换、创建DAG、通过Driver将Application注册到Cluster Manager。

Spark的运行流程

Spark的数据计算主要通过RDD的迭代完成,RDD是弹性分布式数据集,可以看作是对各种数据计算模型的统一抽象。在RDD的迭代计算过程中,其数据被分为多个分区并行计算,分区数量取决于应用程序设定的Partition数量,每个分区的数据都只会在一个Task上计算。所有分区可以在多个机器节点的Executor上并行执行。

Spark原理及应用

(1)创建RDD对象,计算RDD之间的依赖关系,并将RDD生成一个DAG。

(2)DAGScheduler将DAG划分为多个Stage,并将Stage对应的Task Set提交到集群管理中心。划分Stage的一个主要依据是当前计算因子的输入是否确定。如果确定,则将其分到同一个Stage中,避免多个Stage之间传递消息产生的系统资源开销。

(3)TaskScheduler通过集群管理中心为每个Task都申请系统资源,并将Task提交到Worker。

(4)Worker的Executor执行具体的Task。

Spark的使用

Spark被广泛应用于大数据行业的各个领域,包括实时流计算、历史数据分析、机器学习、图计算等。本节将从Spark的安装、Spark RDD的使用、Spark Streaming的使用和Spark SQL的使用等方面来介绍Spark各个组件的特性。

Spark的安装

这里以Linux系统单机版为例介绍Spark的安装方式,具体步骤如下。

(1)到官网下载最新的Spark安装包,注意这里下载Spark编译好的带Hadoop的版本,即spark-2.4.3-bin-hadoop2.7.tgz。

Spark原理及应用

(2)将安装包复制到安装目录下,执行以下命令解压Spark安装包。

Spark原理及应用

(3)配置系统的Spark环境变量。

①执行以下命令编译profile文件。

Spark原理及应用

②在profile文件的最后加上以下内容来设置Spark环境变量。

Spark原理及应用

③键盘按下“Esc”,输入冒号“:”加“wq”保存,退出vim编辑模式。

④执行以下命令使文件修改立刻生效。

Spark原理及应用

(4)新建spark-env.sh配置文件。

①执行以下命令,进入Spark的conf目录。

Spark原理及应用

②执行以下命令,根据Spark提供的spark-env.sh.template模板文件复制一份新的名为spark-env.sh的配置文件。

Spark原理及应用

③执行以下命令打开spark-env.sh文件,输入vim进入编辑模式。

Spark原理及应用

④在spark-env.sh文件的最后加上以下内容。

Spark原理及应用
Spark原理及应用

⑤键盘按下“Esc”,输入冒号“:”加“wq”保存,退出vim编辑模式。

(5)新建slaves配置文件。

①执行以下命令,进入Spark的conf目录。

Spark原理及应用

②执行以下命令,根据Spark提供的slaves.template模板文件复制一份新的名为slaves的配置文件。

Spark原理及应用

③执行以下命令打开slaves文件,输入vim进入编辑模式。

Spark原理及应用

④在slaves文件的最后加上以下内容,表示在Localhost服务器上有一个Slave角色。

Spark原理及应用

⑤键盘按下“Esc”,输入冒号“:”加“wq”保存,退出vim编辑模式。

(6)启动Spark。

①执行以下命令,进入Spark的sbin目录。

Spark原理及应用

②执行以下命令,启动Spark,在启动过程中会要求输入Linux的登录密码,按照提示输入即可。

Spark原理及应用

③在启动后,控制台会打印出以下日志,提示日志文件的目录。

Spark原理及应用

④查看Master日志:在Spark启动后会看到Master的核心日志如下。

Spark原理及应用

⑤查看Worker日志:在Spark启动后会看到Worker的核心日志如下。

Spark原理及应用

⑥Jps查看进程。

Spark原理及应用

⑦在浏览器地址栏中输入http://192.168.2.103:8080查看Master页面。注意,192.168.2.103是笔者当前的服务器IP地址

Spark原理及应用

⑧在浏览器地址栏中输入http://192.168.2.103:8081查看Worker页面

Spark原理及应用

(7)执行Spark默认example:进入Spark安装目录,执行以下命令,启动Spark示例中的SparkPi任务。

Spark原理及应用

Spark RDD的使用

1.RDD的介绍

RDD是Spark中最基本的数据抽象,代表一个不可变、可分区、元素可并行计算的集合。RDD具有自动容错、位置感知性调度和可伸缩等特点。RDD允许用户在执行多个查询时显式地将数据集缓存在内存中,后续查询能够重用该数据集,这极大地提升了查询效率。

2.RDD的核心结构及概念

(1)Partition:RDD内部的数据集在逻辑上和物理上都被划分为多个分区(Partition)以提高运行的效率,分区数量决定了计算的并行度,每一个分区内的数据都在一个单独的任务中被执行,如果在计算过程中没有指定分区数,那么Spark会采用默认分区数量。默认分区数量为程序运行分配到的CPU核数。

(2)Partitioner:Partitioner是RDD的分区函数。分区函数不但决定了RDD本身的分区数量,也决定了其父RDD Shuffle输出时的分区数量。Spark实现了基于Hash(HashPartitioner)和基于范围(RangePartitioner)的两种分区函数。

注意:只有对于Key-Value的RDD才会有Partitioner,而非Key-Value的RDD的Parititioner值是None。

(3)RDD的依赖关系:RDD的每次转换都会生成一个新的RDD,因此RDD之间会有前后依赖关系。当在计算过程中出现异常情况导致部分分区数据丢失时,Spark可以通过依赖关系从父RDD中重新计算丢失的分区数据,而不需要对RDD上的所有分区全部重新计算。RDD的依赖分为窄依赖和宽依赖。

◎窄依赖:如果父RDD的每个分区最多只能被子RDD的一个分区使用,则称之为窄依赖。

◎宽依赖:如果父RDD的每个分区都可以被子RDD的多个分区使用,则称之为宽依赖。

窄依赖的每个子RDD的Partition的生成操作都是可以并行的,而宽依赖则需要所有父Partition Shuffle结果完成后再被执行

Spark原理及应用

4)Stage:Stage是由一组RDD组成的可进行优化的执行计划。如果RDD的依赖关系为窄依赖,则可放在同一个Stage中运行;若RDD的依赖关系为宽依赖,则要划分到不同Stage中。这样,当Spark执行作业时,会按照Stage划分不同的RDD,生成一个完整的最优的执行计划,使每个Stage内的RDD都尽可能在各个节点上并行地被执行

Spark原理及应用

(5)PreferredLocation:PreferredLocation是一个用于存储每个Partition的优先位置的列表。对于每个HDFS文件来说,这个列表保存的是每个Partition所在的块的位置,也就是该HDFS文件的“划分点”。

(6)CheckPoint:CheckPoint是Spark提供的一种基于快照的缓存机制。当需要计算的RDD过多时,为了避免任务执行失败后重新计算之前的RDD,可以对RDD做快照(CheckPoint)处理,检查RDD是否被计算,并将结果持久化到磁盘或HDFS上。此外,Spark提供另一种缓存机制Cache,Cache缓存数据由Executor管理,当Executor消失时,Cache缓存的数据将被清除,而CheckPoint将数据保存到永久性磁盘或HDFS,当计算出现运行错误时,Job可以从CheckPoint点继续计算。

3.创建一个RDD应用

在创建RDD应用前,首先需要创建一个Spark项目,下面以Java基于Maven的项目为例介绍RDD的创建。

(1)构建Maven项目。按照编译器提示构建一个简单的Maven项目,并打开pom.xml文件,将Spark依赖加入项目中。需要注意的是,Spark不再对Java 1.7进行维护,因此必须指明Maven源码编译和Target编译均使用Java 1.8。

在pom.xml文件中加入Maven的编辑插件,具体代码如下。

Spark原理及应用

(2)新建RDD类。

通过上述代码已经构建了一个简单的Spark应用项目,在项目的Java目录下新建一个名为RDDSimple的Java类,并在类中输入下面代码来构建一个简单的RDD。

Spark原理及应用

上述代码定义了一个简单的RDD,具体过程为:首先定义SparkConf实例conf,然后调用sc.parallelize方法将一个数组转换为一组名为distDatardd的Spark RDD,最后通过调用distDatardd.reduce方法对distDatardd进行操作。上述代码实现了对RDD中的数据进行加和操作然后输出。

利用JavaSparkContext的parallelize方法将已经存在的一个集合转换为RDD,集合中的数据将被复制到RDD并参与并行计算。并行集合的一个重要参数是分区数量(将数据集切割为多份)。Spark将为集群的每个分区都运行一个任务。一般希望集群中的每个CPU都有2~4个分区,这样既能良好地利用CPU,又不至于任务太多导致任务阻塞等待。通常Spark会尝试根据集群的CPU核数自动设置分区数量,也可以手动设置分区大小。设置代码如下。

Spark原理及应用

(3)打包和作业提交

在项目的根目录下输入如下命令对项目进行打包,打包后的程序在Target目录下。

Spark原理及应用

Spark的作业提交很简单,只需要调用spark-submit命令指定主函数入口类并将编译好的JAR包提交到集群即可,集群会自动为程序分配资源并执行。在提交的时候,首先需要通过--class指定Spark程序的入口,然后通过--master指定提交给哪个集群,最后跟上JAR包路径即可。提交命令如下。

Spark原理及应用

在作业提交后,通过控制台能看到程序输出如下结果。

Spark原理及应用

4.利用外部数据集生成RDD

Spark可以从Hadoop或者其他外部存储系统创建RDD,包括本地文件系统、HDFS、Cassandra、HBase、S3等。Spark RDD支持多种文件格式,包括文本文件、SequenceFiles、JSON文件和任何其他Hadoop InputFormat。通过SparkContext的textFile方法读取文本文件创建RDD的代码如下。

Spark原理及应用

在上述代码中,textFile()方法的URI参数可以是本地文件、本地路径、HDFS路径、S3路径等。如果该参数的输入值是具体的文件,则Spark会读取参数中的文件;如果是路径,则Spark会读取该路径下的所有文件,并最终将其作为数据源加载到内存生成对应的RDD。

Spark加载数据的注意事项包括以下几个方面。

(1)如果访问的是本地文件路径,则必须可以在工作节点上以相同路径访问该文件。一般做法是将数据文件远程复制到所有工作节点对应的路径下或使用共享文件系统实现。

(2)除了支持基于文件名的方式加载文件,Spark还支持基于目录、压缩文件和通配符的方式加载文件。例如,可以使用textFile("/my/directory")表示加载“/my/directory”路径下的所有文件;使用textFile("/my/directory/*.txt")表示加载“/my/directory”路径下所有以.txt为后缀名的文件;使用textFile("/my/directory/*.gz")表示加载并解压“/my/directory”目录下所有以.gz为后缀名的文件。

(3)Spark加载文件时可以设置分区数。Spark在默认情况下为每个文件块都创建一个分区(HDFS中默认文件块大小为128MB),也可以通过传递更大值来设置更多分区。需要注意的是,分区数不能小于文件块的数量。

(4)除了加载文本文件,Spark的Java API还支持其他多种数据格式。

①wholeTextFiles:JavaSparkContext.wholeTextFiles允许客户端程序读取包含多个小文本文件的目录,并将每个文件都以<文件名,内容>的键值对返回。该方法与textFile不同,textFile是将每个文件中的每行都作为一条记录返回的。

②SequenceFiles:加载SequenceFiles需要使用SparkContext的sequenceFile[Key,Value]方法实现,其中,Key和Value是文件中键和值的类型。这些键值对对应的是Hadoop的Writable接口的子类,比如IntWritable和Text。

③Hadoop InputFormat:对于Hadoop InputFormat类型的数据,可以使用JavaSparkContext.hadoopRDD方法或JavaSparkContext.newAPIHadoopRDD加载并生成RDD。

④文件保存和序列化:JavaRDD.saveAsObjectFile方法和JavaSparkContext.objectFile方法采用Java默认序列化的方式将数据序列化并保存到RDD。同时,用户可以在保存数据的时候,使用其他更高效的序列化方法(例如Avro、Kyro等)。

5.RDD的转换和操作

RDD支持两种类型的操作:转换(Transformation)和操作(Action)。转换指从现有RDD创建新RDD,操作指在RDD上运行计算并将计算结果返回驱动程序。例如,map是一个转换:它将RDD的所有元素都调用map函数进行转换处理,返回一个表示转换结果的新RDD;reduce是一个操作:它将RDD的所有元素都调用reduce函数进行聚合操作,将最终计算结果返回驱动程序。表示聚合处理的函数还有reduceByKey、reduceBy等。

Spark中的所有转换(Transformation)都是懒加载的,即不会立即执行转换操作,而是先记录RDD之间的转换关系,仅当触发操作(Action)时才会执行RDD的转换操作,并将计算结果返回驱动程序。这种懒加载的设计使Spark能够更加高效地运行。具体代码如下。

Spark原理及应用

上述代码通过sc.textFile()方法定义了名为lines的RDD,此时文件并没有加载到内存中,仅仅是指向文件的位置。通过lines.map()方法定义了名为lineLengths的map转换,同样由于懒加载机制,lineLengths不会立即执行计算。最终,当运行reduce操作时,Spark将RDD计算分解为不同Stage在不同机器上运行任务,每台机器都运行部分map数据集并将运行结果保存为本地的reduce,在各个节点都运算完成后将reduce结果返回driver程序并进行结果的合并。

6.RDD持久化的概念、级别和原则

Spark可以跨节点在内存中持久化RDD。当持久化RDD时,每个节点都会在内存中缓存计算后的分区数据,当其他操作需要使用该RDD时,可以直接重用该缓存数据,这使得之后的RDD计算速度更快(通常超过10倍)。缓存是迭代计算和交互式计算的关键。

应用程序可以使用persist()或cache()标记要缓存的RDD,当调用操作(Action)执行计算时,计算结果将被缓存在节点的内存中。Spark缓存具有容错性,如果RDD的某个分区丢失,则该RDD将被自动重新计算。

每个持久化RDD都可以使用不同存储级别进行存储,Spark允许将数据集存储在磁盘上或内存中。Spark将需要缓存的数据序列化为Java对象(序列化可以节省磁盘或内存空间),然后跨节点复制到其他节点上,以便其他节点重用该数据。Spark中缓存持久化级别是通过StorageLevel来设置的。具体代码如下。

(1)Spark持久化的级别

①MEMORY_ONLY:使用未经过序列化的Java对象在内存中存储RDD。当内存不够时,将不会进行持久化;当下次需要该RDD时,再从源头处重新计算。该策略是默认的持久化策略,当使用cache()时,使用的是该持久化策略。

②MEMORY_AND_DISK:使用未经过序列化的Java对象存储RDD,优先尝试将RDD保存在内存中。如果内存不够,则会将RDD写入磁盘文件;当下次需要该RDD时,从持久化的磁盘文件中读取该RDD即可。

③MEMORY_ONLY_SER:MEMORY_ONLY_SER的含义与MEMORY_ONLY类似,唯一区别是MEMORY_ONLY_SER会将RDD中的数据进行序列化。在序列化过程中,RDD的每个Partition都将会被序列化成一个字节数组,这种方式更加节省内存,从而避免持久化的RDD占用过多内存导致JVM频繁GC。

④MEMORY_AND_DISK_SER:MEMORY_AND_DISK_SER的含义与MEMORY_AND_DISK类似。唯一区别是MEMORY_AND_DISK_SER会将RDD中的数据进行序列化。在序列化过程中,RDD的每个Partition都会被序列化成一个字节数组。这种方式更加节省内存,从而避免持久化的RDD占用过多内存导致频繁GC。

⑤DISK_ONLY:使用未序列化的Java对象将RDD全部写入磁盘文件。

⑥MEMORY_ONLY_2和MEMORY_AND_DISK_2:对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本,则只能将这些数据从头重新计算一遍。

⑦OFF_HEAP:OFF_HEAP与MEMORY_ONLY_SER类似,但OFF_HEAP将数据存储在堆外内存中。该参数需要Spark启用堆外内存。

(2)持久化的原则

Spark提供了丰富的存储级别,旨在通过不同存储级别的设置实现内存和CPU的最佳使用,具体开发中该如何选择持久化方案呢?以下为Spark官方提供的缓存持久化的选择流程。

①如果RDD在默认存储级别(MEMORY_ONLY)下运行良好,则建议使用MEMORY_ONLY。该级别是CPU效率最高的类型,基于CPU快速计算可以使RDD上的操作尽可能快地运行。

②如果系统显示内存使用过高,则尝试使用MEMORY_ONLY_SER,并选择更快速的序列化库,以加快序列化时间和节省对象的存储空间。

③如果要快速恢复故障,则建议使用副本存储级别。其他存储级别需要通过重新计算丢失的数据来保障缓存的完整性,而副本存储级别可以在其缓存对应的副本节点上直接执行任务,不用等待重新计算丢失的分区数据。

(3)删除持久化缓存

Spark会自动监视每个节点上的缓存使用情况,并以LRU方式删除旧的数据分区。如果想手动删除RDD,则可通过RDD.unpersist()方法完成。

继续阅读