天天看點

【源碼】Flink 算子 chain 在一起的條件

Flink 任務的圖結構中，部分算子會被 chain 在一起。chain 在一起有很多好處（減少序列化和網絡開銷，提高效率），但算子要 chain 在一起必須滿足一定的條件。

Flink 任務在生成 JobGraph 時會引入 chain 的概念：對圖中的每一條邊，判斷其上下游算子能否 chain 在一起。chain 也可以通過 `env.disableOperatorChaining()` 在整個作業層面關閉。具體的判斷邏輯如下：

// Appears (per its name) to be the job-level switch that turns operator chaining off;
// presumably this is what streamGraph.isChainingEnabled() reports in isChainable — verify.
env.disableOperatorChaining()
/**
 * Decides whether the two nodes joined by {@code edge} may be fused into one operator chain.
 *
 * <p>All of the conditions below must hold. They are checked in a fixed order, and the first
 * one that fails rejects the edge.
 *
 * @param edge the edge from the upstream (source) node to the downstream (target) node
 * @param streamGraph the graph that owns the edge; used to resolve its endpoints
 * @return {@code true} only when every chaining condition is satisfied
 */
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
  final StreamNode source = streamGraph.getSourceVertex(edge);
  final StreamNode target = streamGraph.getTargetVertex(edge);

  // The downstream node must have exactly one incoming edge.
  if (target.getInEdges().size() != 1) {
    return false;
  }
  // Both nodes must belong to the same slot sharing group.
  if (!source.isSameSlotSharingGroup(target)) {
    return false;
  }
  // The operators' own chaining strategies must permit chaining.
  if (!areOperatorsChainable(source, target, streamGraph)) {
    return false;
  }
  // The edge must use a ForwardPartitioner.
  if (!(edge.getPartitioner() instanceof ForwardPartitioner)) {
    return false;
  }
  // Batch-mode shuffles are never chained.
  if (edge.getShuffleMode() == ShuffleMode.BATCH) {
    return false;
  }
  // Both sides must run with the same parallelism.
  if (source.getParallelism() != target.getParallelism()) {
    return false;
  }
  // Finally, chaining must be enabled for the graph as a whole.
  return streamGraph.isChainingEnabled();
}

/**
 * Checks whether the operator factories of two adjacent nodes allow them to be chained.
 *
 * <p>The pair is rejected in any of these cases:
 *
 * <ul>
 *   <li>either node has no operator factory;
 *   <li>the upstream chaining strategy is {@code NEVER};
 *   <li>the downstream chaining strategy is anything other than {@code ALWAYS};
 *   <li>the downstream factory is a {@code YieldingOperatorFactory} and the head operator of the
 *       upstream node is a legacy stream source.
 * </ul>
 *
 * @param upStreamVertex the producing node
 * @param downStreamVertex the consuming node
 * @param streamGraph the graph, used to look up the upstream node's head operator
 * @return {@code true} if the operators may be chained
 */
@VisibleForTesting
static boolean areOperatorsChainable(
  StreamNode upStreamVertex,
  StreamNode downStreamVertex,
  StreamGraph streamGraph) {
  final StreamOperatorFactory<?> producerFactory = upStreamVertex.getOperatorFactory();
  final StreamOperatorFactory<?> consumerFactory = downStreamVertex.getOperatorFactory();

  // Both nodes must carry an operator factory.
  final boolean bothFactoriesPresent = consumerFactory != null && producerFactory != null;
  if (!bothFactoriesPresent) {
    return false;
  }

  // The upstream side must not forbid chaining outright.
  if (producerFactory.getChainingStrategy() == ChainingStrategy.NEVER) {
    return false;
  }
  // The downstream side must explicitly allow being chained to its predecessor.
  if (consumerFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
    return false;
  }

  // Any non-yielding downstream operator is fine at this point.
  if (!(consumerFactory instanceof YieldingOperatorFactory)) {
    return true;
  }

  // Yielding operators cannot be chained to legacy sources. Whether vertices have already
  // been chained is not preserved here, so the upstream node's head operator is inspected.
  return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource();
}