天天看点

SparkStreaming updateStateByKey 使用

updateStateByKey算子经常在实时计算时使用,最常见的就是wordCount类型的统计需求,那么这里使用官网并结合自己一些网上看的一些例子写的demo,如下:

官方: updateStateByKey允许你在持续更新信息的过程中随意获取状态。想要使用这个输入流,你需要以下两步: 1 定义状态--状态可以是任意的数据类型 2 定义状态更新函数--指定一个如何更新状态的函数,该函数从输入流中获取前一个状态和新的值

在每一批次中,spark会对所有已经存在的keys使用状态更新函数,而不会在意批次中是否有数据。如果更新函数返回None,那么key-value对将会被擦除。 以下是个人做的例子,实时计算word count: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Test002 { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "D:\\workSofts\\hadoop-common-2.2.0-bin-master") val updateFunc = (values: Seq[Int], state: Option[Int]) => { val sum = values.sum val pre = state.getOrElse(0) Some(sum + pre) }

val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) }

val sparkConf = new SparkConf().setAppName("test").setMaster("local[3]") val ssc = new StreamingContext(sparkConf, Seconds(3))

ssc.checkpoint("D:/ScalaWorkSpace/checkpoint") val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) val lines = ssc.socketTextStream("115.28.68.26", 9999) val words = lines.flatMap(_.split(" ")) val wordCount = words.map(s => (s, 1)) val state = wordCount.updateStateByKey[Int](newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) state.print() ssc.start() ssc.awaitTermination() }

}

详细的不做解释,逻辑也很简单,熟悉这个function的形式就好。

继续阅读