在 Flink 任务的图结构中，部分算子会被 chain（链接）在一起。这样做有很多好处：可以减少序列化和网络传输的开销，提高执行效率。但并非任意算子都能 chain 在一起，需要满足一定的条件。
Flink 在生成 JobGraph 时引入了 chain 的概念：它会针对 StreamGraph 中的边，判断边两端的上下游算子能否 chain 在一起（判断逻辑见下文的 isChainable）。若要在整个作业范围内关闭 chain，可以调用：
// Job-level switch that turns operator chaining off for the whole job. It is presumably
// what makes streamGraph.isChainingEnabled() in isChainable return false — TODO confirm.
env.disableOperatorChaining()
/**
 * Decides whether the two nodes connected by {@code edge} can be chained together.
 *
 * <p>Every one of the following conditions must hold. They are evaluated in this order, and
 * evaluation stops at the first one that fails:
 * <ol>
 *   <li>the downstream node has exactly one input edge;</li>
 *   <li>both nodes belong to the same slot sharing group;</li>
 *   <li>the operators themselves permit chaining ({@code areOperatorsChainable});</li>
 *   <li>the edge's partitioner is a {@code ForwardPartitioner};</li>
 *   <li>the edge's shuffle mode is not {@code ShuffleMode.BATCH};</li>
 *   <li>both nodes have the same parallelism;</li>
 *   <li>chaining is enabled on the stream graph.</li>
 * </ol>
 *
 * @param edge the edge linking the candidate upstream and downstream nodes
 * @param streamGraph the graph used to resolve the edge's source and target nodes
 * @return {@code true} only if every condition above holds
 */
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    final StreamNode producer = streamGraph.getSourceVertex(edge);
    final StreamNode consumer = streamGraph.getTargetVertex(edge);

    // The downstream node must be fed by this edge alone.
    if (consumer.getInEdges().size() != 1) {
        return false;
    }
    // Both ends must share a slot sharing group.
    if (!producer.isSameSlotSharingGroup(consumer)) {
        return false;
    }
    // The operator factories and chaining strategies must allow it.
    if (!areOperatorsChainable(producer, consumer, streamGraph)) {
        return false;
    }
    // Only forward (one-to-one) partitioning can be chained.
    if (!(edge.getPartitioner() instanceof ForwardPartitioner)) {
        return false;
    }
    // A BATCH shuffle between the two nodes rules out chaining.
    if (edge.getShuffleMode() == ShuffleMode.BATCH) {
        return false;
    }
    // Both ends must run with identical parallelism.
    if (producer.getParallelism() != consumer.getParallelism()) {
        return false;
    }
    // Finally, chaining must not be switched off for the graph.
    return streamGraph.isChainingEnabled();
}
/**
 * Checks whether the operator factories of two adjacent nodes permit chaining them.
 *
 * <p>The result is {@code false} in any of these cases:
 * <ul>
 *   <li>either node has no operator factory;</li>
 *   <li>the upstream chaining strategy is {@code ChainingStrategy.NEVER};</li>
 *   <li>the downstream chaining strategy is anything other than {@code ChainingStrategy.ALWAYS};</li>
 *   <li>the downstream factory is a {@code YieldingOperatorFactory} and the operator returned by
 *       {@code getHeadOperator} for the upstream node is a stream source.</li>
 * </ul>
 *
 * <p>Note that the strategy check rejects the pair if EITHER strategy condition fails. It does
 * not require both to fail.
 *
 * <p>All other combinations are chainable.
 *
 * @param upStreamVertex the producing node
 * @param downStreamVertex the consuming node
 * @param streamGraph the graph passed to {@code getHeadOperator} when inspecting the upstream side
 * @return whether the two operators may be chained
 */
@VisibleForTesting
static boolean areOperatorsChainable(
        StreamNode upStreamVertex,
        StreamNode downStreamVertex,
        StreamGraph streamGraph) {
    final StreamOperatorFactory<?> producerFactory = upStreamVertex.getOperatorFactory();
    final StreamOperatorFactory<?> consumerFactory = downStreamVertex.getOperatorFactory();

    // Both sides need an operator factory; otherwise there is nothing to chain.
    if (producerFactory == null || consumerFactory == null) {
        return false;
    }

    // The upstream strategy must not be NEVER ...
    if (producerFactory.getChainingStrategy() == ChainingStrategy.NEVER) {
        return false;
    }
    // ... and the downstream strategy must be exactly ALWAYS.
    if (consumerFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
        return false;
    }

    // Yielding operators cannot be chained to legacy sources. The information about which
    // vertices have already been chained is not preserved at this point, so the upstream
    // node's head operator is inspected instead. Non-yielding consumers are always accepted.
    return !(consumerFactory instanceof YieldingOperatorFactory)
            || !getHeadOperator(upStreamVertex, streamGraph).isStreamSource();
}