Flink運作架構
- 一、Flink運作元件
-
- 1、作業管理器(JobManager)
- 2、任務管理器(TaskManager)
- 3、資料總管(ResourceManager)
- 4、分發器(Dispatcher)
- 二、任務執行流程
-
- 1.任務送出流程
- 2.任務送出流程(with Yarn)
- 3、任務排程原理
- 4、TaskManager和Taskslot關系
-
- 4.1、對slot(插槽),以及core,partition,Parallelism(并行度)了解。
- 三、程式與資料流
-
- 1、Dataflow
- 2、執行圖(ExecutionGraph)
- 3、任務鍊(Operator Chains)
- 4、并行度(Parallelism)
一、Flink運作元件
- 作業管理器(JobManager)
- 任務管理器(TaskManager)
- 資料總管(ResourceManager)
- 分發器(Dispacher)
同時運作在一個Master程序中,以上管理器都為線程。
1、作業管理器(JobManager)
控制一個應用程式執行的主程序,
- 每一個flink程式都會被一個不同的JobManager所控制執行。相當于一段flink的程式代碼。
- 一個JobManager會接收一個需要執行的應用程式,這個應用程式中包含:作業圖(JobGraph),邏輯資料流圖(Local dataFlow graph),和打包的所有的類,庫,以及其他所需要對應的jar包。
- JobManager會将擷取的JobGraph轉換為對應的實體層面的資料流圖,也就是ExecutionGraph(執行圖),執行圖中包含了所有可以并行執行的任務。
- JobManager會向ResourceManager申請此次任務所需要的資源,該資源即為TaskManager上的插槽(slot),擷取資源之後,會将得到的ExecutionGraph發送到對應的TaskManager上。
- JobManager在發送執行圖之後,會負責對中央協調的操作。例如對于檢查點checkpoints的協調。
2、任務管理器(TaskManager)
是Flink運作中,執行任務的工作程序。
- flink中會有多個TaskManager運作。每一個TaskManager上的插槽(slot)限制着每個TaskManager能夠執行任務的數量。
- 啟動之後,TaskManager會向資料總管注冊它的插槽;收到資料總管的指令後,TaskManager就會将一個或者多個插槽提供給JobManager調用。JobManager就可以向插槽配置設定任務(tasks)來執行了。
- 在執行過程中,一個TaskManager可以跟其它運作同一應用程式的TaskManager交換資料。
3、資料總管(ResourceManager)
主要負責管理任務管理器(TaskManager)的插槽(slot)
- TaskManager 插槽是Flink中定義的處理資源單元。
- Flink為不同的環境和資源管理工具提供了不同資料總管,比如YARN、Mesos、K8s,以及Standalone部署,一般為yarn。
- 當JobManager申請插槽資源時,ResourceManager會将有空閑插槽的TaskManager配置設定給JobManager。如果Flink的資料總管沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供平台(YARN的資料總管)發起會話,以提供啟動TaskManager程序的容器。
4、分發器(Dispatcher)
- 可以跨作業運作,它為應用送出提供了REST接口。
- 當一個應用被送出執行時,分發器就會啟動并将應用移交給一個JobManager。
- 分發器也會啟動一個Web UI,用來友善地展示和監控作業執行的資訊。
- 分發器在架構中可能并不是必需的,這取決于應用送出運作的方式。
二、任務執行流程
1.任務送出流程

個人了解:
- 首先App送出應用,兩種方式。①通過flink run指令送出,該送出方式不經過Dispatcher,直接送出。②通過flink-web頁面8081端口進行送出,會經過Dispatcher。
- 送出的應用會送出到JobManager上,JobManager收到應用後,向ResourceManager請求資源(slots),同時TaskManager啟動,并向ResourceManager上請求注冊slots。
- TaskManager向JobManager提供執行該應用所需要的資源。
- JobManager将該任務配置設定到對應的TaskManager的插槽中(slots),任務在TaskManagre的插槽中執行。
2.任務送出流程(with Yarn)
每次JobManager收到作業時,都會向ResourceManager請求資源,當ResourceManager資源不夠時,會向yarn叢集申請資源。
3、任務排程原理
代碼(流程圖) -> StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖
- 我們寫的flink代碼會在打包編譯的過程中,會存在優化,生成一個作業圖。
- 然後通過分發器(Dispatcher)将作業圖送出到JobManager。
- 通過排程器(scheduler)将任務排程到任務管理器(TaskManager)上執行。
- 每一個任務管理器都有對應的插槽(配置檔案可以指定),每個任務管理器都是一個jvm程序,而每個任務插槽為一個線程。
- flink中通信子產品用的是Actor System。
4、TaskManager和Taskslot關系
- Flink 中每一個 TaskManager 都是一個JVM程序,它中的一個或多個 subtask可能會在獨立的線程上執行。
- 為了控制一個 TaskManager 能接收多少個 task, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)
二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流 - 預設情況下,Flink 允許子任務共享 slot。 這樣的結果是,一個 slot 可以儲存作業的整個管道。
- Task Slot 是靜态的概念,是指 TaskManager 具有的并發執行能力
4.1、對slot(插槽),以及core,partition,Parallelism(并行度)了解。
- 對全局設定并行度就是全局的并行度,對單個算子設定并行度,隻作用于單個算子,一次程式申請的插槽的個數是max(全局的并行度,單個算子的并行度)。
三、程式與資料流
1、Dataflow
- flink所有的程式都是由三部分組成:Source,transform,Sink。
- Source:讀取資料源。
- transform:通過對應的算子對資料進行轉換和處理。
- Sink:對流的輸出。
代碼中還有,程式開始時的擷取開發環境,程式完成後,需要對程式進行執行(對于代碼而言)。
flink上運作的程式會被映射成一個邏輯資料流(dataflow),每一個dataflow包括三個部分source,transform,sink,
每一個dataflow以一個或多個source開始,以一個或多個sink結束。類似DAG(有向無環圖)。
大部分情況是一對一。
2、執行圖(ExecutionGraph)
flink執行圖可以分為四層: StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖
-
StreamGraph(流圖):使用者通過StreamAPI編寫程式生成的最初的圖,用來表示程式的拓撲結構。
真實存在的資料結構:全類名:org.apache.flink.streaming.api.graph.StreamGraph
//源碼中的StreamGraph(流圖)
! wordcount => env.execute();
@ 1681行 execute(DEFAULT_JOB_NAME);
# 1699行 return execute(getStreamGraph(jobName));
$ 1848行 return getStreamGraph(jobName, true);
^ 1863行 StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
-
JobGraph(工作圖):在編譯打包時生成。StreamGraph優化後生成JobGraph,送出到JobManager,主要的優化為,将多個符合條件的節點 chain 在一起作為一個節點。也就是滿足後面的任務鍊。
真實存在的資料結構:全類名:org.apache.flink.runtime.jobgraph.JobGraph
//源碼中的JobGraph(工作圖)
! wordcount => env.execute();
@ 1681行 execute(DEFAULT_JOB_NAME);
# 1699行 return execute(getStreamGraph(jobName));
$ 1713行 final JobClient jobClient = executeAsync(streamGraph);
^ 1812行 .execute(streamGraph, configuration); //将StreamGraph作為參數傳入
* 15行 CompletableFuture<JobClient> execute(Pipeline var1, Configuration var2) throws Exception;的實作LocalExecutor.class中
《 51行 JobGraph jobGraph = this.getJobGraph(pipeline, effectiveConfig);//将StreamGraph轉換為JobGraph
; 63行 return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
[ 27行 JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
] 18行 return pipelineTranslator.translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism);//将StreamGraph轉換為JobGraph(pipeline就是相當于StreamGraph,底層會強轉至StreamGraph)
+ 進入該方法的實作類:StreamGraphTranslator
- 25行 StreamGraph streamGraph = (StreamGraph)pipeline;//pipeline強制轉換為StreamGraph
= 26行 return streamGraph.getJobGraph((JobID)null);
/ 850行 return StreamingJobGraphGenerator.createJobGraph(this, jobID);
》 109行 return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); // 中進入createJobGraph
< 169行 setChaining(hashes, legacyHashes);//核心
個人了解是将滿足需求的算子的id賦予相同的hash-id值。
-
ExecutionGraph(執行圖):JobGraph通過JobManager生成ExecutionGraph。
真實存在的資料結構:全類名:org.apache.flink.runtime.executiongraph.ExecutionGraph
- 實體執行圖:JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,并不是一個具體的資料結構。
二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
3、任務鍊(Operator Chains)
- Flink 采用了一種稱為任務鍊的優化技術,可以在特定條件下減少本地通信的開銷。為了滿足任務鍊的要求,必須将兩個或多個算子設為相同的并行度,并通過本地轉發(local forward)的方式進行連接配接
- 相同并行度的 one-to-one 操作,Flink 這樣相連的算子連結在一起形成一個 task,原來的算子成為裡面的 subtask
- 并行度相同、并且是 one-to-one 操作,兩個條件缺一不可(例如filter算子和map算子,并行度相同,沒有shuffle操作,類似spark中的窄依賴,合成一起放到同一個taskslot中執行)
二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
4、并行度(Parallelism)
- 一個特定算子的 子任務(subtask)的個數被稱之為其并行度(parallelism)。一般情況下,一個 stream 的并行度,可以認為就是其所有算子中最大的并行度。
- 一個TaskManager上可以有多個Taskslot
- 一個Taskslot上隻能運作一個并行度的子任務
- 多個子任務可以運作在多個TaskManager上。
- 一個程式中,不同的算子可能具有不同的并行度
- 算子之間傳輸資料的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具體是哪一種形式,取決于算子的種類,類似spark中的寬依賴和窄依賴。
- One-to-one:stream維護着分區以及元素的順序(比如source和map之間)。這意味着map 算子的子任務看到的元素的個數以及順序跟 source 算子的子任務生産的元素的個數、順序相同。map、fliter、flatMap等算子都是one-to-one的對應關系。
- Redistributing:stream的分區會發生改變。每一個算子的子任務依據所選擇的transformation發送資料到不同的目标任務。例如,keyBy 基于 hashCode 重分區、而 broadcast 和 rebalance 會随機重新分區,這些算子都會引起redistribute過程,而 redistribute 過程就類似于 Spark 中的 shuffle 過程。