天天看點

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

文章目錄

  • 第1章 RDD概述
    • 1.1 什麼是RDD
    • 1.2 RDD特性
  • 第2章 RDD程式設計
    • 2.1 程式設計模型
    • 2.2 RDD的建立
      • 2.2.1 IDEA環境準備
      • 2.2.2 從集合中建立
      • 2.2.3 從外部存儲系統的資料集建立
      • 2.2.4 從其他RDD建立
      • 2.2.5建立IDEA快捷鍵
    • 2.3分區規則
      • 2.3.1預設分區源碼(RDD資料從集合中建立)
      • 2.3.2分區源碼(RDD資料從集合中建立)
      • 2.3.3分區源碼(RDD資料從檔案中讀取後建立)
    • 2.4行動算子 2.5 轉換算子
    • 2.6 RDD序列化(未寫完)
    • 2.7 RDD依賴關系
      • 2.7.1 檢視血緣關系
      • 2.7.2 檢視依賴關系
      • 2.7.3 窄依賴
      • 2.7.4 寬依賴
      • 2.7.5 Spark中的Job排程
      • 2.7.6 Stage任務劃分(面試重點)
    • 2.8 RDD持久化
      • 2.8.1 RDD Cache緩存
      • 2.8.2 RDD CheckPoint檢查點
      • 2.8.3 緩存和檢查點差別
      • 2.8.4 檢查點存儲到HDFS叢集
    • 2.9 鍵值對RDD資料分區
      • 2.9.1 Hash分區
      • 2.9.2 Ranger分區
      • 2.9.3 自定義分區
  • 第3章 資料讀取與儲存
    • 3.1 檔案類資料讀取與儲存
      • 3.1.1 Text檔案
      • 3.1.2 Json檔案
      • 3.1.3 Sequence檔案
      • 3.1.4 Object對象檔案
    • 3.2 檔案系統類資料讀取與儲存
      • 3.2.1 HDFS
      • 3.2.2 MySQL
  • 第4章 累加器
    • 4.1 系統累加器
    • 4.2 自定義累加器
  • 第5章 廣播變量

第1章 RDD概述

1.1 什麼是RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式資料集,是Spark中最基本的資料抽象。

代碼中是一個抽象類,它代表一個彈性的、不可變、可分區、裡面的元素可并行計算的集合。

1)彈性

​ 存儲的彈性:記憶體與磁盤的自動切換;

​ 容錯的彈性:資料丢失可以自動恢複;

​ 計算的彈性:計算出錯重試機制;

​ 分片的彈性:可根據需要重新分片。

2)分布式

資料存儲在大資料叢集不同節點上

3)資料集

RDD封裝了計算邏輯,并不儲存資料

4)資料抽象

RDD是一個抽象類,需要子類具體實作

5)不可變

RDD封裝了計算邏輯,是不可以改變的,想要改變,隻能産生新的RDD,在新的RDD裡面封裝計算邏輯

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

6)可分區、并行計算

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

1.2 RDD特性

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

A list of partitions

多個分區,分區可以看成是資料集的基本組成機關

對于 RDD 來說, 每個分區都會被一個計算任務處理, 并決定了并行計算的粒度。

使用者可以在建立 RDD 時指定 RDD 的分區數, 如果沒有指定, 那麼就會采用預設值。 預設值就是程式所配置設定到的 CPU Core 的數目.

每個配置設定的存儲是由BlockManager 實作的, 每個分區都會被邏輯映射成 BlockManager 的一個 Block,,而這個 Block 會被一個 Task 負責計算。

A function for computing each split

計算每個切片(分區)的函數.

Spark 中 RDD 的計算是以分片為機關的,每個 RDD 都會實作compute函數以達到這個目的

A list of dependencies on other RDDs

與其他 RDD 之間的依賴關系

RDD 的每次轉換都會生成一個新的 RDD, 是以 RDD 之間會形成類似于流水線一樣的前後依賴關系。 在部分分區資料丢失時,Spark 可以通過這個依賴關系重新計算丢失的分區資料, 而不是對 RDD 的所有分區進行重新計算

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

對存儲鍵值對的 RDD,還有一個可選的分區器

隻有對于 key-value的 RDD,才會有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None;Partitiner 不但決定了 RDD 的本區數量, 也決定了 parent RDD Shuffle 輸出時的分區數量

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

存儲每個切片優先(preferred location)位置的清單

比如對于一個 HDFS 檔案來說, 這個清單儲存的就是每個 Partition 所在檔案塊的位置. 按照“移動資料不如移動計算”的理念, Spark 在進行任務排程的時候, 會盡可能地将計算任務配置設定到其所要處理資料塊的存儲位置.

第2章 RDD程式設計

2.1 程式設計模型

在Spark中,RDD被表示為對象,通過對象上的方法調用來對RDD進行轉換。RDD經過一系列的transformations轉換定義之後,就可以調用actions觸發RDD的計算,action可以是向應用程式傳回結果,或者是向存儲系統儲存資料。在Spark中,隻有遇到action,才會執行RDD的計算(即延遲計算)。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量
Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

算子:從認知心理學角度來講,解決問題其實是将問題的初始狀态,通過一系列的轉換操作(operator),變成解決狀态。

2.2 RDD的建立

在Spark中建立RDD的建立方式可以分為三種:從集合中建立RDD、從外部存儲建立RDD、從其他RDD建立。

2.2.1 IDEA環境準備

1.建立maven項目

2.引入依賴

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
           

3.建立一個scala檔案夾,并把它修改為Source Root

4.添加scala 架構支援

2.2.2 從集合中建立

1)從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object createrdd01_array {

    def main(args: Array[String]): Unit = {

        //1.建立SparkConf并設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.建立SparkContext,該對象是送出Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.使用parallelize()建立rdd
        val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))

        rdd.foreach(println)

        println("分區數:" + rdd.partitions.size)

        //4.使用makeRDD()建立rdd
        val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))

        rdd1.foreach(println)
        println("分區數:" + rdd1.partitions.size)

        sc.stop()
    }
}
           

2.2.3 從外部存儲系統的資料集建立

由外部存儲系統的資料集建立RDD包括:本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、HBase等

1)資料準備

在建立的項目名稱上右鍵=》建立input檔案夾=》在input檔案夾上右鍵=》分别建立1.txt和2.txt。每個檔案裡面準備一些word單詞。

2)建立RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object createrdd03_file {

    def main(args: Array[String]): Unit = {

        //1.建立SparkConf并設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.建立SparkContext,該對象是送出Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.讀取檔案。如果是叢集路徑:hdfs://hadoop001:9000/input
        val lineWordRdd: RDD[String] = sc.textFile("input")

        //4.列印
        lineWordRdd.foreach(println)

        //5.關閉
        sc.stop()
    }
}
           

2.2.4 從其他RDD建立

主要是通過一個RDD運算完後,再産生新的RDD。

詳見2.4節

2.2.5建立IDEA快捷鍵

1)點選File->Settings…->Editor->Live Templates->output->Live Template

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2)點選左下角的Define->選擇Scala

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

3)在Abbreviation中輸入快捷鍵名稱scc,在Template text中填寫,輸入快捷鍵後生成的内容。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量
//1.建立SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

//2.建立SparkContext,該對象是送出Spark App的入口
val sc: SparkContext = new SparkContext(conf)

//3.中間邏輯


//4.關閉連接配接
sc.stop()
           
Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.3分區規則

2.3.1預設分區源碼(RDD資料從集合中建立)

1)預設分區數源碼解讀

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.3.2分區源碼(RDD資料從集合中建立)

1)分區測試(RDD資料從集合中建立)

object partition02_Array {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
        val sc: SparkContext = new SparkContext(conf)

        //1)4個資料,設定4個分區,輸出:0分區->1,1分區->2,2分區->3,3分區->4
        //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)

        //2)4個資料,設定3個分區,輸出:0分區->1,1分區->2,2分區->3,4
        //val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)

        //3)5個資料,設定3個分區,輸出:0分區->1,1分區->2、3,2分區->4、5
        val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5), 3)

        rdd.saveAsTextFile("output")

        sc.stop()
    }
           

2)分區源碼

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.3.3分區源碼(RDD資料從檔案中讀取後建立)

1)分區測試

object partition03_file {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest1")
        val sc: SparkContext = new SparkContext(conf)

        //1)預設分區的數量:預設取值為目前核數和2的最小值
        //val rdd: RDD[String] = sc.textFile("input")

        //2)輸入資料1-4,每行一個數字;輸出:0=>{1、2} 1=>{3} 2=>{4} 3=>{空}
        //val rdd: RDD[String] = sc.textFile("input/3.txt",3)

        //3)輸入資料1-4,一共一行;輸出:0=>{1234} 1=>{空} 2=>{空} 3=>{空} 
        val rdd: RDD[String] = sc.textFile("input/4.txt",3)

        rdd.saveAsTextFile("output")

        sc.stop()
    }
}
           

2)源碼解析

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

注意:getSplits檔案傳回的是切片規劃,真正讀取是在compute方法中建立LineRecordReader讀取的,有兩個關鍵變量

start=split.getStart() end = start + split.getLength

2.4行動算子 2.5 轉換算子

https://blog.csdn.net/zmzdmx/article/details/109633799

飛機直達

2.6 RDD序列化(未寫完)

在實際開發中我們往往需要自己定義一些對于RDD的操作,那麼此時需要注意的是,初始化工作是在Driver端進行的,而實際運作程式是在Executor端進行的,這就涉及到了跨程序通信,是需要序列化的。下面我們看幾個例子:

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

暫未看懂

2.7 RDD依賴關系

2.7.1 檢視血緣關系

RDD隻支援粗粒度轉換,即在大量記錄上執行的單個操作。将建立RDD的一系列Lineage(血統)記錄下來,以便恢複丢失的分區。RDD的Lineage會記錄RDD的中繼資料資訊和轉換行為,當該RDD的部分分區資料丢失時,它可以根據這些資訊來重新運算和恢複丢失的資料分區。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

1)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    //1.建立SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

    //2.建立SparkContext,該對象是送出Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("in/word.txt")
    println(rdd.toDebugString)
    val flatRdd: RDD[String] = rdd.flatMap(_.split(" "))
    println(flatRdd.toDebugString)
    val mapRdd: RDD[(String, Int)] = flatRdd.map((_,1))
    println(mapRdd.toDebugString)
    val reduceRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)
    println(reduceRdd.toDebugString)

    reduceRdd.collect()

    //4.關閉連接配接
    sc.stop()
  }
}
           

2)列印結果

(2) in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
 |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
(2) MapPartitionsRDD[2] at flatMap at ScalaRDD.scala:16 []
 |  in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
 |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
(2) MapPartitionsRDD[3] at map at ScalaRDD.scala:18 []
 |  MapPartitionsRDD[2] at flatMap at ScalaRDD.scala:16 []
 |  in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
 |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
(2) ShuffledRDD[4] at reduceByKey at ScalaRDD.scala:20 []
 +-(2) MapPartitionsRDD[3] at map at ScalaRDD.scala:18 []
    |  MapPartitionsRDD[2] at flatMap at ScalaRDD.scala:16 []
    |  in/word.txt MapPartitionsRDD[1] at textFile at ScalaRDD.scala:14 []
    |  in/word.txt HadoopRDD[0] at textFile at ScalaRDD.scala:14 []
           

注意:圓括号中的數字表示RDD的并行度,也就是有幾個分區

2.7.2 檢視依賴關系

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

1)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    //1.建立SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

    //2.建立SparkContext,該對象是送出Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("in/word.txt")
    println(rdd.dependencies)
    val flatRdd: RDD[String] = rdd.flatMap(_.split(" "))
    println(flatRdd.dependencies)
    val mapRdd: RDD[(String, Int)] = flatRdd.map((_,1))
    println(mapRdd.dependencies)
    val reduceRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)
    println(reduceRdd.dependencies)

    reduceRdd.collect()

    //4.關閉連接配接
    sc.stop()
  }
}
           

2)列印結果

List(org.apache.spark.OneToOneDependency@79b08632)
List(org.apache.spark.OneToOneDependency@26d820eb)
List(org.apache.spark.OneToOneDependency@5ff60a8c)
List(org.apache.spark.ShuffleDependency@5e8cda75)
           

3)全局搜尋org.apache.spark.OneToOneDependency

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
    override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
           

注意:要想了解RDDS是如何工作的,最重要的就是了解Transformations。

RDD 之間的關系可以從兩個次元來了解: 一個是 RDD 是從哪些 RDD 轉換而來, 也就是 RDD 的 parent RDD(s)是什麼; 另一個就是 RDD 依賴于 parent RDD(s)的哪些 Partition(s). 這種關系就是 RDD 之間的依賴.

RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

2.7.3 窄依賴

窄依賴表示每一個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴我們形象的比喻為獨生子女。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.7.4 寬依賴

寬依賴表示同一個父RDD的Partition被多個子RDD的Partition依賴,會引起Shuffle,總結:寬依賴我們形象的比喻為超生。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

具有寬依賴的 transformations 包括: sort, reduceByKey, groupByKey, join, 和調用rePartition函數的任何操作.

寬依賴對 Spark 去評估一個 transformations 有更加重要的影響, 比如對性能的影響.

2.7.5 Spark中的Job排程

一個Spark應用包含一個驅動程序(driver process,在這個程序中寫Spark的邏輯代碼)和多個執行器程序(executor process,跨越叢集中的多個節點)。Spark 程式自己是運作在驅動節點, 然後發送指令到執行器節點。

一個Spark叢集可以同時運作多個Spark應用, 這些應用是由叢集管理器(cluster manager)來排程。

Spark應用可以并發的運作多個job, job對應着給定的應用内的在RDD上的每個 action操作。

Spark應用

一個Spark應用可以包含多個Spark job, Spark job是在驅動程式中由SparkContext 來定義的。

當啟動一個 SparkContext 的時候, 就開啟了一個 Spark 應用。 一個驅動程式被啟動了, 多個執行器在叢集中的多個工作節點(worker nodes)也被啟動了。 一個執行器就是一個 JVM, 一個執行器不能跨越多個節點, 但是一個節點可以包括多個執行器。

一個 RDD 會跨多個執行器被并行計算. 每個執行器可以有這個 RDD 的多個分區, 但是一個分區不能跨越多個執行器.

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

Spark Job 的劃分

由于Spark的懶執行, 在驅動程式調用一個action之前, Spark 應用不會做任何事情,

針對每個action,Spark 排程器就建立一個執行圖(execution graph)和啟動一個 Spark job。

每個 job 由多個stages 組成, 這些 stages 就是實作最終的 RDD 所需的資料轉換的步驟。一個寬依賴劃分一個stage。每個 stage 由多個 tasks 來組成, 這些 tasks 就表示每個并行計算, 并且會在多個執行器上執行。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.7.6 Stage任務劃分(面試重點)

1)DAG有向無環圖

DAG(Directed Acyclic Graph)有向無環圖是由點和線組成的拓撲圖形,該圖形具有方向,不會閉環。原始的RDD通過一系列的轉換就形成了DAG,根據RDD之間的依賴關系的不同将DAG劃分成不同的Stage,對于窄依賴,partition的轉換處理在Stage中完成計算。對于寬依賴,由于有Shuffle的存在,隻能在parent RDD處理完成後,才能開始接下來的計算,是以寬依賴是劃分Stage的依據。例如,DAG記錄了RDD的轉換過程和任務的階段。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2)RDD任務切分中間分為:Application、Job、Stage和Task

(1)Application:初始化一個SparkContext即生成一個Application;

(2)Job:一個Action算子就會生成一個Job;

(3)Stage:Stage等于寬依賴的個數加1;

(4)Task:一個Stage階段中,最後一個RDD的分區個數就是Task的個數。

注意:Application->Job->Stage->Task每一層都是1對n的關系。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.8 RDD持久化

2.8.1 RDD Cache緩存

RDD通過Cache或者Persist方法将前面的計算結果緩存,預設情況下會把資料以序列化的形式緩存在JVM的堆記憶體中。但是并不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD将會被緩存在計算節點的記憶體中,并供後面重用。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

1)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("in/word.txt")
    val wordRdd: RDD[String] = rdd.flatMap(_.split(" "))
    val rdd1: RDD[(String, Int)] = wordRdd.map((_,1))

    //cache操作會增加血緣關系,不改變原有的血緣關系
    println(rdd1.toDebugString)

    //資料緩存
    rdd1.cache()
    //可以更改存儲級别
    //rdd1.persist(StorageLevel.MEMORY_AND_DISK_2)

    //觸發執行邏輯
    rdd1.collect()

    println("-----------")
    println(rdd1.toDebugString)

    //再次觸發執行邏輯
    rdd1.collect()

    sc.stop()
  }
}
           

2)源碼解析

mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
           

注意:預設的存儲級别都是僅在記憶體存儲一份。在存儲級别的末尾加上“_2”表示持久化的資料存為兩份。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

緩存有可能丢失,或者存儲于記憶體的資料由于記憶體不足而被删除,RDD的緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行。通過基于RDD的一系列轉換,丢失的資料會被重算,由于RDD的各個Partition是相對獨立的,是以隻需要計算丢失的部分即可,并不需要重算全部Partition。

3)自帶緩存算子

Spark會自動對一些Shuffle操作的中間資料做持久化操作(比如:reduceByKey)。這樣做的目的是為了當一個節點Shuffle失敗了避免重新計算整個輸入。但是,在實際使用的時候,如果想重用資料,仍然建議調用persist或cache。

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("in/word.txt")
    val wordRdd: RDD[String] = rdd.flatMap(_.split(" "))
    val rdd1: RDD[(String, Int)] = wordRdd.map((_,1))

    //采用reduceByKey,自帶緩存
    val wordByKeyRdd: RDD[(String, Int)] = rdd1.reduceByKey(_+_)

    // cache操作會增加血緣關系,不改變原有的血緣關系
    println(wordByKeyRdd.toDebugString)

    wordByKeyRdd.cache()

    wordByKeyRdd.collect()


    println("--------")
    println(wordByKeyRdd.toDebugString)
    sc.stop()
  }
}
           

通路http://192.168.83.100:4040/jobs/頁面,檢視第一個和第二個job的DAG圖。說明:增加緩存後血緣依賴關系仍然有,但是,第二個job取的資料是從緩存中取的。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量
Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.8.2 RDD CheckPoint檢查點

1)檢查點:是通過将RDD中間結果寫入磁盤。

2)為什麼要做檢查點?

由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之後有節點出現問題,可以從檢查點開始重做血緣,減少了開銷。

3)檢查點存儲路徑:Checkpoint的資料通常是存儲在HDFS等容錯、高可用的檔案系統

4)檢查點資料存儲格式為:二進制的檔案

5)檢查點切斷血緣:在Checkpoint的過程中,該RDD的所有依賴于父RDD中的資訊将全部被移除。

6)檢查點觸發時間:對RDD進行checkpoint操作并不會馬上被執行,必須執行Action操作才能觸發。但是檢查點為了資料安全,會從血緣關系的最開始執行一遍

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

7)設定檢查點步驟

(1)設定檢查點資料存儲路徑:sc.setCheckpointDir("./checkpoint1")

(2)調用檢查點方法:wordToOneRdd.checkpoint()

8)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // 需要設定路徑,否則抛異常:Checkpoint directory has not been set in the SparkContext
    sc.setCheckpointDir("in/testCheckpoint")

    val rdd: RDD[String] = sc.textFile("in/word.txt")

    val wordToOneRdd: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1))

    //增加緩存,避免再重新跑一個job做checkpoint
    wordToOneRdd.cache()

    //資料檢查點:針對wordToOneRdd做檢查點計算
    wordToOneRdd.checkpoint()

    wordToOneRdd.collect().foreach(println)
    //會立即啟動一個新的job來專門做checkpoint運算
      
      //再次觸發執行邏輯
        wordToOneRdd.collect().foreach(println)
        wordToOneRdd.collect().foreach(println)
    
    sc.stop()
  }
}
           

9)執行結果

通路http://192.168.83.100:4040/jobs/頁面,檢視4個job的DAG圖。其中第2個圖是checkpoint的job運作DAG圖。第3、4張圖說明,檢查點切斷了血緣依賴關系。

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量
Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量
Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量
Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

(1)隻增加checkpoint,沒有增加Cache緩存列印

第1個job執行完,觸發了checkpoint,第2個job運作checkpoint,并把資料存儲在檢查點上。第3、4個job,資料從檢查點上直接讀取。

(2)增加checkpoint,也增加Cache緩存列印

​ 第1個job執行完,資料就儲存到Cache裡面了,第2個job運作checkpoint,直接讀取Cache裡面的資料,并把資料存儲在檢查點上。第3、4個job,資料從檢查點上直接讀取

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.8.3 緩存和檢查點差別

1)Cache緩存隻是将資料儲存起來,不切斷血緣依賴。Checkpoint檢查點切斷血緣依賴。

2)Cache緩存的資料通常存儲在磁盤、記憶體等地方,可靠性低。Checkpoint的資料通常存儲在HDFS等容錯、高可用的檔案系統,可靠性高。

3)建議對checkpoint()的RDD使用Cache緩存,這樣checkpoint的job隻需從Cache緩存中讀取資料即可,否則需要再從頭計算一次RDD。

4)如果使用完了緩存,可以通過unpersist()方法釋放緩存

2.8.4 檢查點存儲到HDFS叢集

如果檢查點資料存儲到HDFS叢集,要注意配置通路叢集的使用者名。否則會報通路權限異常。

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    //設定通路hdfs叢集的使用者名
    System.setProperty("HADOOP_USER_NAME","root")

    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //需要設定路徑,需要提前在hdfs叢集上建立檔案夾
    sc.setCheckpointDir("hdfs://hadoop100:9000/checkpoint")

    val rdd: RDD[String] = sc.textFile("in/word.txt")
    val wordToOneRdd: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1))

    wordToOneRdd.cache()

    wordToOneRdd.checkpoint()

    wordToOneRdd.collect().foreach(println)
    
    sc.stop()
  }
}
           

2.9 鍵值對RDD資料分區

Spark目前支援Hash分區和Range分區,和使用者自定義分區。Hash分區為目前的預設分區。分區器直接決定了RDD中分區的個數、RDD中每條資料經過Shuffle後進入哪個分區和Reduce的個數。

1)注意:

(1)隻有Key-Value類型的RDD才有分區器,非Key-Value類型的RDD分區的值是None

(2)每個RDD的分區ID範圍:0~numPartitions-1,決定這個值是屬于那個分區的。

2)擷取RDD分區

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))

    println(rdd.partitioner)
    val rdd2: RDD[(Int, Int)] = rdd.partitionBy(new HashPartitioner(3))
    //列印分區器
    println(rdd2.partitioner)

    sc.stop()
  }
}
           

2.9.1 Hash分區

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.9.2 Ranger分區

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

2.9.3 自定義分區

詳細檢視算子那一章節

https://blog.csdn.net/zmzdmx/article/details/109633799

https://blog.csdn.net/zmzdmx/article/details/109565914

第3章 資料讀取與儲存

Spark的資料讀取及資料儲存可以從兩個次元來作區分:檔案格式以及檔案系統。

檔案格式分為:Text檔案、Json檔案、Csv檔案、Sequence檔案以及Object檔案;

檔案系統分為:本地檔案系統、HDFS以及資料庫。

3.1 檔案類資料讀取與儲存

3.1.1 Text檔案

1)資料讀取:textFile(String)

2)資料儲存:saveAsTextFile(String)

3)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //讀取檔案
    val rdd: RDD[String] = sc.textFile("in/word.txt")
    //儲存檔案
    rdd.saveAsTextFile("in/word")
    sc.stop()
  }
}
           

4)注意:如果是叢集路徑:hdfs://hadoop100:9000/input/1.txt

3.1.2 Json檔案

如果JSON檔案中每一行就是一個JSON記錄,那麼可以通過将JSON檔案當做文本檔案來讀取,然後利用相關的JSON庫對每一條資料進行JSON解析。

1)資料準備

​ 在input目錄下建立1.txt檔案,裡面存儲如下内容

{"username": "zhangsan","age": 20}
{"username": "lisi","age": 18}
{"username": "wangwu","age": 16}
           

2)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //讀取檔案
    val rdd: RDD[String] = sc.textFile("in/word.json")
    //導入解析Json所需的包并解析Json
    import scala.util.parsing.json._

    val res: RDD[Option[Any]] = rdd.map(JSON.parseFull)
    
    res.collect().foreach(println)
    sc.stop()
  }
}
           

3)修改輸入檔案格式

[{"username": "zhangsan","age": 20},
{"username": "lisi","age": 18},
{"username": "wangwu","age": 16}
]
           

再次執行程式,發現解析失敗。原因是一行一行的讀取檔案。

注意:使用RDD讀取JSON檔案處理很複雜,同時SparkSQL內建了很好的處理JSON檔案的方式,是以應用中多是采用SparkSQL處理JSON檔案。

3.1.3 Sequence檔案

SequenceFile檔案是Hadoop用來存儲二進制形式的key-value對而設計的一種平面檔案(Flat File)。在SparkContext中,可以調用sequenceFile[keyClass, valueClass](path)。

1)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(2,3),(5,6)))
    //儲存資料為SequenceFile
    rdd.saveAsSequenceFile("output")

    //讀取SequenceFile檔案
    sc.sequenceFile[Int,Int]("output").collect().foreach(println)
    sc.stop()
  }
}
           

2)注意:SequenceFile檔案隻針對PairRDD

3.1.4 Object對象檔案

對象檔案是将對象序列化後儲存的檔案,采用Java的序列化機制。可以通過objectFile[k,v](path)函數接收一個路徑,讀取對象檔案,傳回對應的RDD,也可以通過調用saveAsObjectFile()實作對對象檔案的輸出。因為是序列化是以要指定類型。

1)代碼實作

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(2,3),(5,6)))
    rdd.saveAsObjectFile("output")
    sc.objectFile("output").collect().foreach(println)
    sc.stop()
  }
}
           

3.2 檔案系統類資料讀取與儲存

3.2.1 HDFS

Spark的整個生态系統與Hadoop是完全相容的,是以對于Hadoop所支援的檔案類型或者資料庫類型,Spark也同樣支援。另外,由于Hadoop的API有新舊兩個版本,是以Spark為了能夠相容Hadoop所有的版本,也提供了兩套建立操作接口。對于外部存儲建立操作而言,hadoopRDD和newHadoopRDD是最為抽象的兩個函數接口

3.2.2 MySQL

支援通過Java JDBC通路關系型資料庫。需要通過JdbcRDD進行,示例如下:

(1)添加依賴

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>
           

(2)從Mysql讀取資料

sc: SparkContext,   Spark程式執行的入口,上下文對象
    getConnection: () => Connection,  擷取資料庫連接配接
    sql: String,  執行SQL語句
    lowerBound: Long, 查詢的起始位置
    upperBound: Long, 查詢的結束位置
    numPartitions: Int,分區數
    mapRow: (ResultSet) => T   對結果集的處理
           
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //定義連接配接mysql的參數
    val driver= "com.mysql.jdbc.Driver"
    val url="jdbc:mysql://hadoop100:3306/test"
    val username="root"
    val password="ok"

    //建立jdbc
    val rdd = new JdbcRDD(sc, () => {
      Class.forName(driver)
      DriverManager.getConnection(url, username, password)
    }, "SELECT * FROM `course` WHERE cno>=? and cno<=?", 1001, 10010, 1,
      rs=>(rs.getInt(1),rs.getString(2),rs.getString(3)))
    rdd.foreach(println)

    sc.stop()
  }
}
           

(3)往Mysql寫入資料

//在循環中建立對象,效率低
    rdd.foreach{
      case (name,age)=>{
        //注冊驅動
        Class.forName(driver)
        //擷取連接配接
        val conn: Connection = DriverManager.getConnection(url,userName,passWd)
        //執行的sql
        var sql:String = "insert into user(name,age) values(?,?)"
        //擷取資料庫操作對象
        val ps: PreparedStatement = conn.prepareStatement(sql)
        //給參數指派
        ps.setString(1,name)
        ps.setInt(2,age)
        //執行sql語句
        ps.executeUpdate()
        //釋放資源
        ps.close()
        conn.close()
      }
    
           
object MySQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("mysql").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    val url="jdbc:mysql://192.168.83.100:3306/test"
    val driver="com.mysql.jdbc.Driver"
    val user="root"
    val pwd="ok"

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("lisi",20),("wangwu",30)))

    //下面這段代碼需要ps實作序列化,但是ps不是自己定義的類型,需要建立一個對象,沒有辦法實作
    //注冊驅動
    Class.forName(driver)
    //擷取連接配接
    val conn: Connection = DriverManager.getConnection(url, user, pwd)
    //聲明資料庫操作的sql
    var sql = "insert into user(name,age) values(?,?)"
    //建立資料庫操作對象preparestatement
    val ps: PreparedStatement = conn.prepareStatement(sql)
    rdd.foreach {
      case (name, age) => {
        //給參數指派
        ps.setString(1, name)
        ps.setInt(2, age)
        //執行sql
        ps.executeUpdate()
      }
    }
    //關閉連接配接
    ps.close()
    
  }
}
           

推薦使用foreachPartition,一個分區建立一個連結

object MySQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("mysql").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    val url="jdbc:mysql://192.168.83.100:3306/test"
    val driver="com.mysql.jdbc.Driver"
    val user="root"
    val pwd="ok"

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("lisi",20),("wangwu",30)))

    rdd.foreachPartition{
      //datas是rdd一個分區的資料
      datas=>{
        //注冊驅動
        Class.forName(driver)
        //擷取連接配接
        val conn: Connection = DriverManager.getConnection(url, user, pwd)
        //聲明資料庫操作的sql
        var sql = "insert into user(name,age) values(?,?)"
        //建立資料庫操作對象preparestatement
        val ps: PreparedStatement = conn.prepareStatement(sql)

        //對目前分區内的資料進行周遊
        //注意:這個foreach不是算子,是集合的方法
        datas.foreach{
          case (name, age) => {
            //給參數指派
            ps.setString(1, name)
            ps.setInt(2, age)
            //執行sql
            ps.executeUpdate()
          }
        }
        ps.close()
        conn.close()
      }
    }
    //關閉連接配接
    sc.stop()

  }
}
           

第4章 累加器

累加器:分布式共享隻寫變量。(Task和Task之間不能讀資料)

累加器用來對資訊進行聚合,通常在向Spark傳遞函數時,比如使用map()函數或者用 filter()傳條件時,可以使用驅動器程式中定義的變量,但是叢集中運作的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。如果我們想實作所有分片處理時更新共享變量的功能,那麼累加器可以實作我們想要的效果。

4.1 系統累加器

1)代碼實作

object accumulator01 {

    def main(args: Array[String]): Unit = {

        //1.建立SparkConf并設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.建立SparkContext,該對象是送出Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.建立RDD
        val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))

        //3.1 列印單詞出現的次數(a,10) 代碼執行了shuffle
        dataRDD.reduceByKey(_ + _).collect().foreach(println)

        //3.2 如果不用shuffle,怎麼處理呢?
        var sum = 0
        // 列印是在Executor端
        dataRDD.foreach {
            case (a, count) => {
                sum = sum + count
                println("sum=" + sum)
            }
        }
        // 列印是在Driver端
        println(("a", sum))

        //3.3 使用累加器實作資料的聚合功能
        // Spark自帶常用的累加器
        //3.3.1 聲明累加器
        val sum1: LongAccumulator = sc.longAccumulator("sum1")

        dataRDD.foreach{
            case (a, count)=>{
                //3.3.2 使用累加器
                sum1.add(count)
            }
        }

        //3.3.3 擷取累加器
        println(sum1.value)

        //4.關閉連接配接
        sc.stop()
    }
           

通過在驅動器中調用SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器。傳回值為org.apache.spark.Accumulator[T]對象,其中T是初始值initialValue的類型。Spark閉包裡的執行器代碼可以使用累加器的+=方法(在Java中是 add)增加累加器的值。驅動器程式可以調用累加器的value屬性(在Java中使用value()或setValue())來通路累加器的值。

注意:

(1)工作節點上的任務不能互相通路累加器的值。從這些任務的角度來看,累加器是一個隻寫變量。

(2)對于要在行動操作中使用的累加器,Spark隻會把每個任務對各累加器的修改應用一次。是以,如果想要一個無論在失敗還是重複計算時都絕對可靠的累加器,我們必須把它放在foreach()這樣的行動操作中。轉化操作中累加器可能會發生不止一次更新。

4.2 自定義累加器

自定義累加器類型的功能在1.X版本中就已經提供了,但是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實作方式。

1)自定義累加器步驟

(1)繼承AccumulatorV2,設定輸入、輸出泛型

(2)重寫方法

2)需求:自定義累加器,統計集合中首字母為“H”單詞出現的次數。

List(“Hello”, “Hello”, “Hello”, “Hello”, “Hello”, “Spark”, “Spark”)

3)代碼實作

object accumulator_define {

    def main(args: Array[String]): Unit = {

        //1.建立SparkConf并設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.建立SparkContext,該對象是送出Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 建立RDD
        val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark"))

        //3.1 建立累加器
        val accumulator1: MyAccumulator = new MyAccumulator()

        //3.2 注冊累加器
        sc.register(accumulator1,"wordcount")

        //3.3 使用累加器
        rdd.foreach(
            word =>{
                accumulator1.add(word)
            }
        )

        //3.4 擷取累加器的累加結果
        println(accumulator1.value)


        //4.關閉連接配接
        sc.stop()
    }
}

// 聲明累加器
// 1.繼承AccumulatorV2,設定輸入、輸出泛型
// 2.重新方法
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    // 定義輸出資料集合
    var map = mutable.Map[String, Long]()

    // 是否為初始化狀态,如果集合資料為空,即為初始化狀态
    override def isZero: Boolean = map.isEmpty

    // 複制累加器
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        new MyAccumulator()
    }

    // 重置累加器
    override def reset(): Unit = map.clear()

    // 增加資料
    override def add(v: String): Unit = {
        // 業務邏輯
        if (v.startsWith("H")) {
            map(v) = map.getOrElse(v, 0L) + 1L
        }
    }

    // 合并累加器
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

        var map1 = map
        var map2 = other.value

        map = map1.foldLeft(map2)(
            (map,kv)=>{
                map(kv._1) = map.getOrElse(kv._1, 0L) + kv._2
                map
            }
        )
    }

    // 累加器的值,其實就是累加器的傳回結果
    override def value: mutable.Map[String, Long] = map
}
           

第5章 廣播變量

廣播變量:分布式共享隻讀變量。

在多個并行操作中(Executor)使用同一個變量,Spark預設會為每個任務(Task)分别發送,這樣如果共享比較大的對象,會占用很大工作節點的記憶體。

廣播變量用來高效分發較大的對象。向所有工作節點發送一個較大的隻讀值,以供一個或多個Spark操作使用。比如,如果你的應用需要向所有節點發送一個較大的隻讀查詢表,甚至是機器學習算法中的一個很大的特征向量,廣播變量用起來都很順手。

1)使用廣播變量步驟:

(1)通過對一個類型T的對象調用SparkContext.broadcast建立出一個Broadcast[T]對象,任何可序列化的類型都可以這麼實作。

(2)通過value屬性通路該對象的值(在Java中為value()方法)。

(3)變量隻會被發到各個節點一次,應作為隻讀值處理(修改這個值不會影響到别的節點)。

2)原理說明

Spark 之 SparkCore(未寫完)第1章 RDD概述第2章 RDD程式設計第3章 資料讀取與儲存第4章 累加器第5章 廣播變量

3)代碼實作

object broadcast {

    def main(args: Array[String]): Unit = {

        //1.建立SparkConf并設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.建立SparkContext,該對象是送出Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.建立RDD
        //val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
        //val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))

        //3.1 采用RDD的方式實作 rdd1 join rdd2,用到Shuffle,性能比較低
        //rdd1.join(rdd2).collect().foreach(println)

        //3.2 采用集合的方式,實作rdd1和list的join
        val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
        val list: List[(String, Int)] = List(("a", 4), ("b", 5), ("c", 6))

        // 聲明廣播變量
        val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)

        val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
            case (k1, v1) => {

                var v2: Int = 0

                // 使用廣播變量
                //for ((k3, v3) <- list.value) {
                for ((k3, v3) <- broadcastList.value) {
                    if (k1 == k3) {
                        v2 = v3
                    }
                }

                (k1, (v1, v2))
            }
        }
        resultRDD.foreach(println)

        //4.關閉連接配接
        sc.stop()
    }
}
           

繼續閱讀