天天看點

Spark Streaming實時流處理項目實戰(九)Spark Streaming進階帶狀态的算子

Spark Streaming進階

  • 帶狀态的算子

帶狀态的算子

updateStateByKey算子的使用

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by llp on 2021/4/27.
 */


object StatefulWordCount2 {

  def main(args: Array[String]): Unit = {
    
    // 1.建立StreamingContext
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    
    // 2.建立checkpoint
    // 如果要使用updateStateByKey算子,就必須設定一個checkpoint目錄,開啟checkpoint機制
    // 這樣的話才能把每個key對應的state除了在記憶體中有,那麼是不是也要checkpoint一份
    // 因為你要長期儲存一份key的state的話,那麼spark streaming是要求必須用checkpoint的,以便于在
    // 記憶體資料丢失的時候,可以從checkpoint中恢複資料
    // 開啟checkpoint機制,很簡單,隻要調用jssc的checkpoint()方法,設定一個hdfs目錄即可
    // https://blog.csdn.net/erfucun/article/details/52278729
    ssc.checkpoint("/opt/data")

    // 實作WordCount邏輯
    val lines = ssc.socketTextStream("hadoop2", 9999)
    //val lines = ssc.textFileStream("/opt/data")
    
    val result = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey((values:Seq[Int],state:Option[Int])=>{
      //更新函數兩個參數Seq[V], Option[S],前者是每個key新增的值的集合,後者是目前儲存的狀态,
      //建立一個變量,用于記錄單詞出現次數
      var newValue=state.getOrElse(0) //getOrElse相當于if....else.....
      for(value <- values){
        newValue +=value //将單詞出現次數累計相加
      }
      Option(newValue)
    })
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

           

繼續閱讀