天天看点

SparkStreaming窗口入门

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

SparkStreaming窗口入门

案例演示

以nc作为源头进行测试

nc -lk mypc01 10087
           
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object WindowDemo1 extends App {
  private val conf = new SparkConf().setAppName("test").setMaster("local[*]")
  private val duration: Duration = Seconds(10)
  //构建StreamingContext
  private val ssc: StreamingContext = new StreamingContext(conf, duration)
  //以socket作为源
  private val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)
  reduceByKeyAndWindowDemo().print()
  ssc.start()
  ssc.awaitTermination()

  def reduceByKeyAndWindowDemo() = {
    val value: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(20))
    val value2: DStream[(String, Int)] = value.transform((rdd: RDD[(String, Int)]) => {
    //降序排序并取前三
      val tuples: Array[(String, Int)] = rdd.sortBy((_._2), ascending = false).take(3)
      //Array转为RDD,因为transform要返回一个RDD
      val value1: RDD[(String, Int)] = ssc.sparkContext.makeRDD(tuples)
      value1
    })
    value2
  }
}
           

方法解析

通过在此DStream的滑动窗口上应用reduceByKey来返回新的DStream。 与DStream.reduceByKey()类似,但将其应用于滑动窗口。 新的DStream生成与该DStream具有相同间隔的RDD。 哈希分区用于生成具有Spark默认分区数的RDD。

参数:

reduceFunc –关联和交换的reduce函数

windowDuration –窗口的宽度; 必须是此DStream批处理间隔的倍数

继续阅读