转换操作
1.无状态转换
- map(func)
- flatMap(func)
- fileter(func)
- repartition(numPartitionts)
- union(otherStream)
- count()
- reduce(func)
- count()
- join(otherStream,[numTasks])
- cogroup(otherStream,[numTasks])
- transform(func)
- countByValue(func)
- reduceByKey(func,[numTasks])
- transform(func)
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
如:
- stream:DStream[(String,String)]
下面两种操作等价:
valueStream:DStream[String] = stream.map(_._2)
valueStream:DStream[String] = steam.transform(rdd => rdd.map(_._2))
2.有状态转换
def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
- 状态存储在checkpoint中,类似于一个HashMap,key就是KV结构的key。
- updateFunc中Seq[V]就是RDD中所有value的集合,第一个Option[s]是上一次的状态,第二个Option[s]是产生的新的状态。
package com.dengdan
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StatefulWordcount extends App {
val sparkConf = new SparkConf().setAppName("stateful").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//设置checkpoint文件保存路径
ssc.sparkContext.setCheckpointDir("./checkpoint")
val line = ssc.socketTextStream("master", 9999)
val words = line.flatMap(_.split(" "))
val word2Count = words.map((_, 1))
//def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
//state 是对应key的 1+1+...+1
val state = word2Count.updateStateByKey[(Int)] { (values: Seq[Int], state: Option[Int]) =>
state match {
case None => Some(values.sum)
case Some(pre) => Some(values.sum + pre)
}
}
state.print()
ssc.start()
ssc.awaitTermination()
}
3.窗口函数
窗口大小:RDD的个数。认为是将几个RDD整合在一起。滑动步长:隔多个RDD滑动一次。 认为是窗口RDD的计算频率。如:求一小时内所有用户的访问量。窗口大小和滑动步长都应该是最小时长的整数倍。
在窗口的计算中,会有优化操作,如果累加,那么可以不用再将数据累加一遍,只是需要减去丢失的数据,加上添加的数据。
简单版函数如:
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
reduceFunc 用于将窗口内的所有RDD进行规约;
reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(
spark.default.parallelism
)来做grouping。可以通过设置可选参数numTasks来设置不同数量的tasks。
优化版函数:
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner,
filterFunc: ((K, V)) => Boolean
): DStream[(K, V)]
reduceFunc 用于将失去的数据“减掉”,invReduceFunc用于将新增的数据“加上”,
window( windowLength,slideInterval)
countByWindow(windowLength,slideInterval)
返回一个滑动窗口计数流中的元素。
reduceByWindow(func,windowLength,slideInterval)
通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
DStream输出
1.foreachRDD()
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
行动操作。将DStream的输出,和RDD的输出连接起来。