天天看點

Flink送出流程和架構

一、Flink送出任務的流程

  Flink任務送出後,Client向HDFS上傳Flink的jar包和配置,之後向Yarn ResourceManager送出任務,ResourceManager配置設定Container資源并通知對應的NodeManager啟動

ApplicationMaster,ApplicationMaster啟動後加載Flink的jar包和配置建構環境,然後啟動JobManager;之後Application Master向ResourceManager申請資源啟動TaskManager

,ResourceManager配置設定Container資源後,由ApplicationMaster通知資源所在的節點的NodeManager啟動TaskManager,NodeManager加載Flink的Jar包和配置建構環境并啟動TaskManager,TaskManager啟動向JobManager發送心跳,并等待JobManager向其配置設定任務。

二、Flink任務排程原理

Flink送出流程和架構

1、Program Code:我們編寫的Flink應用程式代碼。

2、JobClient:JobClient不是Flink程式執行的内部部分,但它是任務執行的起點。JobClient負責接受使用者的程式代碼,然後建立資料流,将資料流送出給

JobManager以便進一步執行。執行完成後,JobClient将結果傳回給使用者。

3、JobManager:主程序(也稱為作業管理器)協調和管理程式的執行。它的主要指責包括安排任務,管理checkpoint,故障恢複等。機器叢集中至少要有一個master,

master負責排程task,協調checkpoints和容災,高可用設定的話可以有多個master,但要保證一個是leader,其他是standby;JobManager包含ActorSystem、Scheduler、

CheckPoint三個重要的元件。

4、TaskManager:從JobManager處接收需要部署的Task。TaskManager是在JVM中一個或多個線程中執行任務的工作節點。任務執行的并行性由每個TaskManager上可用

的任務槽決定。每個任務代表配置設定給任務槽的一組資源。例如:如果TaskManager有四個插槽,那麼它将為每個插槽配置設定25%的記憶體。可以在任務槽中運作一個或多個線程。

同一插槽中的線程共享相同的JVM。同一JVM中的任務共享TCP連接配接和心跳資訊。TaskManager的一個Slot代表一個可用線程,該線程具有固定的記憶體,注意Slot隻對記憶體隔離,

沒有對CPU隔離。

預設情況下,Flink允許子任務共享Slot,即使它們是不同的task的subtask,隻要他們來自相同的job。這種共享可以有更好的資源使用率。

TaskManager是Flink的worker節點,他負責Flink中本機slot資源的管理以及具體task的執行。

TaskManager上的基本資源機關是slot,一個作業的task最終會部署在一個TM的slot上運作,TM會負責維護本地的slot資源清單,

并來與Flink Master和JobManager通信。

三、Work與Slot

  每一個worker(TaskManager)是一個JVM程序,他可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收到多少個task,worker通過task slot來進行控制

(一個worker至少要有一個task slot)。 

  每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那麼它會将其管理的記憶體分成三份給各個slot。資源slot化意味着一個subtask将不需要跟來自其他的job的subtask競争被管理的記憶體,取而代之的是它将擁有一定數量的記憶體儲備。需要注意的是,這裡不會涉及到CPU的隔離,slot目前僅僅用來隔離

task的受管理的記憶體。

  通過調整task slot的數量,允許使用者定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那将意味着每個task group運作在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM。而在同一個JVM程序中的task将共享TCP連接配接(基于多路複用)和心跳資訊。他們也

可能共享資料集和資料結構,是以這減少了每個task的負載。

四、Flink程式架構

每個Flink程式都包含以下的若幹流程

獲得一個執行環境:(Execution Environment) 相當于Spark中的SparkContext

加在/建立初始資料:(Source)

指定轉換這些資料:(Transformation)

指定放置計算結果的位置:(Sink)

觸發程式執行

五、 Environment

執行環境StreamExecutionEnvironment是所有Flink程式的基礎。

StreamExecutionEnvironment.getExecutionEnvironment(根據運作情況,傳回本地或者叢集的運作環境)  預設分區是8
  建立一個執行環境,表示目前執行程式的上下文。如果程式是獨立調用的,則此方法傳回本地執行環境:如果從指令行用戶端調用程式以送出到叢集,則此方法傳回此叢集的執行環境,也就是說,
getEexecutionEnvironment會根據查詢運作的方式決定傳回什麼樣的運作環境,是最常用的一種建立執行環境的方式。

StreamExecutionEnvironment.createLocalEnvironment(1)   -》開始隻有一個分區
  傳回本地執行環境,需要在調用時指定預設的并行度。

StreamExecutionEnvironment.createRemoteEnvironment("localhost",8800)
  傳回叢集執行環境,将Jar送出到遠端伺服器。需要在調用時指定JobManager的IP和端口号,并指定要在叢集中運作的Jar包。
  

      

六、Source

1、基于File的資料源

1.1、readTextFile(path)

1.1、readTextFile(path)
一列一列的讀取遵循TextInputFormat規範的文本檔案,并将結果作為String傳回。


      
Flink送出流程和架構