“為什麼我的 Flink 作業 Web UI 中隻顯示出了一個框,并且 Records Sent 和Records Received 名額都是 0 ?是我的程式寫得有問題嗎?”
Flink 算子鍊簡介
筆者在 Flink 社群群裡經常能看到類似這樣的疑問。這種情況幾乎都不是程式有問題,而是因為 Flink 的 operator chain ——即算子鍊機制導緻的,即送出的作業的執行計劃中,所有算子的并發執行個體(即 sub-task )都因為滿足特定條件而串成了整體來執行,自然就觀察不到算子之間的資料流量了。
當然上述是一種特殊情況。我們更常見到的是隻有部分算子得到了算子鍊機制的優化,如官方文檔中出現過多次的下圖所示,注意 Source 和 map() 算子。

算子鍊機制的好處是顯而易見的:所有 chain 在一起的 sub-task 都會在同一個線程(即 TaskManager 的 slot)中執行,能夠減少不必要的資料交換、序列化和上下文切換,進而提高作業的執行效率。
鋪墊了這麼多,接下來就通過源碼簡單看看算子鍊産生的條件,以及它是如何在 Flink Runtime 中實作的。
邏輯計劃中的算子鍊
對 Flink Runtime 稍有了解的看官應該知道,Flink 作業的執行計劃會用三層圖結構來表示,即:
- StreamGraph —— 原始邏輯執行計劃
- JobGraph —— 優化的邏輯執行計劃(Web UI 中看到的就是這個)
- ExecutionGraph —— 實體執行計劃
算子鍊是在優化邏輯計劃時加入的,也就是由 StreamGraph 生成 JobGraph 的過程中。那麼我們來到負責生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 類,檢視其核心方法 createJobGraph() 的源碼。
private JobGraph createJobGraph() {
// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
// 略......
return jobGraph;
}
可見,該方法會先計算出 StreamGraph 中各個節點的哈希碼作為唯一辨別,并建立一個空的 Map 結構儲存即将被鍊在一起的算子的哈希碼,然後調用 setChaining() 方法,如下源碼所示。
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
可見是逐個周遊 StreamGraph 中的 Source 節點,并調用 createChain() 方法。createChain() 是邏輯計劃層建立算子鍊的核心方法,完整源碼如下,有點長。
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
先解釋一下方法開頭建立的 3 個 List 結構:
- transitiveOutEdges:目前算子鍊在 JobGraph 中的出邊清單,同時也是 createChain() 方法的最終傳回值;
- chainableOutputs:目前能夠鍊在一起的 StreamGraph 邊清單;
- nonChainableOutputs:目前不能夠鍊在一起的 StreamGraph 邊清單。
接下來,從 Source 開始周遊 StreamGraph 中目前節點的所有出邊,調用 isChainable() 方法判斷是否可以被鍊在一起(這個判斷邏輯稍後會講到)。可以連結的出邊被放入 chainableOutputs 清單,否則放入 nonChainableOutputs 清單。
對于 chainableOutputs 中的邊,就會以這些邊的直接下遊為起點,繼續遞歸調用createChain() 方法延展算子鍊。對于 nonChainableOutputs 中的邊,由于目前算子鍊的延展已經到頭,就會以這些“斷點”為起點,繼續遞歸調用 createChain() 方法試圖建立新的算子鍊。也就是說,邏輯計劃中整個建立算子鍊的過程都是遞歸的,亦即實際傳回時,是從 Sink 端開始傳回的。
然後要判斷目前節點是不是算子鍊的起始節點。如果是,則調用 createJobVertex()方法為算子鍊建立一個 JobVertex( 即 JobGraph 中的節點),也就形成了我們在Web UI 中看到的 JobGraph 效果:
最後,還需要将各個節點的算子鍊資料寫入各自的 StreamConfig 中,算子鍊的起始節點要額外儲存下 transitiveOutEdges。StreamConfig 在後文的實體執行階段會再次用到。
形成算子鍊的條件
來看看 isChainable() 方法的代碼。 由此可得,上下遊算子能夠 chain 在一起的條件還是非常苛刻的(老生常談了),列舉如下:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
- 上下遊算子執行個體處于同一個 SlotSharingGroup 中(之後再提);
- 下遊算子的連結政策(ChainingStrategy)為 ALWAYS ——既可以與上遊連結,也可以與下遊連結。我們常見的 map()、filter() 等都屬此類;
- 上遊算子的連結政策為 HEAD 或 ALWAYS。HEAD 政策表示隻能與下遊連結,這在正常情況下是 Source 算子的專屬;
- 兩個算子間的實體分區邏輯是 ForwardPartitioner ,可參見之前寫過的《聊聊Flink DataStream 的八種實體分區邏輯》;
- 兩個算子間的 shuffle 方式不是批處理模式;
- 上下遊算子執行個體的并行度相同;
- 沒有禁用算子鍊。
禁用算子鍊
使用者可以在一個算子上調用 startNewChain() 方法強制開始一個新的算子鍊,或者調用 disableOperatorChaining() 方法指定它不參與算子鍊。代碼位于 SingleOutputStreamOperator 類中,都是通過改變算子的連結政策實作的。
@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
return setChainingStrategy(ChainingStrategy.NEVER);
}
@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
return setChainingStrategy(ChainingStrategy.HEAD);
}
如果要在整個運作時環境中禁用算子鍊,調用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可。
實體計劃中的算子鍊
在 JobGraph 轉換成 ExecutionGraph 并交由 TaskManager 執行之後,會生成排程執行的基本任務單元 ——StreamTask,負責執行具體的 StreamOperator 邏輯。在StreamTask.invoke() 方法中,初始化了狀态後端、checkpoint 存儲和定時器服務之後,可以發現:
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();
構造出了一個 OperatorChain 執行個體,這就是算子鍊在實際執行時的形态。解釋一下OperatorChain 中的幾個主要屬性。
private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
- headOperator:算子鍊的第一個算子,對應 JobGraph 中的算子鍊起始節點;
- allOperators:算子鍊中的所有算子,倒序排列,即 headOperator 位于該數組的末尾;
- streamOutputs:算子鍊的輸出,可以有多個;
- chainEntryPoint:算子鍊的“入口點”,它的含義将在後文說明。
由上可知,所有 StreamTask 都會建立 OperatorChain。如果一個算子無法進入算子鍊,也會形成一個隻有 headOperator 的單個算子的 OperatorChain。
OperatorChain 構造方法中的核心代碼如下。
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
RecordWriterOutput<?> streamOutput = createStreamOutput(
recordWriters.get(i),
outEdge,
chainedConfigs.get(outEdge.getSourceId()),
containingTask.getEnvironment());
this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOps);
if (operatorFactory != null) {
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
headOperator = null;
}
// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
首先會周遊算子鍊整體的所有出邊,并調用 createStreamOutput() 方法建立對應的下遊輸出 RecordWriterOutput。然後就會調用 createOutputCollector() 方法建立實體的算子鍊,并傳回 chainEntryPoint,這個方法比較重要,部分代碼如下。
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators) {
List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
// create collectors for the network outputs
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
@SuppressWarnings("unchecked")
RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// Create collectors for the chained outputs
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
int outputId = outputEdge.getTargetId();
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
containingTask,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators,
outputEdge.getOutputTag());
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// 以下略......
}
該方法從上一節提到的 StreamConfig 中分别取出出邊和連結邊的資料,并建立各自的 Output。出邊的 Output 就是将資料發往算子鍊之外下遊的 RecordWriterOutput,而連結邊的輸出要靠 createChainedOperator() 方法。
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
OutputTag<IN> outputTag) {
// create the output that the operator writes to first. this may recursively create more operators
WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
containingTask,
operatorConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators);
// now create the operator and give it the output collector to write its output to
StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
containingTask, operatorConfig, chainedOperatorOutput);
allOperators.add(chainedOperator);
WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
// wrap watermark gauges since registered metrics must be unique
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
return currentOperatorOutput;
}
我們一眼就可以看到,這個方法遞歸調用了上述 createOutputCollector() 方法,與邏輯計劃階段類似,通過不斷延伸 Output 來産生 chainedOperator(即算子鍊中除了headOperator 之外的算子),并逆序傳回,這也是 allOperators 數組中的算子順序為倒序的原因。
chainedOperator 産生之後,将它們通過 ChainingOutput 連接配接起來,形成如下圖所示的結構。
圖檔來自: http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/
最後來看看 ChainingOutput.collect() 方法是如何輸出資料流的。
@Override
public void collect(StreamRecord<T> record) {
if (this.outputTag != null) {
// we are only responsible for emitting to the main input
return;
}
pushToOperator(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
// we are only responsible for emitting to the side-output specified by our
// OutputTag.
return;
}
pushToOperator(record);
}
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
// we know that the given outputTag matches our OutputTag so the record
// must be of the type that our operator expects.
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;
numRecordsIn.inc();
operator.setKeyContextElement1(castRecord);
operator.processElement(castRecord);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
可見是通過調用連結算子的 processElement() 方法,直接将資料推給下遊處理了。也就是說,OperatorChain 完全可以看做一個由 headOperator 和 streamOutputs組成的單個算子,其内部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽,同時沒有引入任何 overhead。
打通了算子鍊在執行層的邏輯,看官應該會明白 chainEntryPoint 的含義了。由于它位于遞歸傳回的終點,是以它就是流入算子鍊的起始 Output,即上圖中指向 headOperator 的 RecordWriterOutput。
文章轉載自簡書,作者:LittleMagic。原文連結:
https://www.jianshu.com/p/799744e347c7