1. 累加器
Apache Spark 使用共享變量。當驅動程式向叢集執行器發送任務時,叢集的每個節點都會收到一份共享變量的副本。如果我們想實作向 MapReduce 上的計數器,顯然是不可以的;如果我們想要更新這些副本的值,也無法影響驅動器的對中應變量。Apache Spark 支援兩種基本類型的共享變量——累加器和廣播。
當我們想要對資料進行關聯操作時,可以使用累加器。累加器通過關聯和互動操作,可實作計數、求和或求平均的功能。
累加器有兩個實作類:
-
:用于計算64位整數的總和、計數和平均值的累加器。LongAccumulator
-
:用于計算雙精度浮點數的和、計數和平均數。DoubleAccumulator
1.1 使用累加器
累加器實作步驟:
- 執行個體化累加器對象
- 使用
注冊累加器對象SparkContext
- 使用累加器進行資料的添加
- 使用
擷取累加器的數值value()
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to 10)
// 1. 執行個體化累加器對象
var acc = new LongAccumulator()
// 2. 使用 `SparkContext` 注冊累加器對象
sc.register(acc)
// 3. 使用累加器進行資料的添加
rdd.map(x => {
acc.add(1)
}).collect().toList
// 4. 使用 `value()` 擷取累加器的數值
println(acc.value)
}
1.2 自定義累加器
自定義累加器的功能提供在在
1.x
版本之後,但是在
2.0
版本之後,累加器的易用性有了較大的改進,并提供了
AccumulatorV2
抽象類。是以自定義累加器,可繼承該類,并實作其中的方法。
class My_Accumulator extends AccumulatorV2[Int, Long] {
// 建立成員屬性用于記錄目前累加器的值
var count: Long = 0L
/**
* 用于判斷目前累加器是否為初始狀态
*
* @return
*/
override def isZero: Boolean = this.count == 0
/**
* 複制目前累加器的狀态
*/
override def copy(): AccumulatorV2[Int, Long] = {
val acc = new My_Accumulator
acc.count = this.count
acc
}
/**
* 重置目前累加器的值
*/
override def reset(): Unit = this.count = 0
/**
* 将傳入的對象添加到目前的累加器值中
*
* @param v 累加的參數
*/
override def add(v: Int): Unit = this.count += v
/**
* 将其他分區的累加器傳入merge 并将所有累加器的值進行合并
*
* @param other 其他分區的累加器
*/
override def merge(other: AccumulatorV2[Int, Long]): Unit = {
val o = other.asInstanceOf[My_Accumulator]
this.count += o.count
}
/**
* @return 傳回目前累加器的值
*/
override def value: Long = this.count
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Scala_Accumulator")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to 10)
// 1. 執行個體化自定義的累加器對象
var acc = new My_Accumulator
// 2. 使用 `SparkContext` 注冊自定義累加器對象
sc.register(acc)
// 3. 使用累加器進行資料的添加
rdd.map(x => {
acc.add(1)
}).collect().toList
// 4. 使用 `value()` 擷取累加器的數值
println(acc.value)
}
2. 廣播變量
廣播變量(Broadcast)允許 Spark 的不同節點上儲存一個安全的隻讀緩存變量,通過廣播變量可高效分發較大的對象。
使用廣播變量後,使得每個節點的 executor 中的 cache 才存儲副本,就不同為每個 task 建立副本了。
2.1 使用廣播變量
使用廣播變量:
- 對一個類型為 T 的對象調用
建立出一個SparkContext.broadcast
對象Broadcast[T]
- 通過
屬性通路該對象的值value
- 變量隻會被發送到各個節點一次,為避免副本被更改,應當作為隻讀處理
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Demo14")
val sc = new SparkContext(conf)
// 加載黑名單檔案放入集合
val source = Source.fromFile("C:\\Users\\Amos\\Desktop\\blackList.txt")
// 檔案大小1GB
val blkList: List[String] = source.getLines().toList
source.close()
// 1. 建立廣播對象
val bc_blkList: Broadcast[List[String]] = sc.broadcast(blkList)
// 加載日志資料建立RDD
val rdd = sc.textFile("C:\\Users\\Amos\\Desktop\\weblog\\access.log-20211107")
// 将日志資料通過處理得到 (ip,是否為黑名單使用者)
rdd
.repartition(10)
.map(line => {
val ip = line.split(" ").head
//2. 需要使用時 從公共緩存中讀取對象
val list = bc_blkList.value
(ip, if (list.contains(ip)) 1 else 0)
})
.foreach(println)
}
❤️ END ❤️