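When doing OLAP analysis on static tables in a database, joining two tables is a very common operation. Likewise, a streaming job sometimes needs to join two streams to obtain richer information. The Flink DataStream API offers three operators for joining two streams: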
- join()
- coGroup()
- intervalJoin()
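This article shows how to use each of them with examples, and along the way explains how the somewhat special interval join works under the hood.
Preparing the data
Ingest the click stream and the order stream from Kafka separately, and convert each into POJOs.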
// Consume raw JSON strings from Kafka, starting at the latest offsets
DataStream<String> clickSourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        "ods_analytics_access_log",
        new SimpleStringSchema(),
        kafkaProps
    ).setStartFromLatest());
DataStream<String> orderSourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        "ods_ms_order_done",
        new SimpleStringSchema(),
        kafkaProps
    ).setStartFromLatest());

// Parse each JSON message into a POJO (JSON here is fastjson's com.alibaba.fastjson.JSON)
DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
    .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
    .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));
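The join() operator provides "window join" semantics: an inner join on the specified key within a tumbling, sliding, or session window. It supports both processing time and event time. The following example uses a 10-second tumbling window to join the two streams on merchandise ID and pull the price-related fields from the order stream.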

clickRecordStream
    .join(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
            // Called once for every (click, order) pair with the same key in the same window
            return StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()
            ), '\t');
        }
    })
    .print().setParallelism(1);
Simple and easy to use.
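The plain-Java sketch below (no Flink dependency) shows what a tumbling-window inner join computes. Each record is assigned to the window [start, start + size) that contains its timestamp. Only records with the same key in the same window are paired, and each matching pair produces one output. The Event class and the key/payload strings are hypothetical. The sketch ignores watermarks and window firing and only models which pairs end up joined, whichever time characteristic supplies the timestamps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TumblingWindowJoinSketch {
    // Hypothetical record: join key, timestamp in ms, and a payload
    static final class Event {
        final String key;
        final long ts;
        final String payload;

        Event(String key, long ts, String payload) {
            this.key = key;
            this.ts = ts;
            this.payload = payload;
        }
    }

    // Start of the epoch-aligned tumbling window (offset 0) that contains ts
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Inner join: pair every left/right event that shares both key and window
    static List<String> join(List<Event> left, List<Event> right, long size) {
        Map<String, List<Event>> rightByWindowAndKey = new HashMap<>();
        for (Event r : right) {
            String slot = windowStart(r.ts, size) + "/" + r.key;
            rightByWindowAndKey.computeIfAbsent(slot, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            String slot = windowStart(l.ts, size) + "/" + l.key;
            for (Event r : rightByWindowAndKey.getOrDefault(slot, List.of())) {
                out.add(l.key + "\t" + l.payload + "\t" + r.payload);
            }
        }
        return out;
    }
}
```

Take a 10-second window. A click at 9.0 s and an order at 9.5 s for the same item are joined. An order at 10.5 s is not joined, even though it is only 1.5 s after the click, because it falls into the next window.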
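An inner join alone is certainly not enough, so how do we implement a left or right outer join? The answer is the coGroup() operator. It is called much like join() and also needs a window, but CoGroupFunction is more flexible than JoinFunction: it receives the left-stream and/or right-stream records for a key and lets user logic decide how to match them and what to emit.
The example below implements a left join of the click stream with the order stream, using the naive nested-loop join idea (two nested loops).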
clickRecordStream
    .coGroup(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    // Output is a tab-separated String rather than Tuple2<String, Long>: Flink's tuple serializer
    // rejects null fields, so emitting a null price for unmatched clicks can fail once records are serialized
    .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<String> collector) throws Exception {
            for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
                boolean isMatched = false;
                for (OrderDoneLogRecord orderRecord : orderRecords) {
                    // A matching record exists in the right stream
                    collector.collect(accessRecord.getMerchandiseName() + "\t" + orderRecord.getPrice());
                    isMatched = true;
                }
                if (!isMatched) {
                    // No matching record in the right stream: pad the right side with null
                    collector.collect(accessRecord.getMerchandiseName() + "\tnull");
                }
            }
        }
    })
    .print().setParallelism(1);
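For each key that has data in the window, coGroup() calls the function once, and one of the two Iterables may be empty. This is why the same pattern also covers right and full outer joins. The plain-Java sketch below (no Flink dependency) mimics that contract for a single window. It groups both sides by key, calls a coGroup-style method once per key, and pads the missing side with the string "null". The grouping helper, the key extractors and the output format are illustrative assumptions, not Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class CoGroupOuterJoinSketch {
    // Models one window: group both sides by key and hand every key seen on
    // at least one side to the coGroup-style logic, in first-seen order
    static <L, R, K> List<String> fullOuterJoin(List<L> left, List<R> right,
                                                Function<L, K> leftKey, Function<R, K> rightKey) {
        Map<K, List<L>> leftGroups = new LinkedHashMap<>();
        Map<K, List<R>> rightGroups = new LinkedHashMap<>();
        for (L l : left) leftGroups.computeIfAbsent(leftKey.apply(l), k -> new ArrayList<>()).add(l);
        for (R r : right) rightGroups.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);

        Set<K> keys = new LinkedHashSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());
        List<String> out = new ArrayList<>();
        for (K key : keys) {
            coGroup(leftGroups.getOrDefault(key, List.of()), rightGroups.getOrDefault(key, List.of()), out);
        }
        return out;
    }

    // Same nested-loop shape as the CoGroupFunction above, plus right-only output
    static <L, R> void coGroup(List<L> lefts, List<R> rights, List<String> out) {
        for (L l : lefts) {
            if (rights.isEmpty()) out.add(l + "\tnull");  // left-only: left outer part
            for (R r : rights) out.add(l + "\t" + r);    // matched pairs: inner part
        }
        if (lefts.isEmpty()) {
            for (R r : rights) out.add("null\t" + r);    // right-only: right outer part
        }
    }
}
```

Dropping the right-only branch gives back the left join from the example above. Dropping the left-only branch instead gives a right join.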
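Both join() and coGroup() correlate records within windows. In some cases, however, the two streams do not advance in step. For example, an order record may be written long after the corresponding purchase action appears in the click stream, and if the records are bounded by windows they can easily fail to join. Flink therefore also provides "interval join" semantics: records are correlated on the specified key and on a time interval that bounds the right stream's offset relative to the left stream, i.e.: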
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
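An interval join is also an inner join. It needs no window, but the user must specify the lower and upper bounds of the offset interval, and it supports event time only.
Example code follows. Note that before running it, assignTimestampsAndWatermarks() must be applied to each of the two streams to extract event timestamps and generate watermarks.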
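The plain-Java sketch below encodes the interval condition with both bounds inclusive, which is the default for between(). It also encodes the equivalent condition from the right-hand record's point of view, left.timestamp ∈ [right.timestamp - upperBound, right.timestamp - lowerBound], obtained by rearranging the two inequalities. The class and method names are hypothetical, and timestamps are epoch milliseconds.

```java
class IntervalConditionSketch {
    // right.ts ∈ [left.ts + lowerBound, left.ts + upperBound], both ends inclusive
    static boolean matchesFromLeft(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    // The same condition rearranged around the right record:
    // left.ts ∈ [right.ts - upperBound, right.ts - lowerBound]
    static boolean matchesFromRight(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return leftTs >= rightTs - upperBound && leftTs <= rightTs - lowerBound;
    }
}
```

Take between(Time.seconds(-30), Time.seconds(30)). An order 25 s after a click joins and one 31 s after does not. An order exactly 30 s before the click still joins because both ends are inclusive.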
clickRecordStream
    .keyBy(record -> record.getMerchandiseId())
    .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
    // An order joins a click if its event time lies within [click - 30 s, click + 30 s]
    .between(Time.seconds(-30), Time.seconds(30))
    .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
            collector.collect(StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()
            ), '\t'));
        }
    })
    .print().setParallelism(1);
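As shown above, unlike a window join, an interval join operates on two KeyedStreams, and between() must be called to specify the lower and upper bounds of the offset interval. To make a bound exclusive, call upperBoundExclusive() or lowerBoundExclusive().
How interval join works
Below is the overload that KeyedStream.IntervalJoined.process(ProcessJoinFunction) delegates to (IntervalJoined is the type returned by between()).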
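With integer millisecond timestamps, an exclusive bound is the same as an inclusive bound moved 1 ms inward. All four combinations of the inclusive/exclusive flags can therefore be reduced to a single inclusive range before comparing. The plain-Java sketch below shows that normalization. IntervalJoinOperator's constructor appears to apply a similar ±1 adjustment, but the names here are hypothetical and this is not Flink's code.

```java
class ExclusiveBoundsSketch {
    // Returns {lower, upper} as inclusive bounds: "> x" becomes ">= x + 1", "< y" becomes "<= y - 1"
    static long[] toInclusive(long lowerBound, long upperBound,
                              boolean lowerInclusive, boolean upperInclusive) {
        long lower = lowerInclusive ? lowerBound : lowerBound + 1;
        long upper = upperInclusive ? upperBound : upperBound - 1;
        return new long[] {lower, upper};
    }

    // Interval-join condition with configurable inclusiveness of each bound
    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound,
                           boolean lowerInclusive, boolean upperInclusive) {
        long[] b = toInclusive(lowerBound, upperBound, lowerInclusive, upperInclusive);
        return rightTs >= leftTs + b[0] && rightTs <= leftTs + b[1];
    }
}
```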
public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
        );

    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}
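So the method first applies connect() and keyBy() to the two streams, then transforms them with IntervalJoinOperator. Inside IntervalJoinOperator, two MapStates buffer the left-stream and right-stream records respectively.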
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        LEFT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
    ));
    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        RIGHT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
    ));
}
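Here the Long key is the event timestamp, and the List<BufferEntry<T1>> / List<BufferEntry<T2>> value holds the records that arrived with that timestamp. When a record arrives on the left or right stream, processElement1() or processElement2() is called respectively. Both delegate to processElement(), shown below.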
@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft) throws Exception {

    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
    }

    if (isLate(ourTimestamp)) {
        return;
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
        final long timestamp = bucket.getKey();

        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        for (BufferEntry<OTHER> entry: bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}
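The logic of this code is as follows:
- Take the timestamp of the current StreamRecord and call isLate() to check whether it is late data (i.e., its timestamp is below the current watermark). If so, drop it.
- Call addToBuffer() to insert the record, keyed by its timestamp, into the current stream's MapState.
- Iterate over the other stream's MapState. For every bucket whose timestamp satisfies the interval condition above, call collect() to pass each record in it to the user-defined ProcessJoinFunction. For the right stream the bounds are passed as (-upperBound, -lowerBound), because right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound] is equivalent to left.timestamp ∈ [right.timestamp - upperBound, right.timestamp - lowerBound]. The code of collect() follows; note that the result's timestamp is the larger of the left and right timestamps.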
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
    userFunction.processElement(left, right, context, collector);
}
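- Call internalTimerService.registerEventTimeTimer() to register a cleanup timer at ourTimestamp + relativeUpperBound, or at ourTimestamp itself when relativeUpperBound is not positive. Once the watermark passes the upper end of the interval, this timer runs the state-cleanup logic so that buffered data does not pile up. Note that the left and right streams register their timers in different namespaces. The cleanup itself lives in onEventTime(), which works backwards from the timer's timestamp to the buffered timestamp it must remove.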
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();
    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}
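The plain-Java sketch below (no Flink dependency) models the buffering and cleanup logic described above for a single key:

- one timestamp-to-records buffer per side;
- a late-record check against the current watermark;
- a range scan of the other side's buffer, with a result timestamp of max(leftTs, rightTs);
- cleanup "timers" that fire once the watermark reaches them.

It is a simplified model built on several assumptions: String payloads, one key, a single thread, and watermarks advanced by hand. It is not IntervalJoinOperator itself. One deliberate difference is that it uses TreeMap.subMap() to visit only matching timestamps, whereas processElement() iterates over every entry of the other MapState and skips those outside the range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class IntervalJoinSketch {
    // Joined pair plus the result timestamp max(leftTs, rightTs)
    static final class Joined {
        final String left;
        final String right;
        final long timestamp;

        Joined(String left, String right, long timestamp) {
            this.left = left;
            this.right = right;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return left + "|" + right + "@" + timestamp;
        }
    }

    private final long lowerBound;
    private final long upperBound;
    // timestamp -> records seen at that timestamp (stand-ins for leftBuffer/rightBuffer)
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();
    // cleanup timer time -> sides that registered it (true = left)
    private final TreeMap<Long, List<Boolean>> cleanupTimers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    final List<Joined> output = new ArrayList<>();

    IntervalJoinSketch(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long ts) {
        process(value, ts, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long ts) {
        // Seen from the right side, the interval is [ts - upperBound, ts - lowerBound]
        process(value, ts, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts, TreeMap<Long, List<String>> ours,
                         TreeMap<Long, List<String>> other, long relLower, long relUpper, boolean isLeft) {
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return; // late record: dropped without joining or buffering
        }
        ours.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        for (Map.Entry<Long, List<String>> bucket
                : other.subMap(ts + relLower, true, ts + relUpper, true).entrySet()) {
            long otherTs = bucket.getKey();
            for (String o : bucket.getValue()) {
                if (isLeft) {
                    output.add(new Joined(value, o, Math.max(ts, otherTs)));
                } else {
                    output.add(new Joined(o, value, Math.max(otherTs, ts)));
                }
            }
        }
        long cleanupTime = relUpper > 0 ? ts + relUpper : ts;
        cleanupTimers.computeIfAbsent(cleanupTime, k -> new ArrayList<>()).add(isLeft);
    }

    // Fire every cleanup timer at or below the new watermark, mapping back as onEventTime() does
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!cleanupTimers.isEmpty() && cleanupTimers.firstKey() <= newWatermark) {
            Map.Entry<Long, List<Boolean>> timer = cleanupTimers.pollFirstEntry();
            long t = timer.getKey();
            for (boolean isLeft : timer.getValue()) {
                if (isLeft) {
                    leftBuffer.remove(upperBound <= 0 ? t : t - upperBound);
                } else {
                    rightBuffer.remove(lowerBound <= 0 ? t + lowerBound : t);
                }
            }
        }
    }

    int leftBufferedTimestamps() { return leftBuffer.size(); }
    int rightBufferedTimestamps() { return rightBuffer.size(); }
}
```

Take bounds of -30..30, and suppose a left record at 100 and a right record at 120 join with result timestamp 120. Once the watermark reaches 140, the left timer registered at 130 has fired and removed the left bucket for 100. A left record at 135 arriving after that is dropped as late.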
This article is reposted from Jianshu. Author: LittleMagic
Original link:
https://www.jianshu.com/p/45ec888332df