天天看点

(10) flink中的算子 流的分割与join

文章目录

      • KeyBy
      • Reduce
        • flink保存累计值原理
      • Split 和Select
        • split
        • Select
          • 需求:将kafka中数据根据某属性分割开,分成两个流
      • Connect和 CoMap
        • Connect
        • CoMap,CoFlatMap
        • Union
          • Connect与 Union 区别

常见的map.flatMap,filter类比spark

DataStream → KeyedStream:输入必须是Tuple类型(一般通过map转换),逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

//求各个渠道的累计个数
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
//reduce //sum
keyedStream.reduce{  (ch1,ch2)=>
  (ch1._1,ch1._2+ch2._2)
} .print().setParallelism(1)

           

类比可知,spark的reduceByKey == keyBy+Reduce

flink是一种有状态的流计算框架

  1. operator state : 主要是保存数据在流程中的处理状态,用于确保语义的exactly-once
  2. keyed state : 主要保存数据在计算过程中的累计值

这两种状态都是通过checkpoint机制保存在StateBackend中,StateBackend可以选择保存在内存中(默认使用)或者保存在磁盘文件中。

(10) flink中的算子 流的分割与join

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

(10) flink中的算子 流的分割与join

SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

package kafka

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object ConsumerApp {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer = KafKaUtil.getConsumer("test")

    //import org.apache.flink.api.scala._ 这里要加入隐式转换
    val dstream = environment.addSource(kafkaConsumer)

//    dstream.print()
    val logStream = dstream.split {
      log =>
        var flags: List[String] = null
        if ("Apple".equals(log)) {
          flags = List(log)
        } else {
          flags = List("Android")
        }
        flags
    }
    val apple = logStream.select("Apple")

    apple.print("apple:").setParallelism(1)

    val android = logStream.select("Android")
    android.print("Android:").setParallelism(1)
    environment.execute()
  }
}

           

(10) flink中的算子 流的分割与join

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

这两个不是算子,只是类比 connect + map / flatMap

(10) flink中的算子 流的分割与join

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

val connStream = androidStream.connect(appleStream)
    val allStream = connStream.map(
      (log1: String) => log1 + "1",
      (log2: String) => log2 + "2"
    )
    allStream.print("连接流:")
           

val unionStream = appleStream.union(androidStream)
    unionStream.print("union:")

    environment.execute()