Spark Streaming進階
- 帶狀态的算子
帶狀态的算子
updateStateByKey算子的使用
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by llp on 2021/4/27.
*/
object StatefulWordCount2 {
def main(args: Array[String]): Unit = {
// 1.建立StreamingContext
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
// 2.建立checkpoint
// 如果要使用updateStateByKey算子,就必須設定一個checkpoint目錄,開啟checkpoint機制
// 這樣的話才能把每個key對應的state除了在記憶體中有,那麼是不是也要checkpoint一份
// 因為你要長期儲存一份key的state的話,那麼spark streaming是要求必須用checkpoint的,以便于在
// 記憶體資料丢失的時候,可以從checkpoint中恢複資料
// 開啟checkpoint機制,很簡單,隻要調用jssc的checkpoint()方法,設定一個hdfs目錄即可
// https://blog.csdn.net/erfucun/article/details/52278729
ssc.checkpoint("/opt/data")
// 實作WordCount邏輯
val lines = ssc.socketTextStream("hadoop2", 9999)
//val lines = ssc.textFileStream("/opt/data")
val result = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey((values:Seq[Int],state:Option[Int])=>{
//更新函數兩個參數Seq[V], Option[S],前者是每個key新增的值的集合,後者是目前儲存的狀态,
//建立一個變量,用于記錄單詞出現次數
var newValue=state.getOrElse(0) //getOrElse相當于if....else.....
for(value <- values){
newValue +=value //将單詞出現次數累計相加
}
Option(newValue)
})
result.print()
ssc.start()
ssc.awaitTermination()
}
}