天天看點

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

之前的Spark總結,我提到了Spark的學習主要分為四個部分:

  • 1.Spark Core用于離線計算;
  • 2.Spark SQL用于互動式查詢
  • 3.Spark Streaming用于實時流式計算
  • 4.Spark MLlib用于機器學習

    這一篇部落格我來講講Spark内部的運作過程剖析,比較偏理論 但是絕對值得你一看。

Spark中的專業術語

  • 1.Application: 基于Spark的使用者程式,包含了driver program和叢集上多個executor

    Spark中隻要有一個sparkcontext就是一個application;

    啟動一個spark-shell也是一個application,因為在啟動spark-shell是就内置了一個sc(SparkContext的執行個體)

  • 2.執行器(executor):在Worker Node上為某Application啟動一個程序,該程序負責運作任務,并且負責将資料在硬碟或者記憶體中;每個Application都有各自獨立的executors;
  • 3.Driver Program:Spark中的Driver即運作上述Application的main()函數并且建立 SparkContext,其中建立SparkContext的目的是為了準備Spark應用程式的運作環境。在Spark中由SparkContext負責和ClusterManager通信,進行資源的申請、任務的配置設定和監控等;當Executor部分運作完畢後,Driver負責将SparkContext關閉。通常用SparkContext代表Driver
  • 4.Cluster Manager: 在叢集上擷取資源的外部服務(例如standalone,Mesos,Yarn )
  • 5.Worker Node 叢集中任何可以運⾏行應⽤用代碼的節點
  • 6.Master,是個程序,主要是負責資源的排程和配置設定,還有叢集的監控等等職責。
  • 7.Worker,同樣是個程序,主要負責兩個,一個是用自己的記憶體存儲RDD的某個或者某些partition;另一個是啟動其他線程或程序,對RDD上的partition進行處理和計算。
  • 8.Task: 被送到某個executor上的工作單元
  • 9.Job 包含很多任務的并⾏行計算,Spark中的一個action對應一個job,如:collect, count, saveAsTextFile;

    使用者送出的Job會送出給DAGScheduler,Job會被分解成Stage(TaskSet) DAG;

    RDD的transformation隻會記錄對中繼資料的操作(map/filter),而不會真正執行,隻有action觸發時才會執行job;

  • 10.Stage ⼀個Job會被拆分很多組任務,每組任務被稱為一個Stage,也可稱為 TaskSet(就像Mapreduce分map任務和reduce任務⼀一樣)

    一個stage的邊界往往是從某個地方取資料開始(如:sc.readTextFile),在shuffle時(如join,reduceByKey等)終止;

    一個job的結束(如:count、saveAsTextFile等)往往也是一個stage的邊界;

    有兩種類型的stage:ShuffleMapStage和ResultStage

  • 11.Task 被送到executor的工作單元;

    在Spark中有兩類Task:shuffleMap和ResultTask,第一類Task的輸出時shuffle所需資料,第二類Task的輸出時result;

    Stage的劃分也以此為依據,shuffle之前的所有變換是一個stage,shuffle之後的操作時另一個stage;

  • 12.Partition 類似hadoop中的split,計算是以partittion為機關進行的。

    備注:以上這些涉及到spark内部運作過程的專業詞彙在初學者第一次接觸時 可能不能完全明白了解 ,這是人之常情。通過後面長時間的學習,慢慢我們就會掌握了解。

相關概念的邏輯關系圖:

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

Spark的基本運作流程(面試常考)

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

如上圖所示,基本運作過程:

(1):建構Spark Application的運作環境,啟動SparkContext

(2):SparkContext向資料總管(可以是Standalone,Mesos,Yarn)申請運作Executor資源,并啟動StandaloneExecutorbackend,Executor向SparkContext申請Task

(3):SparkContext将應用程式分發給Executor

(4):SparkContext建構成DAG圖,将DAG圖分解成Stage、将Taskset發送給Task Scheduler,最後由Task Scheduler将Task發送給Executor運作

(5):Task在Executor上運作,運作完釋放所有資源

Spark運作架構特點:

  • 每個Application擷取專屬的executor進 程,該程序在Application期間一直駐留,并以多線程方式運作tasks。這種Application隔離機制有其優勢的,無論是從排程角度看 (每個Driver排程它自己的任務),還是從運作角度看(來自不同Application的Task運作在不同的JVM中)。當然,這也意味着 Spark Application不能跨應用程式共享資料,除非将資料寫入到外部存儲系統。
  • Spark與資料總管無關,隻要能夠擷取executor程序,并能保持互相通信就可以了。
  • 提 交SparkContext的Client應該靠近Worker節點(運作Executor的節點),最好是在同一個Rack裡,因為Spark Application運作過程中SparkContext和Executor之間有大量的資訊交換;如果想在遠端叢集中運作,最好使用RPC将 SparkContext送出給叢集,不要遠離Worker運作SparkContext。
  • Task采用了資料本地性和推測執行的優化機制。

注意,所有的spark應用程式都離不開SparkContext和Executor兩部分,Executor負責執行任務,運作Executor的機器成為work節點,SparkContext由使用者啟動,通過資源排程子產品和Executor通信。SparkContext和Executor這兩部分的核心代碼是在在各種運作模式下都是公用的,在這之上,根據運作模式部署的不停,包裝了不同排程子產品以及相關的适配代碼。

在SparkContext的初始化過程中,Spark會分别建立DAGScheduler作業排程和TaskSchduler任務排程兩級排程子產品。

DAG Scheduler

DAG Scheduler把一個Spark作業轉換成Stage的DAG(Directed Acyclic Graph有向無環圖),根據RDD和Stage之間的關系找出開銷最小的排程方法,然後把Stage以TaskSet的形式送出給TaskScheduler。具體過程如下:

  • 基于Stage建構DAG,決定每個任務的最佳位置
  • 記錄哪個RDD或者Stage輸出被物化
  • 将taskset傳給底層排程器TaskScheduler
  • 重新送出shuffle輸出丢失的stage

Task Scheduler

DAG Scheduler決定了Task的理想位置,并把這些資訊傳遞給下層的Task Scheduler。此外,DAG Scheduler還處理由于Shuffle資料丢失導緻的失敗,還有可能需要重新送出運作之前的Stage(非Shuffle資料丢失導緻的Task失敗由Task Scheduler處理)

Task Scheduler維護所有TaskSet,當Executor向Driver發生心跳時,Task Scheduler會根據資源剩餘情況配置設定相應的Task。另外Task Scheduler還維護着所有Task的運作标簽,重試失敗的Task。

具體過程如下:

  • 送出taskset(一組task)到叢集運⾏行并彙報結果
  • 出現shuffle輸出lost要報告fetch failed錯誤
  • 碰到straggle任務需要放到别的節點上重試
  • 為每⼀個TaskSet維護⼀一個TaskSetManager(追蹤本地性及錯誤資訊)

在不同運作模式中任務排程器具體為:

  • Spark on Standalone模式為TaskScheduler;
  • YARN-Client模式為YarnClientClusterScheduler
  • YARN-Cluster模式為YarnClusterScheduler
    SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

相關基礎類

(2020.2.6更新)

TaskScheduler/SchedulerBackend

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)
SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)
SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)
SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

還記得我們在上一篇部落格Spark02提到的RDD嗎?RDD是Spark程式設計的基礎。在那篇部落格中,我們主要講到的是RDD的基礎知識。

RDD在Spark中運作,主要分為一下三步:

  • 1.建立RDD對象
  • 2.DAGscheduler子產品介入運算,計算RDD之間的依賴關系,RDD之間的依賴關系就形成了DAG
  • 3.每一Job被劃分為多個Stage。劃分Stage的一個主要依據是目前計算因子的輸入是否确定,如果是将其分在同一個Stage,避免多個Stage之間的消息傳遞開銷

排程器根據RDD的結構資訊為每個動作确定有效的執行計劃。排程器的接口是runJob函數,參數為RDD及其分區集,和一個RDD分區上的函數。該接口足以表示Spark中的所有動作(即count、collect、save等)。

排程器根據目标RDD的血統關系(Lineage)建立一個由stage構成的有向無環圖(DAG)。每個stage内部盡可能多地包含一組具有窄依賴關系的轉換,并将它們流水線并行化(pipeline)。stage的邊界有兩種情況:一是寬依賴上的Shuffle操作;二是已緩存分區,它可以縮短父RDD的計算過程。

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

以下面一個按 A-Z 首字母分類,查找相同首字母下不同姓名總個數的例子來看一下 RDD 是如何運作起來的。

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

步驟 1 :建立 RDD 。上面的例子除去最後一個 collect 是個動作不會建立 RDD 之外,前面四個轉換都會建立出新的 RDD 。是以第一步就是建立好所有 RDD( 内部的五項資訊 ) 。

步驟 2 :建立執行計劃。 Spark 會盡可能地管道化,并基于是否要重新組織資料來劃分 階段 (stage) ,例如本例中的 groupBy() 轉換就會将整個執行計劃劃分成兩階段執行。最終會産生一個 DAG(directed acyclic graph ,有向無環圖 ) 作為邏輯執行計劃。

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

步驟 3 :排程任務。将各階段劃分成不同的任務 (task) ,每個任務都是資料和計算的合體。在進行下一階段前,目前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織資料的,是以必須等目前階段所有結果資料都計算出來了才能繼續。

假設本例中的 hdfs://names 下有四個檔案塊,那麼 HadoopRDD 中 partitions 就會有四個分區對應這四個塊資料,同時 preferedLocations 會指明這四個塊的最佳位置。現在,就可以建立出四個任務,并排程到合适的叢集結點上。

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

何為Lineage(血統關系)?

利用記憶體加快資料加載,在衆多的其它的In-Memory類資料庫或Cache類系統中也有實作,Spark的主要差別在于它處理分布式運算環境下的資料容錯性(節點實效/資料丢失)問題時采用的方案。為了保證RDD中資料的魯棒性,RDD資料集通過所謂的血統關系(Lineage)記住了它是如何從其它RDD中演變過來的。相比其它系統的細顆粒度的記憶體資料更新級别的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定資料轉換(Transformation)操作(filter, map, join etc.)行為。當這個RDD的部分分區資料丢失時,它可以通過Lineage擷取足夠的資訊來重新運算和恢複丢失的資料分區。這種粗顆粒的資料模型,限制了Spark的運用場合,但同時相比細顆粒度的資料模型,也帶來了性能的提升。

RDD在Lineage依賴方面分為兩種==窄依賴(Narrow Dependencies)與寬依賴(Wide Dependencies)==用來解決資料容錯的高效性。

寬依賴和窄依賴

窄依賴(Narrow Dependencies)是指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區或多個父RDD的分區對應于一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子RDD的多個分區。寬依賴(Wide Dependencies)是指子RDD的分區依賴于父RDD的多個分區或所有分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。

對于Wide Dependencies,這種計算的輸入和輸出在不同的節點上,lineage方法對與輸入節點完好,而輸出節點當機時,通過重新計算,這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統的意思),Narrow Dependencies對于資料的重算開銷要遠小于Wide Dependencies的資料重算開銷。

關于寬窄依賴的了解,這裡我已wordCount舉了例子:

SparkCore 運作過程剖析(基本運作流程, DAG,Lineage(血緣關系) 寬依賴和窄依賴)

這一篇關于spark的部落格 偏理論基礎,涉及到許多原理剖析的基礎知識(DAG RDD的血緣關系 寬依賴和窄依賴等),但卻是大資料相關崗位經常會遇到的面試高頻問題。說實話,南國前段時間面試cvte資料挖掘崗的時候,就被問到了spark運作的基本過程,但一時之間有點卡殼 隻記得大概和一些名詞 沒有回答好。是以自己回顧之前的一些學習筆記和查找資料,總結出這篇。如有問題,還請多指教!

參考資料:

https://www.cnblogs.com/1130136248wlxk/articles/6289717.html

繼續閱讀