Interval Join
KeyedStream,KeyedStream → DataStream
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that:
e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
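An Interval Join joins two data streams on the same key, pairing each element of one stream with the elements of the other stream whose timestamps fall within a time range relative to it. It is typically used to combine related, grouped data from a given time range into a single wide table. The join condition can be written as an expression like the following: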
key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
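To make the condition concrete, here is a minimal plain-Java sketch of the same predicate, applied with a naive nested loop over two in-memory lists. It does not use Flink; the `Event` class, the `matches` and `join` helpers, and the sample bounds of -2 ms and +1 ms are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class IntervalJoinSketch {

    // Hypothetical event type for illustration: a key plus an event-time timestamp in ms.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // True when e1 and e2 share a key and e2 lies inside e1's interval (both bounds inclusive).
    static boolean matches(Event e1, Event e2, long lowerBound, long upperBound) {
        return e1.key.equals(e2.key)
                && e1.timestamp + lowerBound <= e2.timestamp
                && e2.timestamp <= e1.timestamp + upperBound;
    }

    // Naive nested-loop join; emits "key:leftTimestamp,rightTimestamp" for every match.
    static List<String> join(List<Event> left, List<Event> right,
                             long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Event e1 : left) {
            for (Event e2 : right) {
                if (matches(e1, e2, lowerBound, upperBound)) {
                    out.add(e1.key + ":" + e1.timestamp + "," + e2.timestamp);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(
                new Event("a", 2), new Event("a", 5), new Event("b", 2));
        List<Event> right = Arrays.asList(
                new Event("a", 0), new Event("a", 1), new Event("a", 6));
        // Sample bounds: lowerBound = -2 ms, upperBound = +1 ms.
        System.out.println(join(left, right, -2, 1));
    }
}
```

With these sample values, the element a@2 matches a@0 and a@1 (its interval is [0, 3]), a@5 matches a@6 (its interval is [3, 6]), and b@2 matches nothing because no element on the other side shares its key, so the program prints `[a:2,0, a:2,1, a:5,6]`.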

The following programming model is typically used to join the two streams:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    // Match green elements whose timestamp lies in [orange.ts - 2 ms, orange.ts + 1 ms].
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // Emit each matched pair as "left,right".
            out.collect(left + "," + right);
        }
    });
2. Operating on a DataSet
An example follows:
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinDemo {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> data1 = env.fromElements(
            Tuple2.of("class1", 100),
            Tuple2.of("class1", 400),
            Tuple2.of("class2", 200),
            Tuple2.of("class2", 400)
        );
        DataSet<Tuple2<String, Integer>> data2 = env.fromElements(
            Tuple2.of("class1", 300),
            Tuple2.of("class1", 600),
            Tuple2.of("class2", 200),
            Tuple2.of("class3", 200)
        );
        // Inner equi-join on field 0 (the class name) of both data sets.
        data1.join(data2)
            .where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                @Override
                public String join(Tuple2<String, Integer> tuple1,
                                   Tuple2<String, Integer> tuple2) throws Exception {
                    // Format as "key : leftValue rightValue".
                    return tuple1.f0 + " : " + tuple1.f1 + " " + tuple2.f1;
                }
            }).print();
    }
}
Output (the order of the lines may vary between runs):
class1 : 100 300
class1 : 400 300
class1 : 100 600
class1 : 400 600
class2 : 200 200
class2 : 400 200
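The result can be checked by hand: an inner equi-join pairs every record on the left with every record on the right that has the same key. The plain-Java sketch below reproduces that pairing with a simple hash join over the same sample data. It is only an illustration of the semantics, not a description of how Flink executes the join; the `EquiJoinSketch` class and its `join` helper are hypothetical names, and records are modelled as `{key, value}` string arrays for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EquiJoinSketch {

    // Hash join on the key: index the right side by key, then probe it with each left record.
    static List<String> join(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> rightByKey = new HashMap<>();
        for (String[] r : right) {
            rightByKey.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            // Inner join: a left key with no partner on the right produces no output.
            List<String> partners =
                    rightByKey.getOrDefault(l[0], Collections.<String>emptyList());
            for (String rightValue : partners) {
                out.add(l[0] + " : " + l[1] + " " + rightValue);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> data1 = Arrays.asList(
                new String[] {"class1", "100"}, new String[] {"class1", "400"},
                new String[] {"class2", "200"}, new String[] {"class2", "400"});
        List<String[]> data2 = Arrays.asList(
                new String[] {"class1", "300"}, new String[] {"class1", "600"},
                new String[] {"class2", "200"}, new String[] {"class3", "200"});
        for (String line : join(data1, data2)) {
            System.out.println(line);
        }
    }
}
```

The sketch prints the same six pairs as the result above, although in a different order: class1 yields 2 × 2 = 4 pairs, class2 yields 2 × 1 = 2 pairs, and class3 yields none because it appears only in the second data set.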