天天看点

4.1、Flink流处理(Stream API)- Operators(操作数据流)DataStream Transformations(流数据转换)

目录

DataStream Transformations(流数据转换)

物理分区

任务链接和资源组

Operators将一个或多个数据流转换为一个新的数据流。程序可以将多个转换组合成复杂的数据流拓扑。

描述基本转换、应用这些转换之后的有效物理分区以及对Flink操作符链接的理解。

DataStream Transformations(流数据转换)

转换方法 描述及代码样例

Map

DataStream → DataStream

获取一个元素并生成一个元素。一个map函数,例子:使输入流的值加倍:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
    
           

FlatMap

DataStream → DataStream

获取一个元素并生成零个、一个或多个元素。一种将句子分割成单词的平面映射函数:
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
    
           

Filter

DataStream → DataStream

为每个元素计算布尔函数,并保留函数返回true的元素。过滤掉零值的过滤器:
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
    
           

KeyBy

DataStream → KeyedStream

逻辑上将流划分为不相交的分区。所有具有相同key的记录都被分配到同一个分区。在内部,keyBy()是通过哈希分区实现的。有不同的方法来指定键。

这个转换返回一个KeyedStream,它是使用key状态所必需的。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
    
           
注意:下面的类型不能作为key:
  1. 没有重写hashCode的POJO对象,依赖于Object.hashCode()实现
  2. 任意类型的数组

Reduce

KeyedStream → DataStream

基于keyBy(),将当前元素与最后一个缩减值组合并返回新值。
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
            
           

Fold

KeyedStream → DataStream

基于keyBy(),将当前元素与最后一个折叠值组合并返回新值。

应用于序列(1,2,3,4,5)时,返回序列“start-1”,“start-1-2”,“start-1-2-3”,…

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
          
           

Aggregations

KeyedStream → DataStream

基于keyBy(),min和minBy的区别在于,min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
           

Window

KeyedStream → WindowedStream

Windows可以在已经分区的KeyedStreams上定义。Windows根据某些特性(例如,最近5秒内到达的数据)对每个键中的数据进行分组。Window的详解//TODO
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
    
           

WindowAll

DataStream → AllWindowedStream

可以在常规数据流上定义。Windows根据一些特性(例如,最近5秒内到达的数据)对所有流事件进行分组。

注意:在大多案例中,这是一个非并行转换。所有记录将在windowAll操作符的一个任务中收集。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
  
           

Window Apply

WindowedStream → DataStream

AllWindowedStream → DataStream

将一个通用函数应用于整个窗口。下面是一个手动汇总窗口元素的函数。

注意:如果正在使用windowAll转换,则需要使用AllWindowFunction。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
    
           

Window Reduce

WindowedStream → DataStream

将reduce函数应用于窗口并返回reduce后的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
    
           

Window Fold

WindowedStream → DataStream

将fold函数应用于窗口并返回fold后的值。将示例函数应用于序列(1,2,3,4,5)时,将序列折叠成字符串“start 1-2-3-4-5”:
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
    
           

Aggregations on windows

WindowedStream → DataStream

基于window(),in和minBy的区别在于,min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
    
           

Union

DataStream* → DataStream

两个或多个数据流的联合,创建一个包含来自所有流的所有元素的新流。注意:如果将一个数据流与它自己相结合,将得到结果流中的每个元素两次。
dataStream.union(otherStream1, otherStream2, ...);
           

Window Join

DataStream,DataStream → DataStream

连接给定键和公共窗口上的两个数据流。
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
    
           

Interval Join

KeyedStream,KeyedStream → DataStream

将两个键流的两个元素e1和e2用一个公共键在给定的时间间隔内连接起来,这样

e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});
    
           

Window CoGroup

DataStream,DataStream → DataStream

将给定键和公共窗口上的两个数据流组合在一起。
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
    
           

Connect

DataStream,DataStream → ConnectedStreams

连接两个保留类型的数据流。允许两个流之间共享状态的连接。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
    
           

CoMap, CoFlatMap

ConnectedStreams → DataStream

类似于基于connect数据流上map和flatMap
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
    
           

Split

DataStream → SplitStream

根据某些标准将流分成两个或多个流。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});
                
           

Select

SplitStream → DataStream

从拆分流中选择一个或多个流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
                
           

Iterate

DataStream → IterativeStream → DataStream

通过将一个操作符的输出重定向到前面的某个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法特别有用。下面的代码从一个流开始,并持续地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被转发到下游。Iterations后续详解//TODO
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
                
           

Extract Timestamps

DataStream → DataStream

从记录中提取时间戳,以便使用使用事件时间语义的窗口。Event Time后续详解//TODO

Project

DataStream → DataStream

以下转换可用于元组的数据流:

从元组中选择字段的子集

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
           

物理分区

Flink还通过以下功能(如果需要的话)对转换后的流分区进行底层控制。

方法 描述和代码样例

Custom partitioning

DataStream → DataStream

使用用户定义的分区程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
            
           

Random partitioning

DataStream → DataStream

根据均匀分布随机划分元素。
dataStream.shuffle();
            
           

Rebalancing (Round-robin partitioning)

DataStream → DataStream

分区元素循环,每个分区创建相同的负载。对于存在数据倾斜的性能优化非常有用。
dataStream.rebalance();
           

Rescaling

DataStream → DataStream

将元素(循环)划分为下游操作的子集。如果您想要pipelines,例如,从一个源的每个并行实例分散到几个映射器的子集来分配负载,但又不希望rebalance()导致数据的完全重平衡,那么这是非常有用的。这将只需要本地数据传输,而不是通过网络传输数据,这取决于其他配置值,比如TaskManagers的槽数。

上游操作向其发送元素的下游操作子集取决于上游和下游操作的并行度。例如,如果上游操作具有并行性2,下游操作具有并行性6,那么一个上游操作将把元素分配给三个下游操作,而另一个上游操作将分配给另外三个下游操作。另一方面,如果下游操作的并行度为2,上游操作的并行度为6,则上游操作的并行度为3。

在不同并行度不是彼此的倍数的情况下,一个或多个下游操作与上游操作的输入数量不同。

4.1、Flink流处理(Stream API)- Operators(操作数据流)DataStream Transformations(流数据转换)
dataStream.rescale();
            
           

Broadcasting

DataStream → DataStream

向每个分区广播元素。
dataStream.broadcast();
           

任务链接和资源组

chaining意味着将它们放在同一个线程中以获得更好的性能。如果可能的话,默认情况下使用Flink的chain操作符(例如,两个后续的映射转换)。如果需要,API可以对链接进行细粒度控制:

如果希望在整个作业中禁用链接,请使用StreamExecutionEnvironment.disableOperatorChaining()。对于更细粒度的控制,可以使用以下函数。注意,这些函数只能在DataStream转换之后使用,因为它们引用的是前面的转换。例如,可以使用someStream.map(…). startnewchain(),但是不能使用someStream.startNewChain()。

resource group是Flink中的一个插槽,请参见插槽//TODO。如果需要,可以手动将operators隔离在单独的槽中。

方法 描述和代码样例
Start new chain 从这个操作符开始,创建一个新的chain。这两个映射器将被链接,过滤器将不会链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...);
           
Disable chaining
Do not chain the map operator 
           
Do not chain the map operator 设置操作的槽共享组。Flink将把具有相同槽共享组的operator放入相同槽中,同时将没有槽共享组的operator保留在其他槽中。这可以用来隔离插槽。如果所有输入operator都位于同一个槽共享组中,则槽共享组将从输入operator继承。默认槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)显式地将操作放入该组。
someStream.filter(...).slotSharingGroup("name");
           

继续阅读