天天看點

Spark綜合學習筆記(八)SparkStreaming案例2 狀态管理

學習緻謝:

​​https://www.bilibili.com/video/BV1Xz4y1m7cv?p=42​​

需求:

對從Socket接收的資料做WordCoun并要求能夠和曆史資料進行累加!

如:先發了一個spark,得到spark,1然後不管隔多久再發一個spark,得到spark,2也就是說要對資料的曆史狀态進行維護!

Spark綜合學習筆記(八)SparkStreaming案例2 狀态管理

實作思路:

一、updataStateByKey

先設定checkpoint存儲狀态status,使用updataStateByKey實作狀态管理的單詞統計,需要自己寫一個updateFunc方法,如下:

Spark綜合學習筆記(八)SparkStreaming案例2 狀态管理

代碼實作

package streaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/**
  * Author itcast
  * Desc 使用SparkStreaming接受node1"9999的資料并做WordCount+實作狀态管理
  * 如輸入 spark hadoop得到(spark,1)(hadoop,1)
  * 再下一個批次輸入spark,得到spark,2
  */
object Status {
  def main(args: Array[String]): Unit = {
    //TODO 0.準備環境
    val conf:SparkConf=new SparkConf().setMaster("spark").setMaster("local[*]")
    val sc:   SparkContext=new SparkContext(conf)
    sc.setLogLevel("WARN")
    //the time interval at which streaming data will be dicided into batches
    val ssc:StreamingContext= new StreamingContext(sc,Seconds(5))

    //The checkpoint directory has not been set. PLease set it by streamingContext.checkpoint().
    //注意:state存在checkpoint中
    ssc.checkpoint("./ckp")
    //TODO 1.加載資料
    val lines:ReceiverInputDStream[String]=ssc.socketTextStream("node1",9999)
    //TODO 2.處理資料
    //定義一個函數用來處理狀态:把目前資料和曆史狀态累加
    //currentValues:表示該key(spark)的目前批次的值,如:[1,1]
    //historyValue:表示該key(如spark)的曆史值,第一次是0,後面之後就是之前的累加值,如1
    val updateFunc=(currentValues:Seq[Int],historyValue:Option[Int])=>{
      if(currentValues.size>0){
        val currentResult:Int=currentValues.sum+historyValue.getOrElse(0)
        Some(currentResult)
      }else{
        historyValue
      }
    }
    val resuleDS:DStream[(String,Int)]=lines.flatMap(_.split(" "))
      .map((_,1))
      //updateFunc:(Seq[v],Option[s]) =>option[s]
      .updateStateByKey(updateFunc)
    //TODO 3.輸出結果
    resuleDS.print()
    //TODO 4.啟動并等待結束
    ssc.start()
    ssc.awaitTermination()//注意:流式應用程式啟動之後需要一直運作等待停止、等待到來
    //TODO 5.關閉資源
    ssc.stop(stopSparkContext = true,stopGracefully = true)//優雅關閉
  }
}      

示範:

(1)如圖,不同批次實作了累計

Spark綜合學習筆記(八)SparkStreaming案例2 狀态管理

(2)停掉之後再重新啟動,後之前的資料恢複不了了,就是說曆史狀态維護隻能在目前應用

二、mapWithState

繼續閱讀