
26. DataStream API: Operators (Overview)

Flink 1.9

Introduction to operators:

Operators transform one data stream into a new data stream; programs can combine multiple transformations through operators into complex dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.

DataStream Transformations

Map

DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...

dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap

DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Filter

DataStream → DataStream

Evaluates a boolean function for each element and keeps those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

DataStream → KeyedStream

Logically partitions a stream into disjoint partitions; all records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is required in order to use keyed state.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Attention: a type cannot be used as a key if:

1. it is a POJO type that does not override the hashCode() method and relies on the Object.hashCode() implementation;

2. it is an array of any type.
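
For illustration, a minimal sketch of a POJO that satisfies the key requirements (the WordCount class and its fields are hypothetical, not from the original article):

// hypothetical POJO, shown only to illustrate the hashCode() requirement
public class WordCount {
    public String word;
    public int count;

    public WordCount() {} // Flink POJOs need a public no-argument constructor

    // without this override the type relies on Object.hashCode()
    // and cannot be used as a key
    @Override
    public int hashCode() {
        return java.util.Objects.hash(word, count);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordCount)) {
            return false;
        }
        WordCount other = (WordCount) o;
        return count == other.count && java.util.Objects.equals(word, other.word);
    }
}

Note that keying by a single field, e.g. keyBy("word"), only requires that field's type to be a valid key; the override above matters when the POJO itself is used as the key.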

Reduce

KeyedStream → DataStream

A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

Fold

KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value: combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result =
    keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
    });

Aggregations

KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same applies to max and maxBy).

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Window

KeyedStream → WindowedStream

Windows can be defined on already-partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description.

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

WindowAll

DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description.

Note: in many cases this is a non-parallel operation; all records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window Apply

WindowedStream → DataStream

AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: if you are using the windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

Window Reduce

WindowedStream → DataStream

Applies a reduce function to the window and returns the reduced value.

windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Window Fold

WindowedStream → DataStream

Applies a fold function to the window and returns the folded value. When the example function below is applied to the sequence (1,2,3,4,5), it folds the sequence into the string "start-1-2-3-4-5":

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});

Aggregations on windows

WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same applies to max and maxBy).

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

Union

DataStream* → DataStream

Unions two or more DataStreams, producing a new DataStream containing all the elements from all the streams. Note: if you union a DataStream with itself, each element will appear twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream,DataStream → DataStream

Joins two DataStreams on a given key and a common window.

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});

Interval Join

KeyedStream,KeyedStream → DataStream

Joins two elements e1 and e2 from two keyed streams that share a common key, over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});

Note: the official documentation uses an IntervalJoinFunction here, but that class cannot be found; use ProcessJoinFunction instead, as in the snippet above and the demo later in this article.

Window CoGroup

DataStream,DataStream → DataStream

Cogroups two DataStreams on a given key and a common window.

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});

Connect

DataStream,DataStream → ConnectedStreams

"Connects" two data streams while retaining their types, allowing the two streams to share state.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStreams → DataStream

Similar to map and flatMap, but operating on a connected data stream.

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});

Split

DataStream → SplitStream

Splits one DataStream into two or more DataStreams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

Select

SplitStream → DataStream

Selects one or more DataStreams from a SplitStream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");

Iterate

DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow by redirecting the output of one operator to some earlier operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously: elements greater than 0 are sent back through the feedback channel, and the remaining elements are forwarded downstream. See iterations for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

Extract Timestamps

DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event-time semantics. See Event Time for more details.

stream.assignTimestamps(new TimeStampExtractor() {...}); // deprecated
stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<...>() {...});
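
As a minimal sketch, a periodic watermark assigner with 5 seconds of allowed out-of-orderness might look as follows (the Tuple3 element type and the use of field f2 as the event timestamp are assumptions borrowed from the demos below; Watermark is org.apache.flink.streaming.api.watermark.Watermark):

stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Integer, Long>>() {
    // highest timestamp seen so far; initialized so the first watermark is sane
    private long maxTimestamp = Long.MIN_VALUE + 5000L;

    @Override
    public long extractTimestamp(Tuple3<String, Integer, Long> element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.f2); // f2 holds the event time in ms
        return element.f2;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // the watermark trails the highest seen timestamp by 5 seconds
        return new Watermark(maxTimestamp - 5000L);
    }
});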

Example illustrating the difference between min() and minBy():

Data source:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource1 extends RichParallelSourceFunction<Tuple5<String, Integer, Long, String, String>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple5<String, Integer, Long, String, String>> ctx) throws InterruptedException {

      Tuple5[] elements1 = new Tuple5[]{

         Tuple5.of("a", 2, 1551169050002L, "w", "e"),
         Tuple5.of("a", 1, 1551169050002L, "2", "1"),
         Tuple5.of("a", 1, 1551169050001L, "3", "1"),
         Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
         Tuple5.of("a", 2, 1551169054000L, "5", "1"),
         Tuple5.of("a", 1, 1551169050003L, "6", "1"),
         Tuple5.of("a", 3, 1551169064000L, "7", "1"),
         Tuple5.of("b", 5, 1551169100000L, "8", "1"),
         Tuple5.of("a", 4, 1551169079000L, "9", "1"),
         Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
         Tuple5.of("b", 6, 1551169108000L, "11", "1")
      };

      int count = 0;
      while (running && count < elements1.length) {
         ctx.collect(new Tuple5<>((String)elements1[count].f0,(Integer)elements1[count].f1,(Long)elements1[count].f2,
            (String)elements1[count].f3,(String)elements1[count].f4));
         count++;
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class KeyDemo1 {
   public static void main(String[] args) throws Exception {

      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // Set up the data source
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");

      // Assign watermarks
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );


      stream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).minBy(1).print();
      env.execute("TimeWindowDemo");
   }
}
           

Output of minBy():

(screenshot of the minBy() output omitted)

Output of min():

(screenshot of the min() output omitted)

Summary:

min behaves like MIN in SQL: it returns the minimum value of the aggregated field. minBy returns the entire row that contains the minimum value of that field.
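The difference can also be seen on a tiny bounded stream. A minimal sketch (the element values are illustrative; for min(), the contents of the non-aggregated fields in the emitted record are not guaranteed):

DataStream<Tuple3<String, Integer, String>> input = env.fromElements(
    Tuple3.of("a", 2, "first"),
    Tuple3.of("a", 1, "second"));

// rolling min of field 1: the second emitted record carries the correct
// minimum (1) in field 1, but field 2 may still be "first"
input.keyBy(0).min(1).print();

// rolling minBy of field 1: the second emitted record is the entire
// element holding the minimum, i.e. ("a", 1, "second")
input.keyBy(0).minBy(1).print();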

join() method demo:

Data source 1: the DataSource1 class shown above.

Data source 2:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource2 extends RichParallelSourceFunction<Tuple3<String, Integer, Long>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple3<String, Integer, Long>> ctx) throws InterruptedException {

      Tuple3[] elements1 = new Tuple3[]{
         Tuple3.of("a", 1, 1551169050000L),
         Tuple3.of("aa", 33, 1551169064000L),
         Tuple3.of("a", 2, 1551169054000L),
         Tuple3.of("a", 3, 1551169064000L),
         Tuple3.of("b", 5, 1551169100000L),
         Tuple3.of("a", 4, 1551169079000L),
         Tuple3.of("aa1", 44, 1551169079000L),
         Tuple3.of("b1", 6, 1551169108000L)
      };

      int count = 0;
      while (running && count < elements1.length) {
         ctx.collect(new Tuple3<>((String)elements1[count].f0,(Integer)elements1[count].f1,(Long)elements1[count].f2));
         count++;
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;

public class JoinTest {

   public static void main(String[] args) throws Exception{
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // Set up the data sources
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // Assign watermarks
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );
      // Assign watermarks
      DataStream<Tuple3<String, Integer, Long>> stream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

//    stream.join(stream1).where((KeySelector<Tuple5<String, Integer, Long, String, String>, String>) value -> {int a;return "ds";})
//       .equalTo((KeySelector<Tuple3<String,Integer,Long>,String>) value -> null);

      stream.join(stream1).where(value-> value.f0).equalTo(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply((first, second) -> first.toString() +"|"+ second.toString()).print();

//    stream.join(stream1).where(value-> value.f0).equalTo(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)))
//    .apply((Tuple5<String, Integer, Long, String, String> first, Tuple3<String, Integer, Long> second) -> {return null;});

//    stream.join(stream1).where(new KeySelector<Tuple5<String,Integer,Long,String,String>, String>() {
//       @Override
//       public String getKey(Tuple5<String, Integer, Long, String, String> value) throws Exception {
//          return null;
//       }
//    });



      env.execute("joinWindowDemo");
   }
}
           
(screenshot of the join output omitted)

Interval Join method demo

Data sources:

Tuple3[] elements1 = new Tuple3[]{
   Tuple3.of("a", 1, 1551169050000L),
   Tuple3.of("aa", 33, 1551169064000L),
   Tuple3.of("a", 2, 1551169054000L),
   Tuple3.of("a", 3, 1551169064000L),
   Tuple3.of("b", 5, 1551169100000L),
   Tuple3.of("a", 4, 1551169068999L),
   Tuple3.of("aa1", 44, 1551169079000L),
   Tuple3.of("b1", 6, 1551169108000L)
};

Tuple5[] elements1 = new Tuple5[]{

   Tuple5.of("a", 2, 1551169050002L, "w", "e"),
   Tuple5.of("a", 1, 1551169050002L, "2", "1"),
   Tuple5.of("a", 1, 1551169050001L, "3", "1"),
   Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
   Tuple5.of("a", 2, 1551169054000L, "5", "1"),
   Tuple5.of("a", 1, 1551169050003L, "6", "1"),
   Tuple5.of("a", 3, 1551169064000L, "7", "1"),
   Tuple5.of("b", 5, 1551169100000L, "8", "1"),
   Tuple5.of("a", 4, 1551169079000L, "9", "1"),
   Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
   Tuple5.of("b", 6, 1551169108000L, "11", "1")
};
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class IntervalJoinTest {
   public static void main(String[] args) throws Exception {
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // Set up the data sources
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // Assign watermarks
      KeyedStream<Tuple5<String, Integer, Long, String, String>, Tuple> keyedStream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      ).keyBy(0);
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = keyedStream;

      // Assign watermarks
      KeyedStream<Tuple3<String, Integer, Long>, Tuple> keyedStream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      ).keyBy(0);
      DataStream<Tuple3<String, Integer, Long>> stream1 = keyedStream1;


      keyedStream.intervalJoin(keyedStream1).between(Time.seconds(-5), Time.seconds(5)).
         lowerBoundExclusive().upperBoundExclusive().process(new ProcessJoinFunction<Tuple5<String, Integer, Long, String, String>, Tuple3<String, Integer, Long>, Object>() {
         @Override
         public void processElement(Tuple5<String, Integer, Long, String, String> left, Tuple3<String, Integer, Long> right, Context ctx, Collector<Object> out) throws Exception {
            System.out.println("left:" + left.toString() + " | right:" + right.toString());
         }
      }).print();


      env.execute("IntervalJoinWindowDemo");
   }
}
           
(screenshot of the interval join output omitted)

Connect demo:

Data sources: the same Tuple3 and Tuple5 element arrays as in the Interval Join demo above.

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class ConnectedStreamsTest {
   public static void main(String[] args) throws Exception {
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // Set up the data sources
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // Assign watermarks
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      // Assign watermarks
      DataStream<Tuple3<String, Integer, Long>> stream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      stream.connect(stream1).flatMap(new CoFlatMapFunction<Tuple5<String, Integer, Long, String, String>, Tuple3<String, Integer, Long>, Object>() {
         @Override
         public void flatMap1(Tuple5<String, Integer, Long, String, String> value, Collector<Object> out) throws Exception {
            System.out.println("flatMap1:" + value.toString());
         }

         @Override
         public void flatMap2(Tuple3<String, Integer, Long> value, Collector<Object> out) throws Exception {
            System.out.println("flatMap2:" + value.toString());
         }
      });


      env.execute("ConnectedStreamsDemo");
   }
}
           
(screenshot of the connect output omitted)

iterate() method demo

Data source:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource3 extends RichParallelSourceFunction<Long> {

   @Override
   public void run(SourceContext<Long> ctx) throws Exception {
      Long[] elements1 = new Long[]{
         200L, -2L, -3L, -4L, -5L, -6L
      };

      for (Long a : elements1) {
         ctx.collect(a);
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {

   }
}
           
package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterativeStreamTest {
   public static void main(String[] args) throws Exception {
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(1);

      // Set up the data source
      DataStream<Long> source = env.addSource(new DataSource3()).name("Demo Source");

      IterativeStream<Long> iterate = source.iterate();
      DataStream<Long> iterationBody = iterate.map(value -> {
         value = value - 1;
         return value;
      });
      DataStream<Long> feedback = iterationBody.filter(value -> value > 0);
      iterate.closeWith(feedback);

      iterationBody.filter(value -> value <= 0).print();

      env.execute("IterativeStreamDemo");
   }
}
           
(screenshot of the iterate output omitted)

Note: the overall job parallelism must match the parallelism of the source; otherwise the following error is thrown:

Exception in thread "main" java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 2. Parallelism can be modified using DataStream#setParallelism() method
	at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:90)
	at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
	at org.apache.flink.examples.java.dataStremAPI.operators.IterativeStreamTest.main(IterativeStreamTest.java:23)
           

The following transformations are available on data streams of tuples:

Project

DataStream → DataStream

Selects a subset of fields from the tuples and forwards it downstream.

DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2, 0);

Physical partitioning

Flink also gives low-level control (if desired) over how a stream is partitioned after a transformation, via the following functions.

Custom partitioning

DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

Random partitioning

DataStream → DataStream

Partitions elements randomly according to a uniform distribution.

dataStream.shuffle();

Rebalancing (Round-robin partitioning)

DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. This is useful for performance optimization in the presence of data skew:

dataStream.rebalance();

Rescaling

DataStream → DataStream

Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to fan out each parallel instance of a source to a subset of several mappers to distribute load, but do not want the full repartitioning that rebalance() would incur. Depending on configuration values such as the number of slots per TaskManager, this requires only local data transfers instead of transfers over the network.

The subset of downstream operations to which an upstream operation sends elements depends on the parallelism of both the upstream and the downstream operation. For example, if the upstream operator has parallelism 2 and the downstream operator has parallelism 6, then one upstream operator distributes elements to three downstream operators and the other upstream operator distributes to the other three. Conversely, if the downstream operator has parallelism 2 and the upstream operator has parallelism 6, then three upstream operators distribute to one downstream operator and the other three upstream operators distribute to the other.

In cases where the parallelisms are not multiples of each other, one or more downstream operators will receive a differing number of inputs from upstream operators.

(figure illustrating the rescaling data-transfer pattern omitted; a code sketch follows the rescale() call below)

dataStream.rescale();
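
A minimal sketch of the 2-to-6 fan-out described above (MySource and the parallelism values are illustrative, not part of the original example):

DataStream<String> src = env.addSource(new MySource()).setParallelism(2);
// each of the two source subtasks forwards only to "its" three mapper
// subtasks; no full network shuffle as with rebalance()
src.rescale()
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    })
    .setParallelism(6);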

Broadcasting

DataStream → DataStream

Broadcasts elements to every partition.

dataStream.broadcast();


Task chaining and resource groups

Chaining two subsequent transformations means co-locating them in the same thread for better performance. Flink by default chains operators where possible (for example, two subsequent map transformations), and offers fine-grained control over chaining if desired:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining for the whole job. The methods below give more fine-grained control; see the sketch after this paragraph. Note that these methods can only be used right after a DataStream transformation, because they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().

A resource group in Flink is a slot; if desired, you can manually isolate operators in separate slots.
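
A minimal sketch of these controls (someStream is assumed to be a DataStream<String>; the filter and map bodies are placeholders):

// disable chaining for the whole job:
env.disableOperatorChaining();

// or control chaining per operator (operators chain with their predecessor by default):
someStream
    .filter(value -> !value.isEmpty())
    .map(value -> value.trim()).startNewChain()           // this map begins a new chain
    .map(value -> value.toLowerCase()).disableChaining(); // this map chains with nothing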

Start new chain

Begin a new chain, starting with this operator. The two map operators below will be chained together, and the filter will not be chained to the first map:

someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining

Do not chain the map operator:

someStream.map(...).disableChaining();

Set slot sharing group

Sets the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot, while keeping operations without a slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from the input operations if all input operations are in the same group. The default slot sharing group is named "default"; operations can be explicitly placed into it by calling slotSharingGroup("default").

someStream.filter(...).slotSharingGroup("name");

https://www.jianshu.com/p/ea80d15e9b5e

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/
