天天看點

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流

Flink運作架構

  • 一、Flink運作元件
    • 1、作業管理器(JobManager)
    • 2、任務管理器(TaskManager)
    • 3、資料總管(ResourceManager)
    • 4、分發器(Dispatcher)
  • 二、任務執行流程
    • 1.任務送出流程
    • 2.任務送出流程(with Yarn)
    • 3、任務排程原理
    • 4、TaskManager和Taskslot關系
      • 4.1、對slot(插槽),以及core,partition,Parallelism(并行度)了解。
  • 三、程式與資料流
    • 1、Dataflow
    • 2、執行圖(ExecutionGraph)
    • 3、任務鍊(Operator Chains)
    • 4、并行度(Parallelism)

一、Flink運作元件

  • 作業管理器(JobManager)
  • 任務管理器(TaskManager)
  • 資料總管(ResourceManager)
  • 分發器(Dispacher)

同時運作在一個Master程序中,以上管理器都為線程。

1、作業管理器(JobManager)

控制一個應用程式執行的主程序,

  • 每一個flink程式都會被一個不同的JobManager所控制執行。相當于一段flink的程式代碼。
  • 一個JobManager會接收一個需要執行的應用程式,這個應用程式中包含:作業圖(JobGraph),邏輯資料流圖(Local dataFlow graph),和打包的所有的類,庫,以及其他所需要對應的jar包。
  • JobManager會将擷取的JobGraph轉換為對應的實體層面的資料流圖,也就是ExecutionGraph(執行圖),執行圖中包含了所有可以并行執行的任務。
  • JobManager會向ResourceManager申請此次任務所需要的資源,該資源即為TaskManager上的插槽(slot),擷取資源之後,會将得到的ExecutionGraph發送到對應的TaskManager上。
  • JobManager在發送執行圖之後,會負責對中央協調的操作。例如對于檢查點checkpoints的協調。

2、任務管理器(TaskManager)

是Flink運作中,執行任務的工作程序。

  • flink中會有多個TaskManager運作。每一個TaskManager上的插槽(slot)限制着每個TaskManager能夠執行任務的數量。
  • 啟動之後,TaskManager會向資料總管注冊它的插槽;收到資料總管的指令後,TaskManager就會将一個或者多個插槽提供給JobManager調用。JobManager就可以向插槽配置設定任務(tasks)來執行了。
  • 在執行過程中,一個TaskManager可以跟其它運作同一應用程式的TaskManager交換資料。

3、資料總管(ResourceManager)

主要負責管理任務管理器(TaskManager)的插槽(slot)

  • TaskManager 插槽是Flink中定義的處理資源單元。
  • Flink為不同的環境和資源管理工具提供了不同資料總管,比如YARN、Mesos、K8s,以及Standalone部署,一般為yarn。
  • 當JobManager申請插槽資源時,ResourceManager會将有空閑插槽的TaskManager配置設定給JobManager。如果Flink的資料總管沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供平台(YARN的資料總管)發起會話,以提供啟動TaskManager程序的容器。

4、分發器(Dispatcher)

  • 可以跨作業運作,它為應用送出提供了REST接口。
  • 當一個應用被送出執行時,分發器就會啟動并将應用移交給一個JobManager。
  • 分發器也會啟動一個Web UI,用來友善地展示和監控作業執行的資訊。
  • 分發器在架構中可能并不是必需的,這取決于應用送出運作的方式。

二、任務執行流程

1.任務送出流程

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流

個人了解:

  1. 首先App送出應用,兩種方式。①通過flink run指令送出,該送出方式不經過Dispatcher,直接送出。②通過flink-web頁面8081端口進行送出,會經過Dispatcher。
  2. 送出的應用會送出到JobManager上,JobManager收到應用後,向ResourceManager請求資源(slots),同時TaskManager啟動,并向ResourceManager上請求注冊slots。
  3. TaskManager向JobManager提供執行該應用所需要的資源。
  4. JobManager将該任務配置設定到對應的TaskManager的插槽中(slots),任務在TaskManagre的插槽中執行。

2.任務送出流程(with Yarn)

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流

每次JobManager收到作業時,都會向ResourceManager請求資源,當ResourceManager資源不夠時,會向yarn叢集申請資源。

3、任務排程原理

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流

代碼(流程圖) -> StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖

  1. 我們寫的flink代碼會在打包編譯的過程中,會存在優化,生成一個作業圖。
  2. 然後通過分發器(Dispatcher)将作業圖送出到JobManager。
  3. 通過排程器(scheduler)将任務排程到任務管理器(TaskManager)上執行。
  4. 每一個任務管理器都有對應的插槽(配置檔案可以指定),每個任務管理器都是一個jvm程序,而每個任務插槽為一個線程。
  5. flink中通信子產品用的是Actor System。

4、TaskManager和Taskslot關系

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
  • Flink 中每一個 TaskManager 都是一個JVM程序,它中的一個或多個 subtask可能會在獨立的線程上執行。
  • 為了控制一個 TaskManager 能接收多少個 task, TaskManager 通過 task slot 來進行控制(一個 TaskManager 至少有一個 slot)
    二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
  • 預設情況下,Flink 允許子任務共享 slot。 這樣的結果是,一個 slot 可以儲存作業的整個管道。
  • Task Slot 是靜态的概念,是指 TaskManager 具有的并發執行能力

4.1、對slot(插槽),以及core,partition,Parallelism(并行度)了解。

  • 對全局設定并行度就是全局的并行度,對單個算子設定并行度,隻作用于單個算子,一次程式申請的插槽的個數是max(全局的并行度,單個算子的并行度)。

三、程式與資料流

1、Dataflow

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
  • flink所有的程式都是由三部分組成:Source,transform,Sink。
  • Source:讀取資料源。
  • transform:通過對應的算子對資料進行轉換和處理。
  • Sink:對流的輸出。

代碼中還有,程式開始時的擷取開發環境,程式完成後,需要對程式進行執行(對于代碼而言)。

flink上運作的程式會被映射成一個邏輯資料流(dataflow),每一個dataflow包括三個部分source,transform,sink,

每一個dataflow以一個或多個source開始,以一個或多個sink結束。類似DAG(有向無環圖)。

大部分情況是一對一。

2、執行圖(ExecutionGraph)

flink執行圖可以分為四層: StreamGraph -> JobGraph -> ExecutionGraph -> 實體執行圖

  1. StreamGraph(流圖):使用者通過StreamAPI編寫程式生成的最初的圖,用來表示程式的拓撲結構。

    真實存在的資料結構:全類名:org.apache.flink.streaming.api.graph.StreamGraph

//源碼中的StreamGraph(流圖)

! wordcount => env.execute();
	@ 1681行 execute(DEFAULT_JOB_NAME);
		# 1699行 return execute(getStreamGraph(jobName));
			$ 1848行 return getStreamGraph(jobName, true);
				^ 1863行 StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();	

           
  1. JobGraph(工作圖):在編譯打包時生成。StreamGraph優化後生成JobGraph,送出到JobManager,主要的優化為,将多個符合條件的節點 chain 在一起作為一個節點。也就是滿足後面的任務鍊。

    真實存在的資料結構:全類名:org.apache.flink.runtime.jobgraph.JobGraph

//源碼中的JobGraph(工作圖)

! wordcount => env.execute();
	@ 1681行 execute(DEFAULT_JOB_NAME);
	 # 1699行 return execute(getStreamGraph(jobName)); 
	  $ 1713行 final JobClient jobClient = executeAsync(streamGraph);
	   ^ 1812行 .execute(streamGraph, configuration);  //将StreamGraph作為參數傳入
	    * 15行    CompletableFuture<JobClient> execute(Pipeline var1, Configuration var2) throws Exception;的實作LocalExecutor.class中
	     《 51行 JobGraph jobGraph = this.getJobGraph(pipeline, effectiveConfig);//将StreamGraph轉換為JobGraph
	      ; 63行 return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
	       [ 27行 JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
	        ] 18行 return pipelineTranslator.translateToJobGraph(pipeline, optimizerConfiguration, defaultParallelism);//将StreamGraph轉換為JobGraph(pipeline就是相當于StreamGraph,底層會強轉至StreamGraph)
	         + 進入該方法的實作類:StreamGraphTranslator
	          - 25行  StreamGraph streamGraph = (StreamGraph)pipeline;//pipeline強制轉換為StreamGraph
			   = 26行 return streamGraph.getJobGraph((JobID)null);
			   	/ 850行 return StreamingJobGraphGenerator.createJobGraph(this, jobID);
			   	 》 109行 return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); // 中進入createJobGraph
			   	  < 169行 setChaining(hashes, legacyHashes);//核心 
			   	  個人了解是将滿足需求的算子的id賦予相同的hash-id值。


           
  1. ExecutionGraph(執行圖):JobGraph通過JobManager生成ExecutionGraph。

    真實存在的資料結構:全類名:org.apache.flink.runtime.executiongraph.ExecutionGraph

  2. 實體執行圖:JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,并不是一個具體的資料結構。
    二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流

3、任務鍊(Operator Chains)

  • Flink 采用了一種稱為任務鍊的優化技術,可以在特定條件下減少本地通信的開銷。為了滿足任務鍊的要求,必須将兩個或多個算子設為相同的并行度,并通過本地轉發(local forward)的方式進行連接配接
  • 相同并行度的 one-to-one 操作,Flink 這樣相連的算子連結在一起形成一個 task,原來的算子成為裡面的 subtask
  • 并行度相同、并且是 one-to-one 操作,兩個條件缺一不可(例如filter算子和map算子,并行度相同,沒有shuffle操作,類似spark中的窄依賴,合成一起放到同一個taskslot中執行)
    二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流

4、并行度(Parallelism)

二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
  • 一個特定算子的 子任務(subtask)的個數被稱之為其并行度(parallelism)。一般情況下,一個 stream 的并行度,可以認為就是其所有算子中最大的并行度。
二、Flink運作架構一、Flink運作元件二、任務執行流程三、程式與資料流
  • 一個TaskManager上可以有多個Taskslot
  • 一個Taskslot上隻能運作一個并行度的子任務
  • 多個子任務可以運作在多個TaskManager上。
  • 一個程式中,不同的算子可能具有不同的并行度
  • 算子之間傳輸資料的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具體是哪一種形式,取決于算子的種類,類似spark中的寬依賴和窄依賴。
  • One-to-one:stream維護着分區以及元素的順序(比如source和map之間)。這意味着map 算子的子任務看到的元素的個數以及順序跟 source 算子的子任務生産的元素的個數、順序相同。map、fliter、flatMap等算子都是one-to-one的對應關系。
  • Redistributing:stream的分區會發生改變。每一個算子的子任務依據所選擇的transformation發送資料到不同的目标任務。例如,keyBy 基于 hashCode 重分區、而 broadcast 和 rebalance 會随機重新分區,這些算子都會引起redistribute過程,而 redistribute 過程就類似于 Spark 中的 shuffle 過程。