天天看点

SparkStreaming(17):updateStateByKey算子,保留上一次计算结果

1.实现功能

如果SparkStreaming程序断掉,重新启动,可以读取断掉之前的结果。通过,使用SparkStreaming的HA:checkpoints。

【参考:kafka(十四):SparkStreaming和Kafka接口的HA:checkpoints】

2.代码

package _0809kafka

//import com.beifeng.util.SparkUtil
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 *
 * 之前做的计算当中,当前批次的计算值不会累加到下一个批次
 *
 * 当前批次的值计算完之后,存到外部存储系统中
 * 下一个批次计算完值之后,在取出上一个批次计算出来的值,
 * 做相加,更新会原位置上
 *
 * checkpoint会保留上一个程序的ssc的状态和UpdateStateByKey的结果
 * 但是构造ssc的时候,必须按照规矩写,否则就读不到UpdateStateByKey上一次的结果
 */
object UpdateStateByKeyAPI_1020HA {
  def main(args: Array[String]) {
    //使用checkpoint来存储批次的数据
    //1、创建sparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("UpdateStateByKeyAPI")
      .setMaster("local[2]")
    //2、创建sparkContext
    val sc = new SparkContext(sparkConf)

//    val path = s"file:///E:\\workspace\\SparkPro\\checkpoint\\streaming_05"
    val path = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_07"

    def creatingFunc():StreamingContext ={
      val ssc = new StreamingContext(sc,Seconds(10))
      ssc.checkpoint(path)
      val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop",9999)

      //api updateStateByKey
      val resultDStream: DStream[(String, Long)] = socketDStream.mapPartitions(iter =>{
        //对于当前批次的值做数据转换
        iter.flatMap(_.split(" "))
          .filter(_.nonEmpty)
          .map(word => (word,1))
      })
        //对于当前批次的值,做累加(aggr聚合)操作
        .reduceByKey(_ + _)
        //对于value的操作,相同key怎么处理对应的value
        .updateStateByKey((seq: Seq[Int],state: Option[Long])=>{
        //当前批次的相同key的value的聚合值
        val sum = seq.sum
        val preState= state.getOrElse(0L)
        /**
         * if(sum + preState > 1000){
         * Some(sum + preState)
         * }else{
         * //清空当前key的value值
         * None
         * }
         */
        Some(sum + preState)
      })

      resultDStream.foreachRDD((rdd,time) =>{
        println(s"----------------当前时间为:${time}----------------")
        //比如说:某些key不打印,某些值过于小也可以不打印,或者打印排序后的前5
        rdd.filter(t =>{
          t._2 > 100
        }).foreach(println)
      })
      ssc
    }

    val ssc = StreamingContext.getActiveOrCreate(path,creatingFunc)


    ssc.start()
    ssc.awaitTermination()


  }
}
           

3.测试

(1)打开nc

            nc -lt 9999

(2)运行程序

(3)结果:

----------------当前时间为:1540004570000 ms----------------
(hadoop,212)
(ccs,159)
----------------当前时间为:1540004580000 ms----------------
[Stage 9:=================================================>         (5 + 1) / 6]
(hadoop,360)
(ccs,270)
           

(测试成功~)

继续阅读