
26. DataStream API Operators (Overview)

Flink 1.9

Introduction to operators:

Operators transform one data stream into a new data stream; programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and insights into Flink's operator chaining.

DataStream Transformations

Transformation Description

Map

DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...

dataStream.map(new MapFunction<Integer, Integer>() {

@Override

public Integer map(Integer value) throws Exception {

return 2 * value;

}

});

FlatMap

DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

dataStream.flatMap(new FlatMapFunction<String, String>() {

@Override

public void flatMap(String value, Collector<String> out)

throws Exception {

for(String word: value.split(" ")){

out.collect(word);

}

}

});

Filter

DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {

@Override

public boolean filter(Integer value) throws Exception {

return value != 0;

}

});

KeyBy

DataStream → KeyedStream

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is (among other things) required to use keyed state.

dataStream.keyBy("someKey") // Key by field "someKey"

dataStream.keyBy(0) // Key by the first element of a Tuple

Attention — a type cannot be used as a key if:

1. it is a POJO type that does not override the hashCode() method and relies on the Object.hashCode() implementation (see the sketch after this list);

2. it is an array of any type.
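
For case 1, a minimal sketch (the Word class and its field are hypothetical) of a POJO that can be used directly as a key because it overrides hashCode():

// Hypothetical POJO used directly as a key. Without the hashCode()
// override, Flink falls back to Object.hashCode(), which is not
// deterministic across JVMs, and rejects the type as a key.
public class Word {

   public String word;

   public Word() {} // POJOs need a public no-argument constructor

   @Override
   public int hashCode() {
      return word.hashCode();
   }

   @Override
   public boolean equals(Object o) {
      return o instanceof Word && ((Word) o).word.equals(this.word);
   }
}

// DataStream<Word> words = ...
// words.keyBy(w -> w); // valid only because Word overrides hashCode()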

Reduce

KeyedStream → DataStream

A "rolling" reduce on a keyed data stream: combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {

@Override

public Integer reduce(Integer value1, Integer value2)

throws Exception {

return value1 + value2;

}

});

Fold

KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value: combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result =

keyedStream.fold("start", new FoldFunction<Integer, String>() {

@Override

public String fold(String current, Integer value) {

return current + "-" + value;

}

});

Aggregations

KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same holds for max and maxBy).

keyedStream.sum(0);

keyedStream.sum("key");

keyedStream.min(0);

keyedStream.min("key");

keyedStream.max(0);

keyedStream.max("key");

keyedStream.minBy(0);

keyedStream.minBy("key");

keyedStream.maxBy(0);

keyedStream.maxBy("key");

Window

KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

WindowAll

DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

Note: this is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window Apply

WindowedStream → DataStream

AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: if you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {

public void apply (Tuple tuple,

Window window,

Iterable<Tuple2<String, Integer>> values,

Collector<Integer> out) throws Exception {

int sum = 0;

for (Tuple2<String, Integer> t : values) {

sum += t.f1;

}

out.collect (new Integer(sum));

}

});

// applying an AllWindowFunction on non-keyed window stream

allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {

public void apply (Window window,

Iterable<Tuple2<String, Integer>> values,

Collector<Integer> out) throws Exception {

int sum = 0;

for (Tuple2<String, Integer> t : values) {

sum += t.f1;

}

out.collect (new Integer(sum));

}

});

Window Reduce

WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {

public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {

return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);

}

});

Window Fold

WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, applied to the sequence (1,2,3,4,5), folds it into the string "start-1-2-3-4-5":

windowedStream.fold("start", new FoldFunction<Integer, String>() {

public String fold(String current, Integer value) {

return current + "-" + value;

}

});

Aggregations on windows

WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (the same holds for max and maxBy).

windowedStream.sum(0);

windowedStream.sum("key");

windowedStream.min(0);

windowedStream.min("key");

windowedStream.max(0);

windowedStream.max("key");

windowedStream.minBy(0);

windowedStream.minBy("key");

windowedStream.maxBy(0);

windowedStream.maxBy("key");

Union

DataStream* → DataStream

Unions two or more DataStreams, creating a new stream containing all the elements from all the streams. Note: if you union a DataStream with itself, you will see every element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...);
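
A quick sketch of the self-union caveat:

// Unioning a stream with itself: every element appears twice downstream.
DataStream<Integer> doubled = dataStream.union(dataStream);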

Window Join

DataStream,DataStream → DataStream

Joins two DataStreams on a given key and a common window.

dataStream.join(otherStream)

.where(<key selector>).equalTo(<key selector>)

.window(TumblingEventTimeWindows.of(Time.seconds(3)))

.apply (new JoinFunction () {...});

Interval Join

KeyedStream,KeyedStream → DataStream

Joins two elements e1 and e2 of two keyed streams that share a common key, over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

// this will join the two streams so that

// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2

keyedStream.intervalJoin(otherKeyedStream)

.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound

.upperBoundExclusive() // optional

.lowerBoundExclusive() // optional

.process(new ProcessJoinFunction() {...});

Note: the official docs show an IntervalJoinFunction here, but no such class could be found; ProcessJoinFunction (as above) is what is actually used.

Window CoGroup

DataStream,DataStream → DataStream

Cogroups two DataStreams on a given key and a common window.

dataStream.coGroup(otherStream)

.where(0).equalTo(1)

.window(TumblingEventTimeWindows.of(Time.seconds(3)))

.apply (new CoGroupFunction () {...});

Connect

DataStream,DataStream → ConnectedStreams

"Connects" two data streams while retaining their types. Connect allows state to be shared between the two streams.

DataStream<Integer> someStream = //...

DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStreams → DataStream

Similar to map and flatMap, but operating on a connected data stream.

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {

@Override

public Boolean map1(Integer value) {

return true;

}

@Override

public Boolean map2(String value) {

return false;

}

});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

@Override

public void flatMap1(Integer value, Collector<String> out) {

out.collect(value.toString());

}

@Override

public void flatMap2(String value, Collector<String> out) {

for (String word: value.split(" ")) {

out.collect(word);

}

}

});

Split

DataStream → SplitStream

Splits a DataStream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {

@Override

public Iterable<String> select(Integer value) {

List<String> output = new ArrayList<String>();

if (value % 2 == 0) {

output.add("even");

}

else {

output.add("odd");

}

return output;

}

});

Select

SplitStream → DataStream

Selects one or more DataStreams from a SplitStream.

SplitStream<Integer> split;

DataStream<Integer> even = split.select("even");

DataStream<Integer> odd = split.select("odd");

DataStream<Integer> all = split.select("even","odd");

Iterate

DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously: elements greater than 0 are sent back to the feedback channel, and the rest are forwarded downstream. See iterations for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();

DataStream<Long> iterationBody = iteration.map(/* do something */);

DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){

@Override

public boolean filter(Long value) throws Exception {

return value > 0;

}

});

iteration.closeWith(feedback);

DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){

@Override

public boolean filter(Long value) throws Exception {

return value <= 0;

}

});

Extract Timestamps

DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event-time semantics. See Event Time for details.

stream.assignTimestamps(new TimeStampExtractor() {...}); // deprecated

stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<T>() {...});
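
A minimal sketch of a periodic assigner, assuming (as in the demos below) a Tuple3 whose f2 field carries the event timestamp, with events at most 5 seconds out of order:

stream.assignTimestampsAndWatermarks(
   new AssignerWithPeriodicWatermarks<Tuple3<String, Integer, Long>>() {
      private long maxTimestamp = Long.MIN_VALUE;

      @Override
      public long extractTimestamp(Tuple3<String, Integer, Long> element, long previousElementTimestamp) {
         maxTimestamp = Math.max(maxTimestamp, element.f2); // track the highest timestamp seen
         return element.f2; // the event time lives in field f2
      }

      @Override
      public Watermark getCurrentWatermark() {
         // emit a watermark trailing the max timestamp by the 5-second bound
         return new Watermark(maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 5000);
      }
   });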

Example illustrating the difference between min() and minBy():

Data source:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource1 extends RichParallelSourceFunction<Tuple5<String, Integer, Long, String, String>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple5<String, Integer, Long, String, String>> ctx) throws InterruptedException {

      Tuple5[] elements1 = new Tuple5[]{

         Tuple5.of("a", 2, 1551169050002L, "w", "e"),
         Tuple5.of("a", 1, 1551169050002L, "2", "1"),
         Tuple5.of("a", 1, 1551169050001L, "3", "1"),
         Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
         Tuple5.of("a", 2, 1551169054000L, "5", "1"),
         Tuple5.of("a", 1, 1551169050003L, "6", "1"),
         Tuple5.of("a", 3, 1551169064000L, "7", "1"),
         Tuple5.of("b", 5, 1551169100000L, "8", "1"),
         Tuple5.of("a", 4, 1551169079000L, "9", "1"),
         Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
         Tuple5.of("b", 6, 1551169108000L, "11", "1")
      };

      int count = 0;
      while (running && count < elements1.length) {
         ctx.collect(new Tuple5<>((String)elements1[count].f0,(Integer)elements1[count].f1,(Long)elements1[count].f2,
            (String)elements1[count].f3,(String)elements1[count].f4));
         count++;
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class KeyDemo1 {
   public static void main(String[] args) throws Exception {

      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // set up the data source
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");

      // assign timestamps and watermarks
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );


      stream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10))).minBy(1).print();
      env.execute("TimeWindowDemo");
   }
}
           

Output of minBy(): (screenshot omitted)

Output of min(): (screenshot omitted)

Summary:

min behaves like MIN in SQL: it returns only the minimum value of the given field. minBy returns the entire record that contains the minimum value of that field.

join() demo:

Data source 1:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource1 extends RichParallelSourceFunction<Tuple5<String, Integer, Long, String, String>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple5<String, Integer, Long, String, String>> ctx) throws InterruptedException {
      Tuple5[] elements1 = new Tuple5[]{
         Tuple5.of("a", 2, 1551169050002L, "w", "e"),
         Tuple5.of("a", 1, 1551169050002L, "2", "1"),
         Tuple5.of("a", 1, 1551169050001L, "3", "1"),
         Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
         Tuple5.of("a", 2, 1551169054000L, "5", "1"),
         Tuple5.of("a", 1, 1551169050003L, "6", "1"),
         Tuple5.of("a", 3, 1551169064000L, "7", "1"),
         Tuple5.of("b", 5, 1551169100000L, "8", "1"),
         Tuple5.of("a", 4, 1551169079000L, "9", "1"),
         Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
         Tuple5.of("b", 6, 1551169108000L, "11", "1")
      };

      int count = 0;
      while (running && count < elements1.length) {
         ctx.collect(new Tuple5<>((String)elements1[count].f0,(Integer)elements1[count].f1,(Long)elements1[count].f2,
            (String)elements1[count].f3,(String)elements1[count].f4));
         count++;
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           

Data source 2:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource2 extends RichParallelSourceFunction<Tuple3<String, Integer, Long>> {
   private volatile boolean running = true;

   @Override
   public void run(SourceContext<Tuple3<String, Integer, Long>> ctx) throws InterruptedException {

      Tuple3[] elements1 = new Tuple3[]{
         Tuple3.of("a", 1, 1551169050000L),
         Tuple3.of("aa", 33, 1551169064000L),
         Tuple3.of("a", 2, 1551169054000L),
         Tuple3.of("a", 3, 1551169064000L),
         Tuple3.of("b", 5, 1551169100000L),
         Tuple3.of("a", 4, 1551169079000L),
         Tuple3.of("aa1", 44, 1551169079000L),
         Tuple3.of("b1", 6, 1551169108000L)
      };

      int count = 0;
      while (running && count < elements1.length) {
         ctx.collect(new Tuple3<>((String)elements1[count].f0,(Integer)elements1[count].f1,(Long)elements1[count].f2));
         count++;
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;

public class JoinTest {

   public static void main(String[] args) throws Exception{
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // set up the data sources
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // assign timestamps and watermarks
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );
      // assign timestamps and watermarks
      DataStream<Tuple3<String, Integer, Long>> stream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(org.apache.flink.streaming.api.windowing.time.Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

//    stream.join(stream1).where((KeySelector<Tuple5<String, Integer, Long, String, String>, String>) value -> {int a;return "ds";})
//       .equalTo((KeySelector<Tuple3<String,Integer,Long>,String>) value -> null);

      stream.join(stream1).where(value-> value.f0).equalTo(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply((first, second) -> first.toString() +"|"+ second.toString()).print();

//    stream.join(stream1).where(value-> value.f0).equalTo(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10)))
//    .apply((Tuple5<String, Integer, Long, String, String> first, Tuple3<String, Integer, Long> second) -> {return null;});

//    stream.join(stream1).where(new KeySelector<Tuple5<String,Integer,Long,String,String>, String>() {
//       @Override
//       public String getKey(Tuple5<String, Integer, Long, String, String> value) throws Exception {
//          return null;
//       }
//    });



      env.execute("joinWindowDemo");
   }
}
           
(output screenshot omitted)

Interval Join demo

Data sources:

Tuple3[] elements1 = new Tuple3[]{
   Tuple3.of("a", 1, 1551169050000L),
   Tuple3.of("aa", 33, 1551169064000L),
   Tuple3.of("a", 2, 1551169054000L),
   Tuple3.of("a", 3, 1551169064000L),
   Tuple3.of("b", 5, 1551169100000L),
   Tuple3.of("a", 4, 1551169068999L),
   Tuple3.of("aa1", 44, 1551169079000L),
   Tuple3.of("b1", 6, 1551169108000L)
};

Tuple5[] elements1 = new Tuple5[]{

   Tuple5.of("a", 2, 1551169050002L, "w", "e"),
   Tuple5.of("a", 1, 1551169050002L, "2", "1"),
   Tuple5.of("a", 1, 1551169050001L, "3", "1"),
   Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
   Tuple5.of("a", 2, 1551169054000L, "5", "1"),
   Tuple5.of("a", 1, 1551169050003L, "6", "1"),
   Tuple5.of("a", 3, 1551169064000L, "7", "1"),
   Tuple5.of("b", 5, 1551169100000L, "8", "1"),
   Tuple5.of("a", 4, 1551169079000L, "9", "1"),
   Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
   Tuple5.of("b", 6, 1551169108000L, "11", "1")
};
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class IntervalJoinTest {
   public static void main(String[] args) throws Exception {
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // set up the data sources
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // assign timestamps and watermarks
      KeyedStream<Tuple5<String, Integer, Long, String, String>, Tuple> keyedStream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      ).keyBy(0);
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = keyedStream;

      // assign timestamps and watermarks
      KeyedStream<Tuple3<String, Integer, Long>, Tuple> keyedStream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      ).keyBy(0);
      DataStream<Tuple3<String, Integer, Long>> stream1 = keyedStream1;


      keyedStream.intervalJoin(keyedStream1).between(Time.seconds(-5), Time.seconds(5)).
         lowerBoundExclusive().upperBoundExclusive().process(new ProcessJoinFunction<Tuple5<String, Integer, Long, String, String>, Tuple3<String, Integer, Long>, Object>() {
         @Override
         public void processElement(Tuple5<String, Integer, Long, String, String> left, Tuple3<String, Integer, Long> right, Context ctx, Collector<Object> out) throws Exception {
            System.out.println("left:" + left.toString() + " | right:" + right.toString());
         }
      }).print();


      env.execute("IntervalJoinWindowDemo");
   }
}
           
(output screenshot omitted)

Connect demo:

Data sources:

Tuple3[] elements1 = new Tuple3[]{
   Tuple3.of("a", 1, 1551169050000L),
   Tuple3.of("aa", 33, 1551169064000L),
   Tuple3.of("a", 2, 1551169054000L),
   Tuple3.of("a", 3, 1551169064000L),
   Tuple3.of("b", 5, 1551169100000L),
   Tuple3.of("a", 4, 1551169068999L),
   Tuple3.of("aa1", 44, 1551169079000L),
   Tuple3.of("b1", 6, 1551169108000L)
};

Tuple5[] elements1 = new Tuple5[]{

   Tuple5.of("a", 2, 1551169050002L, "w", "e"),
   Tuple5.of("a", 1, 1551169050002L, "2", "1"),
   Tuple5.of("a", 1, 1551169050001L, "3", "1"),
   Tuple5.of("aa", 33, 1551169064000L, "4", "1"),
   Tuple5.of("a", 2, 1551169054000L, "5", "1"),
   Tuple5.of("a", 1, 1551169050003L, "6", "1"),
   Tuple5.of("a", 3, 1551169064000L, "7", "1"),
   Tuple5.of("b", 5, 1551169100000L, "8", "1"),
   Tuple5.of("a", 4, 1551169079000L, "9", "1"),
   Tuple5.of("aa", 44, 1551169079000L, "10", "1"),
   Tuple5.of("b", 6, 1551169108000L, "11", "1")
};
           

Main program:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class ConnectedStreamsTest {
   public static void main(String[] args) throws Exception {
      Long delay = 5000L;
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(2);

      // set up the data sources
      DataStream<Tuple5<String, Integer, Long, String, String>> source = env.addSource(new DataSource1()).setParallelism(1).name("Demo Source");
      DataStream<Tuple3<String, Integer, Long>> source1 = env.addSource(new DataSource2()).setParallelism(1).name("Demo Source1");


      // assign timestamps and watermarks
      DataStream<Tuple5<String, Integer, Long, String, String>> stream = source.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple5<String, Integer, Long, String, String>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple5<String, Integer, Long, String, String> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      // assign timestamps and watermarks
      DataStream<Tuple3<String, Integer, Long>> stream1 = source1.assignTimestampsAndWatermarks(
         new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
               SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//             System.out.println(element.f0 + "\t" + element.f1 + " watermark -> " + format.format(getCurrentWatermark().getTimestamp()) + " timestamp -> " + format.format(element.f2));
               return element.f2;
            }
         }
      );

      stream.connect(stream1).flatMap(new CoFlatMapFunction<Tuple5<String, Integer, Long, String, String>, Tuple3<String, Integer, Long>, Object>() {
         @Override
         public void flatMap1(Tuple5<String, Integer, Long, String, String> value, Collector<Object> out) throws Exception {
            System.out.println("flatMap1:" + value.toString());
         }

         @Override
         public void flatMap2(Tuple3<String, Integer, Long> value, Collector<Object> out) throws Exception {
            System.out.println("flatMap2:" + value.toString());
         }
      });


      env.execute("ConnectedStreamsDemo");
   }
}
           
(output screenshot omitted)

iterate() demo

Data source:

package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class DataSource3 extends RichParallelSourceFunction<Long> {

   @Override
   public void run(SourceContext<Long> ctx) throws Exception {
      Long[] elements1 = new Long[]{
         200L, -2L, -3L, -4L, -5L, -6L
      };

      for (Long a : elements1) {
         ctx.collect(a);
         Thread.sleep(300);
      }
   }

   @Override
   public void cancel() {

   }
}
           
package org.apache.flink.examples.java.dataStremAPI.operators;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterativeStreamTest {
   public static void main(String[] args) throws Exception {
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      env.setParallelism(1);

      // set up the data source
      DataStream<Long> source = env.addSource(new DataSource3()).name("Demo Source");

      IterativeStream<Long> iterate = source.iterate();
      DataStream<Long> iterationBody = iterate.map(value -> {
         value = value - 1;
         return value;
      });
      DataStream<Long> feedback = iterationBody.filter(value -> value > 0);
      iterate.closeWith(feedback);

      iterationBody.filter(value -> value <= 0).print();

      env.execute("IterativeStreamDemo");
   }
}
           
(output screenshot omitted)

Note: the overall job parallelism must match the source parallelism; otherwise the following error is thrown:

Exception in thread "main" java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 2. Parallelism can be modified using DataStream#setParallelism() method
	at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:90)
	at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
	at org.apache.flink.examples.java.dataStremAPI.operators.IterativeStreamTest.main(IterativeStreamTest.java:23)
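
As the message suggests, DataStream#setParallelism() on the feedback edge fixes this; a sketch matching the source parallelism of 1 used above:

// Pin the feedback stream to the original stream's parallelism (1 here).
DataStream<Long> feedback = iterationBody
   .filter(value -> value > 0)
   .setParallelism(1);
iterate.closeWith(feedback);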
           

The following transformations are available on data streams of tuples:

Transformation Description

Project

DataStream → DataStream

Selects a subset of fields from the tuples and forwards them downstream:

DataStream<Tuple3<Integer, Double, String>> in = // [...]

DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Physical partitioning

Flink also gives low-level control (if desired) over the exact partitioning of a stream after a transformation, via the following functions.

Transformation Description

Custom partitioning

DataStream → DataStream

Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(partitioner, "someKey");

dataStream.partitionCustom(partitioner, 0);
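
A minimal sketch of a user-defined partitioner (org.apache.flink.api.common.functions.Partitioner; the hash-based routing rule here is just an illustration):

// Route each element by a deterministic function of its key.
Partitioner<Integer> partitioner = new Partitioner<Integer>() {
   @Override
   public int partition(Integer key, int numPartitions) {
      // any deterministic mapping of key -> [0, numPartitions) works
      return Math.abs(key.hashCode() % numPartitions);
   }
};

dataStream.partitionCustom(partitioner, 0); // key taken from tuple field 0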

Random partitioning

DataStream → DataStream

Partitions elements randomly according to a uniform distribution:

dataStream.shuffle();

Rebalancing (Round-robin partitioning)

DataStream → DataStream

Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew:

dataStream.rebalance();

Rescaling

DataStream → DataStream

Partitions elements, round-robin, to a subset of downstream operators. This is useful if you want to fan out from each parallel instance of a source to a subset of mappers to distribute load, but do not want the full repartitioning that rebalance() would incur. Depending on configuration values such as the number of TaskManager slots, this requires only local data transfers instead of transfers over the network.

The subset of downstream operations to which an upstream operation sends elements depends on the parallelism of both the upstream and downstream operations. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream subtask distributes elements to three downstream subtasks, while the other upstream subtask distributes to the other three. Conversely, if the downstream operation has parallelism 2 and the upstream operation has parallelism 6, then three upstream subtasks distribute to one downstream subtask, while the other three distribute to the other one.

If the two parallelisms are not multiples of each other, one or several downstream subtasks will receive a differing number of inputs from the upstream subtasks.

(Figure illustrating the connection patterns omitted; see the sketch below.)

dataStream.rescale();
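
A sketch of the 2-to-6 fan-out described above, reusing DataSource3 from the iterate() demo earlier in this post:

// Each of the 2 source subtasks feeds exactly 3 of the 6 map subtasks
// locally, instead of the full network shuffle that rebalance() causes.
env.addSource(new DataSource3()).setParallelism(2)
   .rescale()
   .map(value -> value).setParallelism(6)
   .print();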

Broadcasting

DataStream → DataStream

Broadcasts elements to every partition:

dataStream.broadcast();


Task chaining and resource groups

Chaining two consecutive transformations means executing them in the same thread for better performance. By default, Flink chains operators whenever possible (e.g., two consecutive map transformations). If desired, the API offers fine-grained control over chaining:

Use StreamExecutionEnvironment.disableOperatorChaining() if you want to disable chaining for the whole job (as sketched below). The methods that follow give more fine-grained control. Because they refer to the previous transformation, they can only be used right after a DataStream transformation: you can write someStream.map(...).startNewChain(), but not someStream.startNewChain().
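
For example:

// Disable operator chaining for the entire job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();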

In Flink, a resource group is a slot. If desired, you can manually isolate operators into separate slots.

Transformation Description
Start new chain

Begin a new chain, starting with this operator. The two mappers will be chained together, and the filter will not be chained to the first mapper:

someStream.filter(...).map(...).startNewChain().map(...);

Disable chaining

Do not chain the map operator:

someStream.map(...).disableChaining();

Set slot sharing group

Sets the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot, and keep operations without one in other slots. This can be used to isolate slots. The slot sharing group is inherited from the input operations if all of them are in the same group. The name of the default slot sharing group is "default"; operations can be explicitly put into this group by calling slotSharingGroup("default"):

someStream.filter(...).slotSharingGroup("name");

https://www.jianshu.com/p/ea80d15e9b5e

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/
