天天看點

Spark Streaming 中 updateStateByKey 模式 代碼實作示範(三)

1.問題

  • 在上面的那個案例中存在這樣一個問題:每個批次的單詞次數都被正确的統計出來,但是結果不能累加!

       如果需要累加需要使用updateStateByKey(func)來更新狀态.

  • 根據于key的前置狀态和key的新值,對key進行更新,傳回一個新狀态的DStream

2.準備

●首先在linux伺服器上安裝nc工具(nc指令是netcat指令的簡稱,原本是用來設定路由器.,我們可以利用它向某個端口發送資料)

yum install -y nc

●啟動一個服務端并開發9999端口,等一下往這個端口發資料

nc -lk 9999

●發送資料

3.代碼示範

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author itcast
  * Date 2019/8/8 10:47
  * Desc 示範使用Spark監聽Socket:node-01:9999發送過來的資料,并做WordCount,并做結果的累加
  */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.建立StreamingContext
    //spark.master should be set as local[n], n > 1
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對資料進行切分形成一個RDD
    //requirement failed: ....Please set it by StreamingContext.checkpoint().
    //注意:我們在下面使用到了updateStateByKey對目前資料和曆史資料進行累加
    //那麼曆史資料存在哪?我們需要給他設定一個checkpoint目錄
    ssc.checkpoint("./wc")//開發中HDFS
    //2.監聽Socket接收資料
    //ReceiverInputDStream就是接收到的所有的資料組成的RDD,封裝成了DStream,接下來對DStream進行操作就是對RDD進行操作
    val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node-01",9999)
    //3.操作資料
    val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))
    val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))
    //val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
    //====================使用updateStateByKey對目前資料和曆史資料進行累加====================
    val wordAndCount: DStream[(String, Int)] =wordAndOneDStream.updateStateByKey(updateFunc)
    wordAndCount.print()
    ssc.start()//開啟
    ssc.awaitTermination()//等待優雅停止
  }
  //currentValues:目前批次的value值,如:1,1,1 (以測試資料中的hadoop為例)
  //historyValue:之前累計的曆史值,第一次沒有值是0,第二次是3
  //目标是把目前資料+曆史資料傳回作為新的結果(下次的曆史資料)
  def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
    val result: Int = currentValues.sum + historyValue.getOrElse(0)
    Some(result)
  }
}
           

4.執行

1.先執行nc -lk 9999

2.然後執行代碼

3.不斷的在1中輸入不同的單詞

hadoop spark sqoop hadoop spark hive hadoop

4.觀察IDEA控制台輸出

sparkStreaming每隔5s計算一次目前5s内的資料,然後将每個批次的資料輸出

繼續閱讀