天天看点

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

文章目录

  • 第1章 RDD概述
    • 1.1 什么是RDD
    • 1.2 RDD特性
  • 第2章 RDD编程
    • 2.1 编程模型
    • 2.2 RDD的创建
      • 2.2.1 IDEA环境准备
      • 2.2.2 从集合中创建
      • 2.2.3 从外部存储系统的数据集创建
      • 2.2.4 从其他RDD创建
      • 2.2.5创建IDEA快捷键
    • 2.3分区规则
      • 2.3.1默认分区源码(RDD数据从集合中创建)
      • 2.3.2分区源码(RDD数据从集合中创建)
      • 2.3.3分区源码(RDD数据从文件中读取后创建)
    • 2.4行动算子 2.5 转换算子
    • 2.6 RDD序列化(未写完)
    • 2.7 RDD依赖关系
      • 2.7.1 查看血缘关系
      • 2.7.2 查看依赖关系
      • 2.7.3 窄依赖
      • 2.7.4 宽依赖
      • 2.7.5 Spark中的Job调度
      • 2.7.6 Stage任务划分(面试重点)
    • 2.8 RDD持久化
      • 2.8.1 RDD Cache缓存
      • 2.8.2 RDD CheckPoint检查点
      • 2.8.3 缓存和检查点区别
      • 2.8.4 检查点存储到HDFS集群
    • 2.9 键值对RDD数据分区
      • 2.9.1 Hash分区
      • 2.9.2 Ranger分区
      • 2.9.3 自定义分区
  • 第3章 数据读取与保存
    • 3.1 文件类数据读取与保存
      • 3.1.1 Text文件
      • 3.1.2 Json文件
      • 3.1.3 Sequence文件
      • 3.1.4 Object对象文件
    • 3.2 文件系统类数据读取与保存
      • 3.2.1 HDFS
      • 3.2.2 MySQL
  • 第4章 累加器
    • 4.1 系统累加器
    • 4.2 自定义累加器
  • 第5章 广播变量

第1章 RDD概述

1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。

代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

1)弹性

​ 存储的弹性:内存与磁盘的自动切换;

​ 容错的弹性:数据丢失可以自动恢复;

​ 计算的弹性:计算出错重试机制;

​ 分片的弹性:可根据需要重新分片。

2)分布式

数据存储在大数据集群不同节点上

3)数据集

RDD封装了计算逻辑,并不保存数据

4)数据抽象

RDD是一个抽象类,需要子类具体实现

5)不可变

RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

6)可分区、并行计算

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

1.2 RDD特性

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

A list of partitions

多个分区,分区可以看成是数据集的基本组成单位

对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度。

用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值。 默认值就是程序所分配到的 CPU Core 的数目.

每个分配的存储是由BlockManager 实现的, 每个分区都会被逻辑映射成 BlockManager 的一个 Block,,而这个 Block 会被一个 Task 负责计算。

A function for computing each split

计算每个切片(分区)的函数.

Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的

A list of dependencies on other RDDs

与其他 RDD 之间的依赖关系

RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

对存储键值对的 RDD,还有一个可选的分区器

只有对于 key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None;Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

存储每个切片优先(preferred location)位置的列表

比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.

第2章 RDD编程

2.1 编程模型

在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。RDD经过一系列的transformations转换定义之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果,或者是向存储系统保存数据。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算)。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量
Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

算子:从认知心理学角度来讲,解决问题其实是将问题的初始状态,通过一系列的转换操作(operator),变成解决状态。

2.2 RDD的创建

在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。

2.2.1 IDEA环境准备

1.创建maven项目

2.引入依赖

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
           

3.创建一个scala文件夹,并把它修改为Source Root

4.添加scala 框架支持

2.2.2 从集合中创建

1)从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object createrdd01_array {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.使用parallelize()创建rdd
        val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))

        rdd.foreach(println)

        println("分区数:" + rdd.partitions.size)

        //4.使用makeRDD()创建rdd
        val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))

        rdd1.foreach(println)
        println("分区数:" + rdd1.partitions.size)

        sc.stop()
    }
}
           

2.2.3 从外部存储系统的数据集创建

由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等

1)数据准备

在新建的项目名称上右键=》新建input文件夹=》在input文件夹上右键=》分别新建1.txt和2.txt。每个文件里面准备一些word单词。

2)创建RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object createrdd03_file {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.读取文件。如果是集群路径:hdfs://hadoop001:9000/input
        val lineWordRdd: RDD[String] = sc.textFile("input")

        //4.打印
        lineWordRdd.foreach(println)

        //5.关闭
        sc.stop()
    }
}
           

2.2.4 从其他RDD创建

主要是通过一个RDD运算完后,再产生新的RDD。

详见2.4节

2.2.5创建IDEA快捷键

1)点击File->Settings…->Editor->Live Templates->output->Live Template

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2)点击左下角的Define->选择Scala

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

3)在Abbreviation中输入快捷键名称scc,在Template text中填写,输入快捷键后生成的内容。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)

//3.中间逻辑


//4.关闭连接
sc.stop()
           
Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.3分区规则

2.3.1默认分区源码(RDD数据从集合中创建)

1)默认分区数源码解读

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.3.2分区源码(RDD数据从集合中创建)

1)分区测试(RDD数据从集合中创建)

object partition02_Array {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
        val sc: SparkContext = new SparkContext(conf)

        //1)4个数据,设置4个分区,输出:0分区->1,1分区->2,2分区->3,3分区->4
        //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)

        //2)4个数据,设置3个分区,输出:0分区->1,1分区->2,2分区->3,4
        //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)

        //3)5个数据,设置3个分区,输出:0分区->1,1分区->2、3,2分区->4、5
        val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)

        rdd.saveAsTextFile("output")

        sc.stop()
    }
           

2)分区源码

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.3.3分区源码(RDD数据从文件中读取后创建)

1)分区测试

object partition03_file {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
        val sc: SparkContext = new SparkContext(conf)

        //1)默认分区的数量:默认取值为当前核数和2的最小值
        //val rdd: RDD[String] = sc.textFile("input")

        //2)输入数据1-4,每行一个数字;输出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}
        //val rdd: RDD[String] = sc.textFile("input/3.txt",3)

        //3)输入数据1-4,一共一行;输出:0=>{1234} 1=>{空} 2=>{空} 3=>{空} 
        val rdd: RDD[String] = sc.textFile("input/4.txt",3)

        rdd.saveAsTextFile("output")

        sc.stop()
    }
}
           

2)源码解析

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量

start=split.getStart() end = start + split.getLength

2.4行动算子 2.5 转换算子

https://blog.csdn.net/zmzdmx/article/details/109633799

飞机直达

2.6 RDD序列化(未写完)

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子:

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

暂未看懂

2.7 RDD依赖关系

2.7.1 查看血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

1)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("in/word.txt")
    println(rdd.toDebugString)
    val flatRdd: RDD[String] = rdd.flatMap(_.split(" "))
    println(flatRdd.toDebugString)
    val mapRdd: RDD[(String, Int)] = flatRdd.map((_,1))
    println(mapRdd.toDebugString)
    val reduceRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)
    println(reduceRdd.toDebugString)

    reduceRdd.collect()

    //4.关闭连接
    sc.stop()
  }
}
           

2)打印结果

(2) in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
 |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
(2) MapPartitionsRDD[2] at flatMap at ScalaRDD.scala:16 []
 |  in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
 |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
(2) MapPartitionsRDD[3] at map at ScalaRDD.scala:18 []
 |  MapPartitionsRDD[2] at flatMap at ScalaRDD.scala:16 []
 |  in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
 |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
(2) ShuffledRDD[4] at reduceByKey at ScalaRDD.scala:20 []
 +-(2) MapPartitionsRDD[3] at map at ScalaRDD.scala:18 []
    |  MapPartitionsRDD[2] at flatMap at ScalaRDD.scala:16 []
    |  in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
    |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
           

注意:圆括号中的数字表示RDD的并行度,也就是有几个分区

2.7.2 查看依赖关系

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

1)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConf并设置App名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

    //2.创建SparkContext,该对象是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("in/word.txt")
    println(rdd.dependencies)
    val flatRdd: RDD[String] = rdd.flatMap(_.split(" "))
    println(flatRdd.dependencies)
    val mapRdd: RDD[(String, Int)] = flatRdd.map((_,1))
    println(mapRdd.dependencies)
    val reduceRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)
    println(reduceRdd.dependencies)

    reduceRdd.collect()

    //4.关闭连接
    sc.stop()
  }
}
           

2)打印结果

List(org.apache.spark.OneToOneDependency@79b08632)
List(org.apache.spark.OneToOneDependency@26d820eb)
List(org.apache.spark.OneToOneDependency@5ff60a8c)
List(org.apache.spark.ShuffleDependency@5e8cda75)
           

3)全局搜索org.apache.spark.OneToOneDependency

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
    override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
           

注意:要想理解RDDS是如何工作的,最重要的就是理解Transformations。

RDD 之间的关系可以从两个维度来理解: 一个是 RDD 是从哪些 RDD 转换而来, 也就是 RDD 的 parent RDD(s)是什么; 另一个就是 RDD 依赖于 parent RDD(s)的哪些 Partition(s). 这种关系就是 RDD 之间的依赖.

RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

2.7.3 窄依赖

窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.7.4 宽依赖

宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为超生。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

具有宽依赖的 transformations 包括: sort, reduceByKey, groupByKey, join, 和调用rePartition函数的任何操作.

宽依赖对 Spark 去评估一个 transformations 有更加重要的影响, 比如对性能的影响.

2.7.5 Spark中的Job调度

一个Spark应用包含一个驱动进程(driver process,在这个进程中写Spark的逻辑代码)和多个执行器进程(executor process,跨越集群中的多个节点)。Spark 程序自己是运行在驱动节点, 然后发送指令到执行器节点。

一个Spark集群可以同时运行多个Spark应用, 这些应用是由集群管理器(cluster manager)来调度。

Spark应用可以并发的运行多个job, job对应着给定的应用内的在RDD上的每个 action操作。

Spark应用

一个Spark应用可以包含多个Spark job, Spark job是在驱动程序中由SparkContext 来定义的。

当启动一个 SparkContext 的时候, 就开启了一个 Spark 应用。 一个驱动程序被启动了, 多个执行器在集群中的多个工作节点(worker nodes)也被启动了。 一个执行器就是一个 JVM, 一个执行器不能跨越多个节点, 但是一个节点可以包括多个执行器。

一个 RDD 会跨多个执行器被并行计算. 每个执行器可以有这个 RDD 的多个分区, 但是一个分区不能跨越多个执行器.

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

Spark Job 的划分

由于Spark的懒执行, 在驱动程序调用一个action之前, Spark 应用不会做任何事情,

针对每个action,Spark 调度器就创建一个执行图(execution graph)和启动一个 Spark job。

每个 job 由多个stages 组成, 这些 stages 就是实现最终的 RDD 所需的数据转换的步骤。一个宽依赖划分一个stage。每个 stage 由多个 tasks 来组成, 这些 tasks 就表示每个并行计算, 并且会在多个执行器上执行。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.7.6 Stage任务划分(面试重点)

1)DAG有向无环图

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。例如,DAG记录了RDD的转换过程和任务的阶段。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2)RDD任务切分中间分为:Application、Job、Stage和Task

(1)Application:初始化一个SparkContext即生成一个Application;

(2)Job:一个Action算子就会生成一个Job;

(3)Stage:Stage等于宽依赖的个数加1;

(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.8 RDD持久化

2.8.1 RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

1)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("in/word.txt")
    val wordRdd: RDD[String] = rdd.flatMap(_.split(" "))
    val rdd1: RDD[(String, Int)] = wordRdd.map((_,1))

    //cache操作会增加血缘关系,不改变原有的血缘关系
    println(rdd1.toDebugString)

    //数据缓存
    rdd1.cache()
    //可以更改存储级别
    //rdd1.persist(StorageLevel.MEMORY_AND_DISK_2)

    //触发执行逻辑
    rdd1.collect()

    println("-----------")
    println(rdd1.toDebugString)

    //再次触发执行逻辑
    rdd1.collect()

    sc.stop()
  }
}
           

2)源码解析

mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
           

注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

3)自带缓存算子

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("in/word.txt")
    val wordRdd: RDD[String] = rdd.flatMap(_.split(" "))
    val rdd1: RDD[(String, Int)] = wordRdd.map((_,1))

    //采用reduceByKey,自带缓存
    val wordByKeyRdd: RDD[(String, Int)] = rdd1.reduceByKey(_+_)

    // cache操作会增加血缘关系,不改变原有的血缘关系
    println(wordByKeyRdd.toDebugString)

    wordByKeyRdd.cache()

    wordByKeyRdd.collect()


    println("--------")
    println(wordByKeyRdd.toDebugString)
    sc.stop()
  }
}
           

访问http://192.168.83.100:4040/jobs/页面,查看第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量
Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.8.2 RDD CheckPoint检查点

1)检查点:是通过将RDD中间结果写入磁盘。

2)为什么要做检查点?

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

4)检查点数据存储格式为:二进制的文件

5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

6)检查点触发时间:对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

7)设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")

(2)调用检查点方法:wordToOneRdd.checkpoint()

8)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // 需要设置路径,否则抛异常:Checkpoint directory has not been set in the SparkContext
    sc.setCheckpointDir("in/testCheckpoint")

    val rdd: RDD[String] = sc.textFile("in/word.txt")

    val wordToOneRdd: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1))

    //增加缓存,避免再重新跑一个job做checkpoint
    wordToOneRdd.cache()

    //数据检查点:针对wordToOneRdd做检查点计算
    wordToOneRdd.checkpoint()

    wordToOneRdd.collect().foreach(println)
    //会立即启动一个新的job来专门做checkpoint运算
      
      //再次触发执行逻辑
        wordToOneRdd.collect().foreach(println)
        wordToOneRdd.collect().foreach(println)
    
    sc.stop()
  }
}
           

9)执行结果

访问http://192.168.83.100:4040/jobs/页面,查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量
Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量
Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量
Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

(1)只增加checkpoint,没有增加Cache缓存打印

第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。

(2)增加checkpoint,也增加Cache缓存打印

​ 第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.8.3 缓存和检查点区别

1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

4)如果使用完了缓存,可以通过unpersist()方法释放缓存

2.8.4 检查点存储到HDFS集群

如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    //设置访问hdfs集群的用户名
    System.setProperty("HADOOP_USER_NAME","root")

    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //需要设置路径,需要提前在hdfs集群上创建文件夹
    sc.setCheckpointDir("hdfs://hadoop100:9000/checkpoint")

    val rdd: RDD[String] = sc.textFile("in/word.txt")
    val wordToOneRdd: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1))

    wordToOneRdd.cache()

    wordToOneRdd.checkpoint()

    wordToOneRdd.collect().foreach(println)
    
    sc.stop()
  }
}
           

2.9 键值对RDD数据分区

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

1)注意:

(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

2)获取RDD分区

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))

    println(rdd.partitioner)
    val rdd2: RDD[(Int, Int)] = rdd.partitionBy(new HashPartitioner(3))
    //打印分区器
    println(rdd2.partitioner)

    sc.stop()
  }
}
           

2.9.1 Hash分区

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.9.2 Ranger分区

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

2.9.3 自定义分区

详细查看算子那一章节

https://blog.csdn.net/zmzdmx/article/details/109633799

https://blog.csdn.net/zmzdmx/article/details/109565914

第3章 数据读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS以及数据库。

3.1 文件类数据读取与保存

3.1.1 Text文件

1)数据读取:textFile(String)

2)数据保存:saveAsTextFile(String)

3)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //读取文件
    val rdd: RDD[String] = sc.textFile("in/word.txt")
    //保存文件
    rdd.saveAsTextFile("in/word")
    sc.stop()
  }
}
           

4)注意:如果是集群路径:hdfs://hadoop100:9000/input/1.txt

3.1.2 Json文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。

1)数据准备

​ 在input目录下创建1.txt文件,里面存储如下内容

{"username": "zhangsan","age": 20}
{"username": "lisi","age": 18}
{"username": "wangwu","age": 16}
           

2)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //读取文件
    val rdd: RDD[String] = sc.textFile("in/word.json")
    //导入解析Json所需的包并解析Json
    import scala.util.parsing.json._

    val res: RDD[Option[Any]] = rdd.map(JSON.parseFull)
    
    res.collect().foreach(println)
    sc.stop()
  }
}
           

3)修改输入文件格式

[{"username": "zhangsan","age": 20},
{"username": "lisi","age": 18},
{"username": "wangwu","age": 16}
]
           

再次执行程序,发现解析失败。原因是一行一行的读取文件。

注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

3.1.3 Sequence文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

1)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(2,3),(5,6)))
    //保存数据为SequenceFile
    rdd.saveAsSequenceFile("output")

    //读取SequenceFile文件
    sc.sequenceFile[Int,Int]("output").collect().foreach(println)
    sc.stop()
  }
}
           

2)注意:SequenceFile文件只针对PairRDD

3.1.4 Object对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

1)代码实现

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(2,3),(5,6)))
    rdd.saveAsObjectFile("output")
    sc.objectFile("output").collect().foreach(println)
    sc.stop()
  }
}
           

3.2 文件系统类数据读取与保存

3.2.1 HDFS

Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口

3.2.2 MySQL

支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:

(1)添加依赖

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>
           

(2)从Mysql读取数据

sc: SparkContext,   Spark程序执行的入口,上下文对象
    getConnection: () => Connection,  获取数据库连接
    sql: String,  执行SQL语句
    lowerBound: Long, 查询的起始位置
    upperBound: Long, 查询的结束位置
    numPartitions: Int,分区数
    mapRow: (ResultSet) => T   对结果集的处理
           
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //定义连接mysql的参数
    val driver= "com.mysql.jdbc.Driver"
    val url="jdbc:mysql://hadoop100:3306/test"
    val username="root"
    val password="ok"

    //创建jdbc
    val rdd = new JdbcRDD(sc, () => {
      Class.forName(driver)
      DriverManager.getConnection(url, username, password)
    }, "SELECT * FROM `course` WHERE cno>=? and cno<=?", 1001, 10010, 1,
      rs=>(rs.getInt(1),rs.getString(2),rs.getString(3)))
    rdd.foreach(println)

    sc.stop()
  }
}
           

(3)往Mysql写入数据

//在循环中创建对象,效率低
    rdd.foreach{
      case (name,age)=>{
        //注册驱动
        Class.forName(driver)
        //获取连接
        val conn: Connection = DriverManager.getConnection(url,userName,passWd)
        //执行的sql
        var sql:String = "insert into user(name,age) values(?,?)"
        //获取数据库操作对象
        val ps: PreparedStatement = conn.prepareStatement(sql)
        //给参数赋值
        ps.setString(1,name)
        ps.setInt(2,age)
        //执行sql语句
        ps.executeUpdate()
        //释放资源
        ps.close()
        conn.close()
      }
    
           
object MySQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("mysql").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    val url="jdbc:mysql://192.168.83.100:3306/test"
    val driver="com.mysql.jdbc.Driver"
    val user="root"
    val pwd="ok"

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("lisi",20),("wangwu",30)))

    //下面这段代码需要ps实现序列化,但是ps不是自己定义的类型,需要创建一个对象,没有办法实现
    //注册驱动
    Class.forName(driver)
    //获取连接
    val conn: Connection = DriverManager.getConnection(url, user, pwd)
    //声明数据库操作的sql
    var sql = "insert into user(name,age) values(?,?)"
    //创建数据库操作对象preparestatement
    val ps: PreparedStatement = conn.prepareStatement(sql)
    rdd.foreach {
      case (name, age) => {
        //给参数赋值
        ps.setString(1, name)
        ps.setInt(2, age)
        //执行sql
        ps.executeUpdate()
      }
    }
    //关闭连接
    ps.close()
    
  }
}
           

推荐使用foreachPartition,一个分区创建一个链接

object MySQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("mysql").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    val url="jdbc:mysql://192.168.83.100:3306/test"
    val driver="com.mysql.jdbc.Driver"
    val user="root"
    val pwd="ok"

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("lisi",20),("wangwu",30)))

    rdd.foreachPartition{
      //datas是rdd一个分区的数据
      datas=>{
        //注册驱动
        Class.forName(driver)
        //获取连接
        val conn: Connection = DriverManager.getConnection(url, user, pwd)
        //声明数据库操作的sql
        var sql = "insert into user(name,age) values(?,?)"
        //创建数据库操作对象preparestatement
        val ps: PreparedStatement = conn.prepareStatement(sql)

        //对当前分区内的数据进行遍历
        //注意:这个foreach不是算子,是集合的方法
        datas.foreach{
          case (name, age) => {
            //给参数赋值
            ps.setString(1, name)
            ps.setInt(2, age)
            //执行sql
            ps.executeUpdate()
          }
        }
        ps.close()
        conn.close()
      }
    }
    //关闭连接
    sc.stop()

  }
}
           

第4章 累加器

累加器:分布式共享只写变量。(Task和Task之间不能读数据)

累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用map()函数或者用 filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

4.1 系统累加器

1)代码实现

object accumulator01 {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建RDD
        val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))

        //3.1 打印单词出现的次数(a,10) 代码执行了shuffle
        dataRDD.reduceByKey(_ + _).collect().foreach(println)

        //3.2 如果不用shuffle,怎么处理呢?
        var sum = 0
        // 打印是在Executor端
        dataRDD.foreach {
            case (a, count) => {
                sum = sum + count
                println("sum=" + sum)
            }
        }
        // 打印是在Driver端
        println(("a", sum))

        //3.3 使用累加器实现数据的聚合功能
        // Spark自带常用的累加器
        //3.3.1 声明累加器
        val sum1: LongAccumulator = sc.longAccumulator("sum1")

        dataRDD.foreach{
            case (a, count)=>{
                //3.3.2 使用累加器
                sum1.add(count)
            }
        }

        //3.3.3 获取累加器
        println(sum1.value)

        //4.关闭连接
        sc.stop()
    }
           

通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是 add)增加累加器的值。驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。

注意:

(1)工作节点上的任务不能相互访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。

(2)对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。转化操作中累加器可能会发生不止一次更新。

4.2 自定义累加器

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。

1)自定义累加器步骤

(1)继承AccumulatorV2,设定输入、输出泛型

(2)重写方法

2)需求:自定义累加器,统计集合中首字母为“H”单词出现的次数。

List(“Hello”, “Hello”, “Hello”, “Hello”, “Hello”, “Spark”, “Spark”)

3)代码实现

object accumulator_define {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 创建RDD
        val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark"))

        //3.1 创建累加器
        val accumulator1: MyAccumulator = new MyAccumulator()

        //3.2 注册累加器
        sc.register(accumulator1,"wordcount")

        //3.3 使用累加器
        rdd.foreach(
            word =>{
                accumulator1.add(word)
            }
        )

        //3.4 获取累加器的累加结果
        println(accumulator1.value)


        //4.关闭连接
        sc.stop()
    }
}

// 声明累加器
// 1.继承AccumulatorV2,设定输入、输出泛型
// 2.重新方法
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    // 定义输出数据集合
    var map = mutable.Map[String, Long]()

    // 是否为初始化状态,如果集合数据为空,即为初始化状态
    override def isZero: Boolean = map.isEmpty

    // 复制累加器
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        new MyAccumulator()
    }

    // 重置累加器
    override def reset(): Unit = map.clear()

    // 增加数据
    override def add(v: String): Unit = {
        // 业务逻辑
        if (v.startsWith("H")) {
            map(v) = map.getOrElse(v, 0L) + 1L
        }
    }

    // 合并累加器
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

        var map1 = map
        var map2 = other.value

        map = map1.foldLeft(map2)(
            (map,kv)=>{
                map(kv._1) = map.getOrElse(kv._1, 0L) + kv._2
                map
            }
        )
    }

    // 累加器的值,其实就是累加器的返回结果
    override def value: mutable.Map[String, Long] = map
}
           

第5章 广播变量

广播变量:分布式共享只读变量。

在多个并行操作中(Executor)使用同一个变量,Spark默认会为每个任务(Task)分别发送,这样如果共享比较大的对象,会占用很大工作节点的内存。

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。

1)使用广播变量步骤:

(1)通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象,任何可序列化的类型都可以这么实现。

(2)通过value属性访问该对象的值(在Java中为value()方法)。

(3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

2)原理说明

Spark 之 SparkCore(未写完)第1章 RDD概述第2章 RDD编程第3章 数据读取与保存第4章 累加器第5章 广播变量

3)代码实现

object broadcast {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.创建RDD
        //val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
        //val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))

        //3.1 采用RDD的方式实现 rdd1 join rdd2,用到Shuffle,性能比较低
        //rdd1.join(rdd2).collect().foreach(println)

        //3.2 采用集合的方式,实现rdd1和list的join
        val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
        val list: List[(String, Int)] = List(("a", 4), ("b", 5), ("c", 6))

        // 声明广播变量
        val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)

        val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
            case (k1, v1) => {

                var v2: Int = 0

                // 使用广播变量
                //for ((k3, v3) <- list.value) {
                for ((k3, v3) <- broadcastList.value) {
                    if (k1 == k3) {
                        v2 = v3
                    }
                }

                (k1, (v1, v2))
            }
        }
        resultRDD.foreach(println)

        //4.关闭连接
        sc.stop()
    }
}
           

继续阅读