天天看点

spark-18.sparkStreaming_3_DStream的转换操作与输出转换操作DStream输出

转换操作

1.无状态转换

  • map(func)
  • flatMap(func)
  • fileter(func)
  • repartition(numPartitionts)
  • union(otherStream)
  • count()
  • reduce(func)
  • count()
  • join(otherStream,[numTasks])
  • cogroup(otherStream,[numTasks])
  • transform(func)
  • countByValue(func)
  • reduceByKey(func,[numTasks])
  • transform(func)
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
           

如:

  • stream:DStream[(String,String)]

下面两种操作等价:

valueStream:DStream[String] = stream.map(_._2)
           
valueStream:DStream[String] = steam.transform(rdd => rdd.map(_._2))
           

2.有状态转换

def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
           
  1. 状态存储在checkpoint中,类似于一个HashMap,key就是KV结构的key。
  2. updateFunc中Seq[V]就是RDD中所有value的集合,第一个Option[s]是上一次的状态,第二个Option[s]是产生的新的状态。
package com.dengdan

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordcount extends App {
  val sparkConf = new SparkConf().setAppName("stateful").setMaster("local[*]")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  //设置checkpoint文件保存路径
  ssc.sparkContext.setCheckpointDir("./checkpoint")
  val line = ssc.socketTextStream("master", 9999)
  val words = line.flatMap(_.split(" "))
  val word2Count = words.map((_, 1))
  //def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  //state 是对应key的 1+1+...+1
  val state = word2Count.updateStateByKey[(Int)] { (values: Seq[Int], state: Option[Int]) =>
    state match {
      case None => Some(values.sum)
      case Some(pre) => Some(values.sum + pre)
    }
  }
  state.print()
  ssc.start()
  ssc.awaitTermination()
}
           

3.窗口函数

窗口大小:RDD的个数。认为是将几个RDD整合在一起。滑动步长:隔多个RDD滑动一次。 认为是窗口RDD的计算频率。如:求一小时内所有用户的访问量。窗口大小和滑动步长都应该是最小时长的整数倍。

spark-18.sparkStreaming_3_DStream的转换操作与输出转换操作DStream输出

在窗口的计算中,会有优化操作,如果累加,那么可以不用再将数据累加一遍,只是需要减去丢失的数据,加上添加的数据。

spark-18.sparkStreaming_3_DStream的转换操作与输出转换操作DStream输出

简单版函数如:

def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
           

reduceFunc 用于将窗口内的所有RDD进行规约;

reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(

spark.default.parallelism

)来做grouping。可以通过设置可选参数numTasks来设置不同数量的tasks。

优化版函数:

def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner,
      filterFunc: ((K, V)) => Boolean
    ): DStream[(K, V)]
           

reduceFunc 用于将失去的数据“减掉”,invReduceFunc用于将新增的数据“加上”,

window( windowLength,slideInterval)

countByWindow(windowLength,slideInterval)

返回一个滑动窗口计数流中的元素。

reduceByWindow(func,windowLength,slideInterval)

通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。

DStream输出

1.foreachRDD()

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

行动操作。将DStream的输出,和RDD的输出连接起来。

spark-18.sparkStreaming_3_DStream的转换操作与输出转换操作DStream输出

继续阅读