天天看點

Flink運作時之用戶端送出作業圖-上用戶端送出作業圖流處理程式送出作業圖批處理程式送出作業圖公共的送出流程

作業圖(JobGraph)是Flink的運作時所能了解的作業表示,無論程式通過是DataStream還是DataSet

API編寫的,它們的JobGraph送出給JobManager以及之後的處理都将得到統一。本篇我們将分析用戶端如何送出JobGraph給JobManager。

在前面講解Flink的核心概念的時候我們談到了Flink利用了“惰性求值”的概念,隻有當最終調用execute方法時,才會真正開始執行。是以,execute方法是我們的切入點。

DataStream API所編寫的程式生成作業圖之後,在送出時産生的方法調用時序圖示意如下:

上圖中的多個run方法是同名的方法重載。

從時序圖中可以看到,ClusterClient對其自身抽象方法submitJob的調用是觸發作業圖送出的方法。随後真正的送出邏輯由JobClient實作。

ClusterClient封裝了送出一個程式到遠端叢集的必要的功能,而StandaloneClusterClient則擴充了ClusterClient的功能,它專門針對獨立的叢集提供服務,這兩個類都位于flink-clients子產品中。JobClient則負責将使用者的Job送出給JobManager,它充當了送出代理的角色,并傳回表示作業執行結果的JobExecutionResult對象。

JobClient是送出所有類型的Job的統一入口,具體的送出細節我們将會在“公共的送出流程”中詳細分析。

利用DataSet API所編寫的程式生成作業圖之後,在送出時産生的方法調用的時序圖如下:

上圖中出現多個重名的run方法為同名方法重載。

從上圖中可以看到,批處理程式的JobGraph跟流處理程式的JobGraph在送出之前有非常明顯的不同。它引入了PlanExecutor作為Flink程式的計劃執行器。而RemoteExecutor是PlanExecutor的實作,用于将程式送出給遠端的Flink叢集。具體的送出動作被進一步委托給ClusterClient及其實作(StandaloneClusterClient)最終同樣被JobClient代理送出給JobManager。

從前面的時序圖可見Flink對于不同類型的程式的送出流程最終是殊途同歸的。是以,接下來我們将對公共的送出流程進行分析。一個程式的JobGraph真正被送出始于對JobClient的submitJobAndWait方法的調用。

submitJobAndWait方法用于将一個JobGraph發送到指定的JobClient

actor,随後它會将該JobGraph轉發給JobManager。該方法會一直阻塞,直到該作業執行完成或者感覺不到JobManager的存活。如果作業被順利執行完成則傳回JobExecutionResult對象而如果JobManager産生故障,則抛出抛出JobExecutionException異常。

一個JobGraph從送出開始會經過多個對象層層遞交,各個對象之間的互動關系如下圖所示:

JobClient在其中起到了“橋接”作用,它橋接了同步的方法調用和異步的消息通信。更具體得說,JobClient可以看做是一個“靜态類”提供了一些靜态方法,這裡我們主要關注上面的submitJobAndWait方法,該方法内部封裝了Actor之間的異步通信(具體的通信對象是JobClientActor,它負責跟JobManager的ActorSystem的Actor對象進行通信),并以阻塞的形式傳回結果。而ClusterClient隻需調用JobClient的submitJobAndWait方法,而無需關注其内部是如何實作的。

通過調用JobClient的submitJobAndWait靜态方法,會觸發基于Akka的Actor之間的消息通信來完成後續的送出JobGraph的動作。這之間的互動示意圖如下:

這裡總共有兩個ActorSystem,一個歸屬于JobClient,另一個歸屬于JobManager。在submitJobAndWait方法中,其首先會建立一個JobClientActor的ActorRef:

然後向其發起一個SubmitJobAndWait消息,該消息将JobGraph的執行個體送出給JobClientActor。發起模式是ask,它表示需要一個應答消息。

Akka的消息通信模型有兩種: Fire and forget:消息的生産者不期望從消息的消費者那裡得到應答。這種消息會以異步的形式發送,發送方法在發送完成之後立即傳回。Akka的actor使用tell方法發送這種消息。 Send and receive:消息的生産者期待并将等待從消費者那裡得到應答。這種消息也會以異步的形式發送,發送完成後會傳回一個Future對象,該對象表示一個潛在的應答。Akka的actor使用ask方法發送這種消息,并通過Future來擷取響應。

JobClient向JobClientActor發送消息的代碼段如下:

該SubmitJobAndWait消息被JobClientActor接收後,最終通過調用tryToSubmitJob方法觸發真正的送出動作。在tryToSubmitJob方法中,一個JobGraph的送出将會分為兩步:

将使用者程式相關的Jar包上傳至JobManager;

給JobManager Actor發送封裝JobGraph的SubmitJob消息;

随後,JobManager

Actor會接收到來自JobClientActor的SubmitJob消息,進而觸發submitJob方法,該方法的執行主體已經是JobManager了。submitJob包含的邏輯較為複雜,且任何一個檢測或者子調用所産生的異常都可能會導緻送出失敗。我們列舉一下該方法完成的主要任務:

向BlobLibraryCacheManager注冊該Job;

建構ExecutionGraph對象;

對JobGraph中的每個頂點進行初始化;

将DAG拓撲中從source開始排序,排序後的頂點集合附加到ExecutionGraph對象;

擷取檢查點相關的配置,并将其設定到ExecutionGraph對象;

向ExecutionGraph注冊相關的listener;

執行恢複操作或者将JobGraph資訊寫入SubmittedJobGraphStore以在後續用于恢複目的;

響應給用戶端JobSubmitSuccess消息;

對ExecutionGraph對象進行排程執行;

如果送出流程順利,使用者程式包以及描述Job的JobGraph将會被JobManager接收,随後JobManager會對Job進行排程、部署并執行。JobClient會阻塞等待送出結果傳回。在得到傳回結果之後,先進行解析判斷它是否是Job被成功執行後傳回的結果:

還是失敗後傳回的結果:

以上就是批處理作業和流處理作業共同的送出流程,這中間涉及了JobManager接收到使用者送出後一系列處理,這部分的處理細節我們随後進行分析。

原文釋出時間為:2017-03-31

本文作者:vinoYang