閱讀本文之前,請先閱讀Flink原理與實作系列前面的幾篇文章 :
<a href="#">Flink 原理與實作:架構和拓撲概覽</a>
<a href="#">Flink 原理與實作:如何生成 StreamGraph</a>
<a href="#">Flink 原理與實作:如何生成 JobGraph</a>
StreamGraph和JobGraph都是在client生成的,這篇文章将描述如何生成ExecutionGraph以及實體執行圖。同時會講解一個作業送出後如何被排程和執行。
client生成JobGraph之後,就通過submitJob送出至JobMaster。
在其構造函數中,會生成ExecutionGraph:
看下這個方法,比較長,略過了一些次要的代碼片斷:
可以看到,生成execution graph的代碼,主要是在最後一行,即ExecutionGraph.attachJobGraph方法:
可以看到,建立ExecutionJobVertex的重點就在它的構造函數中:
ExecutionJobVertex和ExecutionVertex是建立完了,但是ExecutionEdge還沒有建立呢,接下來看一下<code>attachJobGraph</code>方法中這一行代碼:
這個方法代碼如下:
看下<code>ExecutionVertex.connectSource</code>方法代碼:
<code>connectAllToAll</code>方法:
看這個方法之前,需要知道,ExecutionVertex的inputEdges變量,是一個二維資料。它表示了這個ExecutionVertex上每一個input所包含的ExecutionEdge清單。
即,如果ExecutionVertex有兩個不同的輸入:輸入A和B。其中輸入A的partition=1, 輸入B的partition=8,那麼這個二維數組inputEdges如下(為簡短,以irp代替IntermediateResultPartition)
[ ExecutionEdge[ A.irp[0]] ]
[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ]
是以上面的代碼就很容易了解了。
到這裡為止,ExecutionJobGraph就建立完成了。接下來看下這個ExecutionGraph是如何轉化成Task并開始執行的。
接下來我們以最簡單的mini cluster為例講解一下Task如何被排程和執行。
簡單略過client端job的送出和StreamGraph到JobGraph的翻譯,以及上面ExecutionGraph的翻譯。
送出後的job的流通過程大緻如下:
建立完JobMaster之後,JobMaster就會進行leader election,得到leader之後,會回調<code>grantLeadership</code>方法,進而調用<code>jobManager.start(leaderSessionID);</code>開始運作job。
重點是在下面這行,擷取到resource manage之後,就會回調<code>ResourceManagerLeaderListener.notifyLeaderAddress</code>,整個調用流如下:
然後終于來到了最核心的排程代碼,在<code>JobMaster.onResourceManagerRegistrationSuccess</code>方法中:
ExecutionGraph.scheduleForExecution --> ExecutionGraph.scheduleEager
這個方法會計算所有的ExecutionVertex總數,并為每個ExecutionVertex配置設定一個SimpleSlot(暫時不考慮slot sharing的情況),然後封裝成ExecutionAndSlot,顧名思義,即ExecutionVertex + Slot(更為貼切地說,應該是ExecutionAttempt + Slot)。
然後調用<code>execAndSlot.executionAttempt.deployToSlot(slot);</code>進行deploy,即<code>Execution.deployToSlot</code>。
這個方法先會進行一系列狀态遷移和檢查,然後進行deploy,比較核心的代碼如下:
ExecutionVertex.createDeploymentDescriptor方法中,包含了從Execution Graph到真正實體執行圖的轉換。如将IntermediateResultPartition轉化成ResultPartition,ExecutionEdge轉成InputChannelDeploymentDescriptor(最終會在執行時轉化成InputGate)。
最後通過RPC方法送出task,實際會調用到<code>TaskExecutor.submitTask</code>方法中。
這個方法會建立真正的Task,然後調用<code>task.startTaskThread();</code>開始task的執行。
在Task構造函數中,會根據輸入的參數,建立InputGate, ResultPartition, ResultPartitionWriter等。
而<code>startTaskThread</code>方法,則會執行<code>executingThread.start</code>,進而調用<code>Task.run</code>方法。
它的最核心的代碼如下:
這裡的invokable即為operator對象執行個體,通過反射建立。具體地,即為OneInputStreamTask,或者SourceStreamTask等。這個nameOfInvokableClass是哪裡生成的呢?其實早在生成StreamGraph的時候,這就已經确定了,見<code>StreamGraph.addOperator</code>方法:
這裡的<code>OneInputStreamTask.class</code>即為生成的StreamNode的vertexClass。這個值會一直傳遞,當StreamGraph被轉化成JobGraph的時候,這個值會被傳遞到JobVertex的invokableClass。然後當JobGraph被轉成ExecutionGraph的時候,這個值被傳入到ExecutionJobVertex.TaskInformation.invokableClassName中,一直傳到Task中。
那麼使用者真正寫的邏輯代碼在哪裡呢?比如word count中的Tokenizer,去了哪裡呢?
OneInputStreamTask的基類StreamTask,包含了headOperator和operatorChain。當我們調用<code>dataStream.flatMap(new Tokenizer())</code>的時候,會生成一個StreamFlatMap的operator,這個operator是一個AbstractUdfStreamOperator,而使用者的代碼<code>new Tokenizer</code>,即為它的userFunction。
是以再串回來,以OneInputStreamTask為例,Task的核心執行代碼即為<code>OneInputStreamTask.invoke</code>方法,它會調用<code>StreamTask.run</code>方法,這是個抽象方法,最終會調用其派生類的run方法,即OneInputStreamTask, SourceStreamTask等。
OneInputStreamTask的run方法代碼如下:
就是一直不停地循環調用<code>inputProcessor.processInput(operator, lock)</code>方法,即<code>StreamInputProcessor.processInput</code>方法:
上面的代碼中,<code>streamOperator.processElement(record);</code>才是真正處理使用者邏輯的代碼,以StreamFlatMap為例,即為它的processElement方法:
這樣,整個排程和執行邏輯就全部串起來啦。