天天看点

Spark Streaming的DStream转换

目录

​​DStream转换​​

​​1、无状态转换​​

​​2、有状态转换​​

​​2-1、updateStateByKey​​

​​2-2、Window Operations​​

DStream转换

    DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

1、无状态转换

    无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。 注意,针对键值对的 DStream 转化操作(比 如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。

     1、def map[U: ClassTag](mapFunc: T => U): DStream[U]   将源DStream中的每个元素通过一个函数func从而得到新的DStreams。

     2、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]   和map类似,但是每个输入的项可以被映射为0或更多项。

     3、def filter(filterFunc: T => Boolean): DStream[T]  选择源DStream中函数func判为true的记录作为新DStream

     4、def repartition(numPartitions: Int): DStream[T]   通过创建更多或者更少的partition来改变此DStream的并行级别。

     5、def union(that: DStream[T]): DStream[T]   将一个具有相同slideDuration(时间窗口)新的DStream和当前DStream进行合并,返回新的DStream

     6、def count(): DStream[Long]   统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams。

     7、def reduce(reduceFunc: (T, T) => T): DStream[T]   

            通过函数func(两个参数一个输出)来整合源DStreams中每个RDD元素得到单元素RDD的DStream。

     8、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]   对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的次数。

     9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]   

            对(K,V)对的DStream调用此函数,返回同样(K,V)对的新DStream,但是新DStream中的对应V为使用reduce函数整合而来

    10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]   两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream。

    11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]   

            两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStream

    12、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]   将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作。注意的是,在这个转换函数里面能够应用所有RDD的转换操作。

2、有状态转换

2-1、updateStateByKey

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

// Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of each key.Hash partitioning is used to generate the RDDs with Spark's default number of partitions.

    1、S是你需要保存的状态的类型。

    2、updateFunc 是定义了每一批次RDD如何来更新的状态值。Seq[V] 是当前批次相同key的值的集合。Option[S] 是框架自动提供的,上一次保存的状态的值。

    3、updateStateByKey会返回一个新的DStream,该DStream中保存了(Key,State)的序列。

2-2、Window Operations

    Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

Spark Streaming的DStream转换

    所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。

Spark Streaming的DStream转换

1、def window(windowDuration: Duration, slideDuration: Duration): DStream[T]   

    基于对源DStream窗化的批次进行计算返回一个新的DStream,windowDuration是窗口大小,slideDuration滑动步长。

2、def countByWindow(windowDuration: Duration,slideDuration: Duration): DStream[Long]   注意,返回的是window中记录的条数。

3、def reduceByWindow(reduceFunc: (T, T) => T,windowDuration: Duration,slideDuration: Duration): DStream[T]

     通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。

4、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration): DStream[(K, V)]

    通过给定的窗口大小以滑动步长来应用reduceFunc函数,返回DStream[(K, V)], K就是DStream中相应的K,V是window应用了reduce之后产生的最终值。

5、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int = ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

    这个版本是4版本的优化版本,计算方式不同,采用增量计算的模式,每次计算只是在上一次计算的基础上减去丢失的值,加上增加的值。