天天看點

Flink原理與實作:如何生成ExecutionGraph及實體執行圖

閱讀本文之前,請先閱讀Flink原理與實作系列前面的幾篇文章 :

<a href="#">Flink 原理與實作:架構和拓撲概覽</a>

<a href="#">Flink 原理與實作:如何生成 StreamGraph</a>

<a href="#">Flink 原理與實作:如何生成 JobGraph</a>

StreamGraph和JobGraph都是在client生成的,這篇文章将描述如何生成ExecutionGraph以及實體執行圖。同時會講解一個作業送出後如何被排程和執行。

client生成JobGraph之後,就通過submitJob送出至JobMaster。

在其構造函數中,會生成ExecutionGraph:

看下這個方法,比較長,略過了一些次要的代碼片斷:

可以看到,生成execution graph的代碼,主要是在最後一行,即ExecutionGraph.attachJobGraph方法:

可以看到,建立ExecutionJobVertex的重點就在它的構造函數中:

ExecutionJobVertex和ExecutionVertex是建立完了,但是ExecutionEdge還沒有建立呢,接下來看一下<code>attachJobGraph</code>方法中這一行代碼:

這個方法代碼如下:

看下<code>ExecutionVertex.connectSource</code>方法代碼:

<code>connectAllToAll</code>方法:

看這個方法之前,需要知道,ExecutionVertex的inputEdges變量,是一個二維資料。它表示了這個ExecutionVertex上每一個input所包含的ExecutionEdge清單。

即,如果ExecutionVertex有兩個不同的輸入:輸入A和B。其中輸入A的partition=1, 輸入B的partition=8,那麼這個二維數組inputEdges如下(為簡短,以irp代替IntermediateResultPartition)

[ ExecutionEdge[ A.irp[0]] ]

[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ]

是以上面的代碼就很容易了解了。

到這裡為止,ExecutionJobGraph就建立完成了。接下來看下這個ExecutionGraph是如何轉化成Task并開始執行的。

接下來我們以最簡單的mini cluster為例講解一下Task如何被排程和執行。

簡單略過client端job的送出和StreamGraph到JobGraph的翻譯,以及上面ExecutionGraph的翻譯。

送出後的job的流通過程大緻如下:

建立完JobMaster之後,JobMaster就會進行leader election,得到leader之後,會回調<code>grantLeadership</code>方法,進而調用<code>jobManager.start(leaderSessionID);</code>開始運作job。

重點是在下面這行,擷取到resource manage之後,就會回調<code>ResourceManagerLeaderListener.notifyLeaderAddress</code>,整個調用流如下:

然後終于來到了最核心的排程代碼,在<code>JobMaster.onResourceManagerRegistrationSuccess</code>方法中:

ExecutionGraph.scheduleForExecution --&gt; ExecutionGraph.scheduleEager

這個方法會計算所有的ExecutionVertex總數,并為每個ExecutionVertex配置設定一個SimpleSlot(暫時不考慮slot sharing的情況),然後封裝成ExecutionAndSlot,顧名思義,即ExecutionVertex + Slot(更為貼切地說,應該是ExecutionAttempt + Slot)。

然後調用<code>execAndSlot.executionAttempt.deployToSlot(slot);</code>進行deploy,即<code>Execution.deployToSlot</code>。

這個方法先會進行一系列狀态遷移和檢查,然後進行deploy,比較核心的代碼如下:

ExecutionVertex.createDeploymentDescriptor方法中,包含了從Execution Graph到真正實體執行圖的轉換。如将IntermediateResultPartition轉化成ResultPartition,ExecutionEdge轉成InputChannelDeploymentDescriptor(最終會在執行時轉化成InputGate)。

最後通過RPC方法送出task,實際會調用到<code>TaskExecutor.submitTask</code>方法中。

這個方法會建立真正的Task,然後調用<code>task.startTaskThread();</code>開始task的執行。

在Task構造函數中,會根據輸入的參數,建立InputGate, ResultPartition, ResultPartitionWriter等。

而<code>startTaskThread</code>方法,則會執行<code>executingThread.start</code>,進而調用<code>Task.run</code>方法。

它的最核心的代碼如下:

這裡的invokable即為operator對象執行個體,通過反射建立。具體地,即為OneInputStreamTask,或者SourceStreamTask等。這個nameOfInvokableClass是哪裡生成的呢?其實早在生成StreamGraph的時候,這就已經确定了,見<code>StreamGraph.addOperator</code>方法:

這裡的<code>OneInputStreamTask.class</code>即為生成的StreamNode的vertexClass。這個值會一直傳遞,當StreamGraph被轉化成JobGraph的時候,這個值會被傳遞到JobVertex的invokableClass。然後當JobGraph被轉成ExecutionGraph的時候,這個值被傳入到ExecutionJobVertex.TaskInformation.invokableClassName中,一直傳到Task中。

那麼使用者真正寫的邏輯代碼在哪裡呢?比如word count中的Tokenizer,去了哪裡呢?

OneInputStreamTask的基類StreamTask,包含了headOperator和operatorChain。當我們調用<code>dataStream.flatMap(new Tokenizer())</code>的時候,會生成一個StreamFlatMap的operator,這個operator是一個AbstractUdfStreamOperator,而使用者的代碼<code>new Tokenizer</code>,即為它的userFunction。

是以再串回來,以OneInputStreamTask為例,Task的核心執行代碼即為<code>OneInputStreamTask.invoke</code>方法,它會調用<code>StreamTask.run</code>方法,這是個抽象方法,最終會調用其派生類的run方法,即OneInputStreamTask, SourceStreamTask等。

OneInputStreamTask的run方法代碼如下:

就是一直不停地循環調用<code>inputProcessor.processInput(operator, lock)</code>方法,即<code>StreamInputProcessor.processInput</code>方法:

上面的代碼中,<code>streamOperator.processElement(record);</code>才是真正處理使用者邏輯的代碼,以StreamFlatMap為例,即為它的processElement方法:

這樣,整個排程和執行邏輯就全部串起來啦。