在 Flink 任務的圖結構中,部分算子會被 chain(鏈接)在一起。算子 chain 在一起有很多好處(減少序列化和網絡開銷,提高效率),但算子要 chain 在一起必須滿足一定的條件。
Flink 任務在生成 JobGraph 的時候引入了 chain 的概念,並會判斷上下遊算子能否 chain 在一起。
如果不希望算子被 chain 在一起,可以調用 env.disableOperatorChaining() 全局關閉算子鏈(對應下面判斷條件中的 streamGraph.isChainingEnabled())。
/**
 * Checks whether the upstream and downstream nodes connected by {@code edge} can be
 * chained together.
 *
 * <p>Chaining is allowed only when every condition below holds; the conditions are
 * evaluated in order and the first failing one short-circuits the rest.
 *
 * @param edge the edge connecting the two nodes under consideration
 * @param streamGraph the graph that owns {@code edge}
 * @return {@code true} if the two nodes may be chained, {@code false} otherwise
 */
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    final StreamNode producer = streamGraph.getSourceVertex(edge);
    final StreamNode consumer = streamGraph.getTargetVertex(edge);

    // The downstream node must have exactly one input edge.
    if (consumer.getInEdges().size() != 1) {
        return false;
    }
    // Both nodes must be in the same slot sharing group.
    if (!producer.isSameSlotSharingGroup(consumer)) {
        return false;
    }
    // The chaining strategies of the two operators must permit chaining.
    if (!areOperatorsChainable(producer, consumer, streamGraph)) {
        return false;
    }
    // The edge must use a ForwardPartitioner.
    if (!(edge.getPartitioner() instanceof ForwardPartitioner)) {
        return false;
    }
    // The edge's shuffle mode must not be BATCH.
    if (edge.getShuffleMode() == ShuffleMode.BATCH) {
        return false;
    }
    // Upstream and downstream must have the same parallelism.
    if (producer.getParallelism() != consumer.getParallelism()) {
        return false;
    }
    // Finally, chaining must be enabled on the stream graph itself.
    return streamGraph.isChainingEnabled();
}
/**
 * Checks whether the operators of two adjacent nodes allow being chained, based on
 * their operator factories and chaining strategies.
 *
 * @param upStreamVertex the upstream node
 * @param downStreamVertex the downstream node
 * @param streamGraph the graph containing both nodes; used to look up the head operator
 *     of the upstream node when the downstream factory is a yielding one
 * @return {@code true} if the operators may be chained, {@code false} otherwise
 */
@VisibleForTesting
static boolean areOperatorsChainable(
        StreamNode upStreamVertex,
        StreamNode downStreamVertex,
        StreamGraph streamGraph) {
    final StreamOperatorFactory<?> upstreamFactory = upStreamVertex.getOperatorFactory();
    final StreamOperatorFactory<?> downstreamFactory = downStreamVertex.getOperatorFactory();

    // Both nodes need an operator factory.
    final boolean bothFactoriesPresent = downstreamFactory != null && upstreamFactory != null;
    if (!bothFactoriesPresent) {
        return false;
    }

    // Chaining requires that the upstream strategy is not NEVER and that the
    // downstream strategy is ALWAYS.
    final boolean strategiesPermitChaining =
            upstreamFactory.getChainingStrategy() != ChainingStrategy.NEVER
                    && downstreamFactory.getChainingStrategy() == ChainingStrategy.ALWAYS;
    if (!strategiesPermitChaining) {
        return false;
    }

    // Any non-yielding downstream operator can be chained at this point.
    if (!(downstreamFactory instanceof YieldingOperatorFactory)) {
        return true;
    }

    // Yielding operators cannot be chained to legacy sources. Unfortunately the
    // information that vertices have been chained is not preserved at this point,
    // so the head operator of the upstream node is looked up instead.
    return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource();
}