天天看点

Spark的笔记

#Spark中一些重要的知识点

1、spark-on-yarn的应用场景

​ 1.因为历史原因,方便运维部门维护,之前用的hadoop

2.用yarn来运行各种任务,相比其他的资源调度系统更稳定,便于升级优化

2、spark-on-yarn的执行流程

​ spark-on-yarn分为两种运行模式:client和cluster

client:

  1. 客户端提交一个Application,在客户端启动一个Driver进程。
  2. Driver进程会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
  3. RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
  4. AM启动后,会向RS请求一批container资源,用于启动Executor.
  5. RS会找到一批NM返回给AM,用于启动Executor。
  6. AM会向NM发送命令启动Executor。
  7. Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
    cluster模式:
               

    1.由client向RM提交请求,并上传jar到hdfs上

    2.RM想NM申请资源,创建SparkApplicationMaster

    3.NM启动AppMaster,并想RM中的ApplicationsManager注册

    4.AM从HDFS中 找到jar文件,启动SparkContext,DAGscheduler和yarnClusterScheduler

    5.RM向RM中的AsM注册申请container资源

    6.RM通知NM分配Container,这时可以收到来自AsM关于container的报告

    7.SparkApplicationMaster直接和container进行交互,完成这个分布式任务

    3、集群启动流程

    ​ Master:

    1.初始化一些用于启动Master的参数

    2.创建ActorSystem对象,并启动Actor

    3.调用工具类AkkaUtils工具类来创建actorSystem(用来创建Actor的对象)

    4.调用actorOf方法创建属于Master的actor,在创建actor的同时,会初始化Master

    5.生命周期方法(preStart)是在构造器之后,receive方法之前执行,只会执行一次

    6.启动一个定时器,定时检查超时的Worker

    7.获取到超时的Worker然后把他移除掉(删除内存和磁盘里的workerinfo)

    8.启动receive方法,会不断的执行,用于接收actor发送过来的请求

    Worker:(与Master类似)

    1.创建ActorSystem对象

    2.创建了属于Worker的Actor

    3.启动生命周期方法(preStart),向Master进行注册,通过Master的Url获取Master的actor,

    4.Worker接收到Master发送过来的注册成功信息,然后更新url

    5.调用定时器,定时地向Master发送心跳信息

    4、SparkContext初始化时做的三件事

    ​ 主要三件事

    1,创建SparkEnv,通过SparkEnv对象的createDriverEnv创建了ActorSystem对象

    2,创建一个TaskScheduler,用来生成并发送task给Executor

    3,创建DAGScheduler用来划分stage

    5、后端调度器创建了那两个actor,分别是和谁进行交互的

    ​ Driver端:DriverActor跟Executors交互,发送实际的任务

    Driver端:ClientActor跟Master通信,发送任务信息

    6、clientactor是怎么向master注册的

    ​ 1.Driver:尝试注册,ClientActor发送任务信息给masterActor

    2.Master:把任务信息保存到内存,保存到磁盘

    4.Master:向Driver端响应信息(任务id,masterURL)

    5.Master:Schedule调用资源(集群资源发生改变的时候调用Schedule)

    6.Driver:监听器监听任务的运行情况

    .Yarn的任务提交流程

    1、Client端请求ResourceManage进行提交任务

    2、ResourceManage和NodeManage进行通信

    3、NodeManager会在其中一个节点启动ApplicationMaster , ApplicationMaster相当于当前任务调度的官家

    4、ApplicationMaste开始向ResourceManager申请资源

    5、ApplicationManage负责资源的调度 , 通知相应的NodeManager来启动YarnChild

    6、 YarnChild和ApplicationMaste进行通信 , Applic.ationMaster对YarnChild进行监控

    1. union all和 union的区别

    union和union all的区别是,union会自动压缩多个结果集合中的重复结果,而union all则将所有的结果全部显示出来,不管是不是重复。

    Union因为要进行重复值扫描,所以效率低。如果合并没有刻意要删除重复行,那么就使用Union All

    任务提交执行流程:

    Spark任务的本质是对我们编写的RDD的依赖关系切分成一个个Stage,将Stage按照分区分批次的生成TaskSet发送到Executor进行任务的执行

    Spark任务分两种:

    1、shuffleMapTask:shuffle之后的task

    2、resultTask:shuffle之前的task

    Spark任务分配有两种方式:

    1,尽量打撒方式(系统默认)

    2,尽量集中方式

    首先把App打包上传到集群上,并开始分配资源及调用包中的主类

    然后

    1. 在Driver端,调用SparkSubmit类,内部执行submit–>doRunMain–>通过反射获取应用程序的主类对象(远程代理对象)–>执行主类的main方法,这是提交,
    2. Driver端构建SparkConf和SparkContext对象,在SparkContext入口类做了三件事,创建了SparkEnv对象(创建了ActorSystem对象)、TaskScheduler(用来生成并发送task给Executor)、DAGScheduler(用来划分Stage)
    3. ClientActor将任务信息封装到ApplicationDescription对象里并且提交给Master
    4. Master收到ClientActor提交的任务信息后,把任务信息存到内存中,然后又将任务信息放到队列中(waitingApps)
    5. 当开始执行这个任务信息时,调用scheduler方法,进行资源调度。
    6. 将调度好的资源封装到LaunchExecutor并发送给对应的Worker
    7. Worker接收到Master发送过来的调度信息(LaunchExecutor)后,将信息封装成一个ExecutorRunner对象
    8. 封装成ExecutorRunner后,调用ExecutorRunner的Start方法,开始启动CoarseGrainedExecutorBackend对象(启动Executor)
    9. Executor启动后向DriverActor进行反向注册
    10. 与DriverActor注册成功后,创建一个线程池(ThreadPool),用来执行任务
    11. 当所有Executor注册完成后,意味着作业环境准备好了,Driver端会结束与SparkContext对象的初始化
    12. 当Driver初始化完成后(创建了一个sc示例),会持续执行我们自己提交的App的代码,当触发了Action的RDD算子时,就触发了一个job,这时会调用DAGScheduler对象进行Stage划分
    13. DAGScheduler开始进行Stage划分
    14. 将划分好的Stage按照分区生成一个一个的task,并且封装到TaskSet对象,然后TaskSet提交到TaskScheduler
    15. TaskScheduler接收到提交过来的TaskSet,拿到一个序列化器对TaskSet序列化,将序列化好的TaskSet封装到LaunchExecutor并提交到DriverActor
    16. DriverActor把LaunchExecutor发送到Executor上
    17. Executor接收到DriverActor发送过来的任务(LaunchExecutor),会将其封装成TaskRunner,然后从线程池中获取线程来执行TaskRunner
    18. TaskRunner拿到反序列化器,反序列化TaskSet,然后执行App代码,也就是对RDD分区上执行的算子和自定义函数

    ------------任务提交流程---------

    ​ 将任务信息封装在appArgs里 --调用action进行提交

    –拿到远程代理对象 --创建一个线程执行runmain方法

    –通过反射拿到入口类,然后通过入口类拿到main方法,执行main方法

    一、 DStream的概念:

    DStream一个离散流,是spark基本的数据抽象,它里面包含了RDD,

    我们在操作DStream的过程中其实就是操作里面的RDD,它有以下一个特性

    1,一个放了多个DStream的列表,并且DStream之间是有依赖关系的

    2,每隔一段时间(时间间隔)就会生成一个RDD

    3,每隔一段时间生成的RDD会有一个函数作用在这个RDD上

    在我们操作sparkstreaming的过程中,需要操作的是DStream,而不是rdd了

    其中transformation中有三个特殊的原语,updateStateByKey,transform,window operations

    在特殊的情况下用到,经常用到

    二、 updateStateByKey的应用场景和使用

    应用场景: 把历史批次结果应用到当前批次结果来进行计算时候需要用到它

    k,v的形式,相同k的value进行聚合,调用的时候需要传入一个自定义的聚合的函数,

    和一个HashPartitioner分区器,

    再加一个true or false ,表示是否在生成的rdds中记住partitioner对象

    其中自定义函数中有三个类型的参数 String代表元组中的key,就是每个单词

    Seq[Int]代表当前批次中相同key对应的value,seq(1,1,1,1)

    Option[Int]代表上一次的相同key对应的value结果,有可能有值,有可能没值

    并且他们封装在Iterator中,例:(it:Iterator[(String,Seq[Int],Option[Int])])

    注意:必须checkpoint

    三、transform的应用场景和使用

    它可以不用去直接操作DStream,而是直接去操作DStream里的RDD,

    极大的丰富了我们平时调用的api

    四、window operations的应用场景和使用

    每次展示的结果范围是多个批次的结果范围,当指定批次间隔,但是每次展示的

    并不是一次批次间隔的结果,而是需要展示多次批次间隔的范围结果,我们就用窗口

    操作,当中有两个重要的参数,一个是窗口长度,一个是滑动间隔,窗口长度是每一次

    展示结果的范围,滑动间隔指的是上一次结果范围和下一次结果范围之间的时间间隔

    注意:这两个参数,必须是批次间隔的倍数

    五、Streaming提供的两种获取kafka的方式的优缺点

    1.receiver,调用了kafka中高阶的api,是将数据获取到缓存当中以便调用,

    但是,这样将造成数据的不安全,如果想要实现数据安全,可以做checkpoint操作和writeAheadLog方式,

    预写日志的方式来实现数据安全,但是此方式需要保存两份数据,

    浪费空间,且影响性能和效率,所以一般公司不会用它。

    2,Direct 调用了kafka底层的api,直接拉取数据进行处理,相对receiver,效率要高很多,而且

    可以手动的控制offset值,receiver无法手动控制offset值,保证了安全,

    六、通过哪种方式可以手动管理offset

    checkpoint(不常用),经常用到手动的维护在zookeeper里

    还有维护在hbase里,不常用,还有hdfs里,可以实现,但是没人愿意用,因为存在一些小文件的问题

    还有存储到kafka中

    1 Spark SQL存储默认的格式是什么?

    parquet

    2 例举Spark中几个发生shuffle的算子

    reducebykey,countbykey,groupbykey ,aggregate,aggregateByKey,join

    3 Spark2.x版本中,DataSet和DataFrame的关系以及你对DataSet的理解

    共同点:

    1 DataFrame、Dataset都是数据集,spark平台下的分布式弹性,为处理大学数据提供便利

    2 两者都是惰性机制,在创建、转换,如map方法时,不会立即执行,只有在遇到Action(如doreach)时,才会遍历运算,

    3 ,他们会根据spark的内存情况自动缓冲运算,这样即使数据量很大,也不会出现内存溢出

    4,都有partition的概念

    5、三者有许多共同的函数,如filter,排序等

    6、在对DataFrame和Dataset进行操作许多操作都需要导入隐式转换

    7,都可以使用模式匹配获取各个字段的值和类型

    8,都支持sparksql操作,

    区别:

    9,DataFrame每一行的类型固定为Row,只有通过解析,(使用getAs)才能获取当中的值

    10 ,DdataSet,

    11,他们支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

    利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定

    12,DataFrame也可以叫Dataset[Row],每一行的类型是Row ,而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

    4 描述Dstream的概念及特性

    DStream一个离散流,是spark基本的数据抽象,它里面包含了RDD,

    我们在操作DStream的过程中其实就是操作里面的RDD,它有以下一个特性

    1,一个放了多个DStream的列表,并且DStream之间是有依赖关系的

    2,每隔一段时间(时间间隔)就会生成一个RDD

    3,每隔一段时间生成的RDD会有一个函数作用在这个RDD上

    在我们操作sparkstreaming的过程中,需要操作的是DStream,而不是rdd了

    5 描述Spark Streaming的窗口操作应用场景

    每次展示的结果范围是多个批次的结果范围,当指定批次间隔,但是每次展示的

    并不是一次批次间隔的结果,而是需要展示多次批次间隔的范围结果,我们就用窗口

    操作,当中有两个重要的参数,一个是窗口长度,一个是滑动间隔,窗口长度是每一次

    展示结果的范围,滑动间隔指的是上一次结果范围和下一次结果范围之间的时间间隔

    注意:这两个参数,必须是批次间隔的倍数

    6 用Spark Streaming实时读取NetCat的数据实现WordCount,并用updateStateByKey实现an批次累加功能

    object StreamingWCACC {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingWCACC").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // 设置checkpoint用于记录历史结果,要设置到HDFS
    ssc.checkpoint("d://cp-20181112-1")
    
    // 获取数据
    val dStream = ssc.socketTextStream("node01", 6666)
    
    // 进行计算
    val tups = dStream.flatMap(_.split(" ")).map((_, 1))
    val res: DStream[(String, Int)] =
    tups.updateStateByKey(func, new HashPartitioner(3), true)
    res.print()
    ssc.start()
    ssc.awaitTermination()
      }
       val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
        it.map(x => {
          (x._1, x._2.sum + x._3.getOrElse(0))
        })
      }
    }
               

    7 Spark Streaming消费Kafka的数据有哪几种方式,各自的优缺点是什么?

    1.receiver,调用了kafka中高阶的api,是将数据获取到缓存当中以便调用,但是,这样将造成数据的不安全,如果想要实现数据安全,可以做checkpoint操作和writeAheadLog方式,预写日志的方式来实现数据安全,但是此方式需要保存两份数据,浪费空间,且影响性能和效率,所以一般公司不会用它。

    2,Direct 调用了kafka底层的api,直接拉取数据进行处理,相对receiver,效率要高很多,而且

    可以手动的控制offset值,receiver无法手动控制offset值,保证了安全,

    8 Spark Streaming消费Kafka的数据时,什么情况会重复消费数据或丢数据?

    1,强行kill线程,导致消费后的数据,offset没有提交,但是数据未处理,导致这部分内存中的数据丢失

    2,设置offset为自动提交,关闭kafka时,如果在close之前,调用consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费

    3,消费后的数据,当offset还没有提交时,partition就会断开连接,比如,(当遇到消费的数据很耗时,导致超过了Kafka的session timeout时间,那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费)

    4,

    9 Spark Streaming消费Kafka的数据时 ,怎么保证数据零丢失

    1 通过offset commit来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,继续上次的offset进行消费即可。

    2 输入的数据来自可靠的数据源和可靠的接收器

    3 元数据持久化 checkpointing

    4 启用了WAL特性(Write ahead log)

    5 at-least-once语义,一次仅一次语义

    6 Kafka direct API 可以手动控制offset值,

    10 SparkContext初始化过程中做了那三步重要操作

    ​ 1,创建SparkEnv,通过SparkEnv对象的createDriverEnv创建了ActorSystem对象

    2,创建一个TaskScheduler,用来生成并发送task给Executor

    3,创建DAGScheduler用来划分stage

    11关于Spark的shuffle

    Spark的Shuffle过程:

    shuffle操作,是在Spark操作中调用了一些特殊的算子才会触发的一种操作,

    shuffle操作,会导致大量的数据在不同的节点之间进行传输,

    因此,shuffle过程是Spark中最复杂、最消耗性能的一种操作

比如:reduceByKey算子会将上一个RDD中的每个key对应的所有value都聚合成一个value,然后生成一个新的RDD,新的RDD的元素类型就是<key, value>的格式,每个key对应一个聚合起来的value,在这里,最大的问题在于,对于上一个RDD来说,并不是一个key对应的所有的value都在一个partition中的,更不是不太可能key的所有value都在一个节点上,对于这种情况,就必须在集群中将各个节点上同一个key对应的values统一传输到一个节点上进行聚合处理,这个过程势必会发生大量的网络IO。

shuffle过程中分为shuffle write和shuffle read,而且会在不同的stage中进行的

在进行一个key对应的values的聚合时, 首先,上一个stage的每个map task就必须保证将自己处理的当前分区中的数据相同key写入一个分区文件中,可能会多个不同的分区文件,接着下一个stage的reduce task就必须从上一个stage的所有task所在的节点上,将各个task写入的多个分区文件中找到属于自己的分区文件,然后将属于自己的分区数据拉取过来,这样就可以保证每个key对应的所有values都汇聚到一个节点上进行处理和聚合, 这个过程就称之为shuffle!!!

shuffle过程中的分区排序问题

默认情况下,shuffle操作是不会对每个分区中的数据进行排序的

如果想要对每个分区中的数据进行排序,可以使用三种方法:

1、使用mapPartitions算子把每个partition取出来进行排序

rdd.mapPartitons(.toList.sortBy(._2).iterator)

2、使用repartitionAndSortWithinPartitions(该算子是对RDD进行重分区的算子),在重分区的过程中同时就进行分区内数据的排序

3、使用sortByKey对所有分区的数据进行全局排序

以上三种方法,mapPartitions代价比较小,因为不需要进行额外的shuffle操作,

repartitionAndSortWithinPartitions和sortByKey可能会进行额外的shuffle操作,因此性能并不是很高

会导致shuffle的算子

1、byKey类的算子:比如reduceByKey、groupByKey、sortByKey、aggregateByKey、combineByKey

2、repartition类的算子:比如repartition(少量分区变成多个分区会发生shuffle)、repartitionAndSortWithinPartitions、coalesce(需要指定是否发生shuffle)、partitionBy

3、join类的算子:比如join(先groupByKey后再join就不会发生shuffle)、cogroup

注意:首先对于上述操作,能不用shuffle操作,就尽量不用,尽量使用不发生shuffle的操作。

其次,如果使用了shuffle操作,那么肯定要进行shuffle的调优,甚至是解决遇到的数据倾斜问题。

shuffle操作是spark中唯一最消耗性能的过程

因此也就成了最需要进行性能调优的地方,最需要解决线上报错的地方,也就是唯一可能出现数据倾斜的地方

为了实时shuffle操作,spark才有stage的概念,在发生shuffle操作的算子中,需要进行stage的划分

shuffle操作的前半部分,属于上一个stage的范围,通常称之为map task,

shuffle操作的后半部分,属于下一个stage的范围,通常称之为reduce task,

其中map task负责数据的组织,也就是将同一个key对应的value都写入同一个下游task对应的分区文件中,

其中reduce task负责数据的聚合,也就是将上一个stage的task所在的节点上,将属于自己的各个分区文件都拉取过来进行聚合

map task会将数据先保存在内存中,如果内存不够时,就溢写到磁盘文件中,

reduce task会读取各个节点上属于自己的分区磁盘文件到自己节点的内存中进行聚合。

由此可见,shuffle操作会消耗大量的内存,因为无论是网络传输数据之前还是之后,

都会使用大量内存中数据结构来实施聚合操作,

在聚合过程中,如果内存不够,只能溢写到磁盘文件中去,

此时就会发生大量的网络IO,降低性能。

此外,shuffle过程中,会产生大量的中间文件,也就是map side写入的大量分区文件,

这些文件会一直保留着,直到RDD不再被使用,而且被gc回收掉了,才会去清理中间文件,

这主要是为了:如果要重新计算shuffle后RDD,那么map side不需要重新再做一次磁盘写操作,

但是这种情况下,如果在应用程序中一直保留着对RDD的引用,

导致很长的时间以后才会进行回收操作,

保存中间文件的目录,由spark.local.dir属性指定

所以,spark性能的消耗体现在:内存的消耗、磁盘IO、网络的IO

12 Driver端的后端调度器创建了哪两个重要的Actor,分别负责向哪些组件的Actor进行通信

​ 1 DriverActor跟Executors交互,发送实际的任务

2 ClientActor跟Master通信,发送任务信息

13 描述Stage划分过程,主要调用什么方法

Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG,并根据ShuffleDependency来进行stage的划分,stage包含多个tasks,个数由该stage的finalRDD决定,stage里面的task完全相同,DAGScheduler 完成stage的划分后基于每个Stage生成TaskSet,并提交给TaskScheduler,TaskScheduler负责具体的task的调度,在Worker节点上启动task。

14.spark调优

spark.shuffle.file.buffer 默认值是32k

map side task的内存buffer大小,写数据到磁盘文件之前,会先保存在缓冲中,如果内存充足,可以适当加大,从而减少map side磁盘IO次数,提升性能

spark.reducer.maxSizeInFlight 默认值是48m

reduce task的buffer缓冲,代表了每个reduce task每次能够拉取的map side数据最大大小,如果内存充足,可以考虑加大,从而减少网络传输次数,提升性能

spark.shuffle.blockTransferService 默认值 netty

shuffle过程中,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6中会去除掉

spark.shuffle.compress 默认值是 true

是否对map side输出的文件进行压缩,默认是启用压缩的,压缩器是由spark.io.compression.codec属性指定的,默认是snappy压缩器,该压缩器强调的是压缩速度,而不是压缩率

spark.shuffle.consolidateFiles 默认值是 false

默认为false,如果设置为true,那么就会合并map side输出文件,对于reduce task数量特别的情况下,可以极大减少磁盘IO开销,提升性能

spark.shuffle.io.maxRetries 默认值是 3

网络传输数据过程中,如果出现了网络IO异常,重试拉取数据的次数,默认是3次,对于耗时的shuffle操作,建议加大次数,以避免full gc或者网络不通常导致的数据拉取失败,进而导致task lost,增加shuffle操作的稳定性

spark.shuffle.io.retryWait 默认值是 5 s

每次重试拉取数据的等待间隔,默认是5s,建议加大时长,理由同上,保证shuffle操作的稳定性

spark.shuffle.io.numConnectionsPerPeer 默认值是1

机器之间的可以重用的网络连接,主要用于在大型集群中减小网络连接的建立开销,如果一个集群的机器并不多,可以考虑增加这个值

spark.shuffle.io.preferDirectBufs 默认值是true

启用堆外内存,可以避免shuffle过程的频繁gc,如果堆外内存非常紧张,则可以考虑关闭这个选项

spark.shuffle.manager 默认值是 sort

ShuffleManager,Spark 1.5以后,有三种可选的,hash、sort和tungsten-sort,sort-based ShuffleManager会更高效实用内存,并且避免产生大量的map side磁盘文件,从Spark 1.2开始就是默认的选项,tungsten-sort与sort类似,但是内存性能更高

spark.shuffle.memoryFraction 默认值是 0.2

如果spark.shuffle.spill属性为true,那么该选项生效,代表了executor内存中,用于进行shuffle reduce side聚合的内存比例,默认是20%,如果内存充足,建议调高这个比例,给reduce聚合更多内存,避免内存不足频繁读写磁盘

spark.shuffle.service.enabled 默认值是 false

启用外部shuffle服务,这个服务会安全地保存shuffle过程中,executor写的磁盘文件,因此executor即使挂掉也不要紧,必须配合spark.dynamicAllocation.enabled属性设置为true,才能生效,而且外部shuffle服务必须进行安装和启动,才能启用这个属性

spark.shuffle.service.port 默认值是 7337

外部shuffle服务的端口号,具体解释同上

spark.shuffle.sort.bypassMergeThreshold 默认值是200

对于sort-based ShuffleManager,如果没有进行map side聚合,而且reduce task数量少于这个值,那么就不会进行排序,如果你使用sort ShuffleManager,而且不需要排序,那么可以考虑将这个值加大,直到比你指定的所有task数量都打,以避免进行额外的sort,从而提升性能

spark.shuffle.spill 默认值是 true

当reduce side的聚合内存使用量超过了spark.shuffle.memoryFraction指定的比例时,就进行磁盘的溢写操作

spark.shuffle.spill.compress 默认值是true

同上,进行磁盘溢写时,是否进行文件压缩,使用spark.io.compression.codec属性指定的压缩器,默认是snappy,速度优先

继续阅读