文章目录
-
-
- KeyBy
- Reduce
-
- flink保存累计值原理
- Split 和Select
-
- split
- Select
-
- 需求:将kafka中数据根据某属性分割开,分成两个流
- Connect和 CoMap
-
- Connect
- CoMap,CoFlatMap
- Union
-
- Connect与 Union 区别
-
常见的map.flatMap,filter类比spark
DataStream → KeyedStream:输入必须是Tuple类型(一般通过map转换),逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
//求各个渠道的累计个数
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
//reduce //sum
keyedStream.reduce{ (ch1,ch2)=>
(ch1._1,ch1._2+ch2._2)
} .print().setParallelism(1)
类比可知,spark的reduceByKey == keyBy+Reduce
flink是一种有状态的流计算框架
- operator state : 主要是保存数据在流程中的处理状态,用于确保语义的exactly-once
- keyed state : 主要保存数据在计算过程中的累计值
这两种状态都是通过checkpoint机制保存在StateBackend中,StateBackend可以选择保存在内存中(默认使用)或者保存在磁盘文件中。

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。
SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
package kafka
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object ConsumerApp {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = KafKaUtil.getConsumer("test")
//import org.apache.flink.api.scala._ 这里要加入隐式转换
val dstream = environment.addSource(kafkaConsumer)
// dstream.print()
val logStream = dstream.split {
log =>
var flags: List[String] = null
if ("Apple".equals(log)) {
flags = List(log)
} else {
flags = List("Android")
}
flags
}
val apple = logStream.select("Apple")
apple.print("apple:").setParallelism(1)
val android = logStream.select("Android")
android.print("Android:").setParallelism(1)
environment.execute()
}
}
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
这两个不是算子,只是类比 connect + map / flatMap
ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。
val connStream = androidStream.connect(appleStream)
val allStream = connStream.map(
(log1: String) => log1 + "1",
(log2: String) => log2 + "2"
)
allStream.print("连接流:")
val unionStream = appleStream.union(androidStream)
unionStream.print("union:")
environment.execute()