天天看點

Spark 累加器、廣播變量1. 累加器2. 廣播變量

1. 累加器

Apache Spark 使用共享變量。當驅動程式向叢集執行器發送任務時,叢集的每個節點都會收到一份共享變量的副本。如果我們想實作向 MapReduce 上的計數器,顯然是不可以的;如果我們想要更新這些副本的值,也無法影響驅動器的對中應變量。Apache Spark 支援兩種基本類型的共享變量——累加器和廣播。

當我們想要對資料進行關聯操作時,可以使用累加器。累加器通過關聯和互動操作,可實作計數、求和或求平均的功能。

累加器有兩個實作類:

  • LongAccumulator

    :用于計算64位整數的總和、計數和平均值的累加器。
  • DoubleAccumulator

    :用于計算雙精度浮點數的和、計數和平均數。

1.1 使用累加器

累加器實作步驟:

  1. 執行個體化累加器對象
  2. 使用

    SparkContext

    注冊累加器對象
  3. 使用累加器進行資料的添加
  4. 使用

    value()

    擷取累加器的數值
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(1 to 10)

    // 1. 執行個體化累加器對象
    var acc = new LongAccumulator()

    // 2. 使用 `SparkContext` 注冊累加器對象
    sc.register(acc)

    // 3. 使用累加器進行資料的添加
    rdd.map(x => {
        acc.add(1)
    }).collect().toList

    // 4. 使用 `value()` 擷取累加器的數值
    println(acc.value)

}
           

1.2 自定義累加器

自定義累加器的功能提供在在

1.x

版本之後,但是在

2.0

版本之後,累加器的易用性有了較大的改進,并提供了

AccumulatorV2

抽象類。是以自定義累加器,可繼承該類,并實作其中的方法。

class My_Accumulator extends AccumulatorV2[Int, Long] {
    // 建立成員屬性用于記錄目前累加器的值
    var count: Long = 0L

    /**
   * 用于判斷目前累加器是否為初始狀态
   *
   * @return
   */
    override def isZero: Boolean = this.count == 0

    /**
   * 複制目前累加器的狀态
   */
    override def copy(): AccumulatorV2[Int, Long] = {
        val acc = new My_Accumulator
        acc.count = this.count
        acc
    }

    /**
   * 重置目前累加器的值
   */
    override def reset(): Unit = this.count = 0

    /**
   * 将傳入的對象添加到目前的累加器值中
   *
   * @param v 累加的參數
   */
    override def add(v: Int): Unit = this.count += v

    /**
   * 将其他分區的累加器傳入merge 并将所有累加器的值進行合并
   *
   * @param other 其他分區的累加器
   */
    override def merge(other: AccumulatorV2[Int, Long]): Unit = {
        val o = other.asInstanceOf[My_Accumulator]
        this.count += o.count
    }

    /**
   * @return 傳回目前累加器的值
   */
    override def value: Long = this.count

}
           
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(1 to 10)

    // 1. 執行個體化自定義的累加器對象
    var acc = new My_Accumulator

    // 2. 使用 `SparkContext` 注冊自定義累加器對象
    sc.register(acc)

    // 3. 使用累加器進行資料的添加
    rdd.map(x => {
        acc.add(1)
    }).collect().toList

    // 4. 使用 `value()` 擷取累加器的數值
    println(acc.value)

}
           

2. 廣播變量

廣播變量(Broadcast)允許 Spark 的不同節點上儲存一個安全的隻讀緩存變量,通過廣播變量可高效分發較大的對象。

使用廣播變量後,使得每個節點的 executor 中的 cache 才存儲副本,就不同為每個 task 建立副本了。

2.1 使用廣播變量

使用廣播變量:

  • 對一個類型為 T 的對象調用

    SparkContext.broadcast

    建立出一個

    Broadcast[T]

    對象
  • 通過

    value

    屬性通路該對象的值
  • 變量隻會被發送到各個節點一次,為避免副本被更改,應當作為隻讀處理
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("Demo14")
    val sc = new SparkContext(conf)


    // 加載黑名單檔案放入集合
    val source = Source.fromFile("C:\\Users\\Amos\\Desktop\\blackList.txt")
    //   檔案大小1GB
    val blkList: List[String] = source.getLines().toList
    source.close()

    // 1. 建立廣播對象
    val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList)

    // 加載日志資料建立RDD
    val rdd = sc.textFile("C:\\Users\\Amos\\Desktop\\weblog\\access.log-20211107")
    // 将日志資料通過處理得到  (ip,是否為黑名單使用者)
    rdd
    .repartition(10)
    .map(line => {
        val ip = line.split(" ").head
        //2. 需要使用時  從公共緩存中讀取對象
        val list = bc_blkList.value
        (ip, if (list.contains(ip)) 1 else 0)
    })
    .foreach(println)

}
           

❤️ END ❤️

繼續閱讀