window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。
案例演示
以nc作为源头进行测试
nc -lk mypc01 10087
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
object WindowDemo1 extends App {
private val conf = new SparkConf().setAppName("test").setMaster("local[*]")
private val duration: Duration = Seconds(10)
//构建StreamingContext
private val ssc: StreamingContext = new StreamingContext(conf, duration)
//以socket作为源
private val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)
reduceByKeyAndWindowDemo().print()
ssc.start()
ssc.awaitTermination()
def reduceByKeyAndWindowDemo() = {
val value: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(20))
val value2: DStream[(String, Int)] = value.transform((rdd: RDD[(String, Int)]) => {
//降序排序并取前三
val tuples: Array[(String, Int)] = rdd.sortBy((_._2), ascending = false).take(3)
//Array转为RDD,因为transform要返回一个RDD
val value1: RDD[(String, Int)] = ssc.sparkContext.makeRDD(tuples)
value1
})
value2
}
}
方法解析
通过在此DStream的滑动窗口上应用reduceByKey来返回新的DStream。 与DStream.reduceByKey()类似,但将其应用于滑动窗口。 新的DStream生成与该DStream具有相同间隔的RDD。 哈希分区用于生成具有Spark默认分区数的RDD。
参数:
reduceFunc –关联和交换的reduce函数
windowDuration –窗口的宽度; 必须是此DStream批处理间隔的倍数