1.問題
- 在上面的那個案例中存在這樣一個問題:每個批次的單詞次數都被正确的統計出來,但是結果不能累加!
如果需要累加需要使用updateStateByKey(func)來更新狀态.
- 根據于key的前置狀态和key的新值,對key進行更新,傳回一個新狀态的DStream
2.準備
●首先在linux伺服器上安裝nc工具(nc指令是netcat指令的簡稱,原本是用來設定路由器.,我們可以利用它向某個端口發送資料)
yum install -y nc
●啟動一個服務端并開發9999端口,等一下往這個端口發資料
nc -lk 9999
●發送資料
3.代碼示範
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author itcast
* Date 2019/8/8 10:47
* Desc 示範使用Spark監聽Socket:node-01:9999發送過來的資料,并做WordCount,并做結果的累加
*/
object WordCount2 {
def main(args: Array[String]): Unit = {
//1.建立StreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對資料進行切分形成一個RDD
//requirement failed: ....Please set it by StreamingContext.checkpoint().
//注意:我們在下面使用到了updateStateByKey對目前資料和曆史資料進行累加
//那麼曆史資料存在哪?我們需要給他設定一個checkpoint目錄
ssc.checkpoint("./wc")//開發中HDFS
//2.監聽Socket接收資料
//ReceiverInputDStream就是接收到的所有的資料組成的RDD,封裝成了DStream,接下來對DStream進行操作就是對RDD進行操作
val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node-01",9999)
//3.操作資料
val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))
//val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
//====================使用updateStateByKey對目前資料和曆史資料進行累加====================
val wordAndCount: DStream[(String, Int)] =wordAndOneDStream.updateStateByKey(updateFunc)
wordAndCount.print()
ssc.start()//開啟
ssc.awaitTermination()//等待優雅停止
}
//currentValues:目前批次的value值,如:1,1,1 (以測試資料中的hadoop為例)
//historyValue:之前累計的曆史值,第一次沒有值是0,第二次是3
//目标是把目前資料+曆史資料傳回作為新的結果(下次的曆史資料)
def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
val result: Int = currentValues.sum + historyValue.getOrElse(0)
Some(result)
}
}
4.執行
1.先執行nc -lk 9999
2.然後執行代碼
3.不斷的在1中輸入不同的單詞
hadoop spark sqoop hadoop spark hive hadoop
4.觀察IDEA控制台輸出
sparkStreaming每隔5s計算一次目前5s内的資料,然後将每個批次的資料輸出