Spark的設計與運作原理
關于Spark
- Spark是基于記憶體計算的大資料并行計算架構,可用于建構大型的、低延遲的資料分析應用程式。
- Spark具有如下幾個主要特點:
- 運作速度快:Spark使用先進的DAG(Directed Acyclic Graph,有向無環圖)執行引擎,以支援循環資料流與記憶體計算,基于記憶體的執行速度可比Hadoop MapReduce快上百倍,基于磁盤的執行速度也能快十倍;
- 容易使用:Spark支援使用Scala、Java、Python和R語言進行程式設計;
- 通用性:Spark提供了完整而強大的技術棧,包括SQL查詢、流式計算、機器學習和圖算法元件,這些元件可以無縫整合在同一個應用中,足以應對複雜的計算;
- 運作模式多樣:Spark可運作于獨立的叢集模式中,或者運作于Hadoop中,也可運作于Amazon EC2等雲環境中,并且可以通路HDFS、Cassandra、HBase、Hive等多種資料源
Spark生态系統
Spark的生态系統主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX 等元件,各個元件的具體功能如下:
- Spark Core:Spark Core包含Spark的基本功能,如記憶體計算、任務排程、部署模式、故障恢複、存儲管理等。Spark建立在統一的抽象RDD之上,使其可以以基本一緻的方式應對不同的大資料處理場景;通常所說的Apache Spark,就是指Spark Core;
- Spark SQL:Spark SQL允許開發人員直接處理RDD,同時也可查詢Hive、HBase等外部資料源。Spark SQL的一個重要特點是其能夠統一處理關系表和RDD,使得開發人員可以輕松地使用SQL指令進行查詢,并進行更複雜的資料分析;
- Spark Streaming:Spark Streaming支援高吞吐量、可容錯處理的實時流資料處理,其核心思路是将流式計算分解成一系列短小的批處理作業。Spark Streaming支援多種資料輸入源,如Kafka、Flume和TCP套接字等;
- MLlib(機器學習):MLlib提供了常用機器學習算法的實作,包括聚類、分類、回歸、協同過濾等,降低了機器學習的門檻,開發人員隻要具備一定的理論知識就能進行機器學習的工作;
- GraphX(圖計算):GraphX是Spark中用于圖計算的API,可認為是Pregel在Spark上的重寫及優化,Graphx性能良好,擁有豐富的功能和運算符,能在海量資料上自如地運作複雜的圖算法。
Spark運作架構
基本概念
在具體講解Spark運作架構之前,需要先了解幾個重要的概念:

- RDD:是彈性分布式資料集(Resilient Distributed Dataset)的簡稱,是分布式記憶體的一個抽象概念,提供了一種高度受限的共享記憶體模型;
- DAG:是Directed Acyclic Graph(有向無環圖)的簡稱,反映RDD之間的依賴關系;
- Executor:是運作在工作節點(Worker Node)上的一個程序,負責運作任務,并為應用程式存儲資料;
- 應用:使用者編寫的Spark應用程式;
- 任務:運作在Executor上的工作單元;
- 作業:一個作業包含多個RDD及作用于相應RDD上的各種操作;
- 階段:是作業的基本排程機關,一個作業會分為多組任務,每組任務被稱為“階段”,或者也被稱為“任務集”。
架構設計
Spark運作架構包括叢集資料總管(Cluster Manager)、運作作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行程序(Executor)。其中,叢集資料總管可以是Spark自帶的資料總管,也可以是YARN或Mesos等資源管理架構。
與Hadoop MapReduce計算架構相比,Spark所采用的Executor有兩個優點:一是利用多線程來執行具體的任務(Hadoop MapReduce采用的是程序模型),減少任務的啟動開銷;二是Executor中有一個BlockManager存儲子產品,會将記憶體和磁盤共同作為儲存設備,當需要多輪疊代計算時,可以将中間結果存儲到這個存儲子產品裡,下次需要時,就可以直接讀該存儲子產品裡的資料,而不需要讀寫到HDFS等檔案系統裡,因而有效減少了IO開銷;或者在互動式查詢場景下,預先将表緩存到該存儲系統上,進而可以提高讀寫IO性能。
在Spark中,一個應用(Application)由一個任務控制節點(Driver)和若幹個作業(Job)構成,一個作業由多個階段(Stage)構成,一個階段由多個任務(Task)組成。當執行一個應用時,任務控制節點會向叢集管理器(Cluster Manager)申請資源,啟動Executor,并向Executor發送應用程式代碼和檔案,然後在Executor上執行任務,運作結束後,執行結果會傳回給任務控制節點,或者寫到HDFS或者其他資料庫中。
Spark運作基本流程
Spark的基本運作流程如下:
(1)當一個Spark應用被送出時,首先需要為這個應用建構起基本的運作環境,即由任務控制節點(Driver)建立一個SparkContext,由SparkContext負責和資料總管(Cluster Manager)的通信以及進行資源的申請、任務的配置設定和監控等。SparkContext會向資料總管注冊并申請運作Executor的資源;
(2)資料總管為Executor配置設定資源,并啟動Executor程序,Executor運作情況将随着“心跳”發送到資料總管上;
(3)SparkContext根據RDD的依賴關系建構DAG圖,DAG圖送出給DAG排程器(DAGScheduler)進行解析,将DAG圖分解成多個“階段”(每個階段都是一個任務集),并且計算出各個階段之間的依賴關系,然後把一個個“任務集”送出給底層的任務排程器(TaskScheduler)進行處理;Executor向SparkContext申請任務,任務排程器将任務分發給Executor運作,同時,SparkContext将應用程式代碼發放給Executor;
(4)任務在Executor上運作,把執行結果回報給任務排程器,然後回報給DAG排程器,運作完畢後寫入資料并釋放所有資源。
RDD的設計與運作原理
Spark的核心是建立在統一的抽象RDD之上,使得Spark的各個元件可以無縫進行內建,在同一個應用程式中完成大資料計算任務。
1.RDD設計背景
在實際應用中,存在許多疊代式算法(比如機器學習、圖算法等)和互動式資料挖掘工具,這些應用場景的共同之處是,不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入。但是,目前的MapReduce架構都是把中間結果寫入到HDFS中,帶來了大量的資料複制、磁盤IO和序列化開銷。雖然,類似Pregel等圖計算架構也是将結果儲存在記憶體當中,但是,這些架構隻能支援一些特定的計算模式,并沒有提供一種通用的資料抽象。RDD就是為了滿足這種需求而出現的,它提供了一個抽象的資料架構,我們不必擔心底層資料的分布式特性,隻需将具體的應用邏輯表達為一系列轉換處理,不同RDD之間的轉換操作形成依賴關系,可以實作管道化,進而避免了中間結果的存儲,大大降低了資料複制、磁盤IO和序列化開銷。
2.RDD概念
- 一個RDD就是一個分布式對象集合,本質上是一個隻讀的分區記錄集合,每個RDD可以分成多個分區,每個分區就是一個資料集片段,并且一個RDD的不同分區可以被儲存到叢集中不同的節點上,進而可以在叢集中的不同節點上進行并行計算。
- RDD提供了一種高度受限的共享記憶體模型,即RDD是隻讀的記錄分區的集合,不能直接修改,隻能基于穩定的實體存儲中的資料集來建立RDD,或者通過在其他RDD上執行确定的轉換操作(如map、join和groupBy)而建立得到新的RDD。
- RDD提供了一組豐富的操作以支援常見的資料運算,分為“行動”(Action)和“轉換”(Transformation)兩種類型,前者用于執行計算并指定輸出的形式,後者指定RDD之間的互相依賴關系。兩類操作的主要差別是,轉換操作(比如map、filter、groupBy、join等)接受RDD并傳回RDD,而行動操作(比如count、collect等)接受RDD但是傳回非RDD(即輸出一個值或結果)。
Spark用Scala語言實作了RDD的API,程式員可以通過調用API實作對RDD的各種操作。RDD典型的執行過程如下:
1. RDD讀入外部資料源(或者記憶體中的集合)進行建立;
2. RDD經過一系列的“轉換”操作,每一次都會産生不同的RDD,供給下一個“轉換”使用;
3. 最後一個RDD經“行動”操作進行處理,并輸出到外部資料源(或者變成Scala集合或标量)。
RDD采用了惰性調用,即在RDD的執行過程中(如圖9-8所示),真正的計算發生在RDD的“行動”操作,對于“行動”之前的所有“轉換”操作,Spark隻是記錄下“轉換”操作應用的一些基礎資料集以及RDD生成的軌迹,即互相之間的依賴關系,而不會觸發真正的計算。
從輸入中邏輯上生成A和C兩個RDD,經過一系列“轉換”操作,邏輯上生成了F(也是一個RDD),之是以說是邏輯上,是因為這時候計算并沒有發生,Spark隻是記錄了RDD之間的生成和依賴關系。當F要進行輸出時,也就是當F進行“行動”操作的時候,Spark才會根據RDD的依賴關系生成DAG,并從起點開始真正的計算。
上述這一系列處理稱為一個“血緣關系(Lineage)”,即DAG拓撲排序的結果。采用惰性調用,通過血緣關系連接配接起來的一系列RDD操作就可以實作管道化(pipeline),避免了多次轉換操作之間資料同步的等待,而且不用擔心有過多的中間資料,因為這些具有血緣關系的操作都管道化了,一個操作得到的結果不需要儲存為中間資料,而是直接管道式地流入到下一個操作進行處理。同時,這種通過血緣關系把一系列操作進行管道化連接配接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;相反,在MapReduce的設計中,為了盡可能地減少MapReduce過程,在單個MapReduce中會寫入過多複雜的邏輯。
例1:一個Spark的“Hello World”程式
這裡以一個“Hello World”入門級Spark程式來解釋RDD執行過程,這個程式的功能是讀取一個HDFS檔案,計算出包含字元串“Hello World”的行數。
//建立SparkContext對象
val sc= new SparkContext(“spark://localhost:7077”,”Hello World”, “YOUR_SPARK_HOME”,”YOUR_APP_JAR”)
//從HDFS檔案中讀取資料建立一個RDD
val fileRDD = sc.textFile(“hdfs://192.168.0.103:9000/examplefile”)
//對fileRDD進行轉換操作得到一個新的RDD,即filterRDD
val filterRDD = fileRDD.filter(_.contains(“Hello World”))
/*第4行代碼表示對filterRDD進行持久化,
把它儲存在記憶體或磁盤中(這裡采用cache接口把資料集儲存在記憶體中),
友善後續重複使用,當資料被反複通路時(比如查詢一些熱點資料,或者運作疊代算法),
這是非常有用的,而且通過cache()可以緩存非常大的資料集,
支援跨越幾十甚至上百個節點;*/
filterRDD.cache()
//代碼中的count()是一個行動操作,用于計算一個RDD集合中包含的元素個數。
filterRDD.count()
這個程式的執行過程如下:
* 建立這個Spark程式的執行上下文,即建立SparkContext對象;
* 從外部資料源(即HDFS檔案)中讀取資料建立fileRDD對象;
* 建構起fileRDD和filterRDD之間的依賴關系,形成DAG圖,這時候并沒有發生真正的計算,隻是記錄轉換的軌迹;
* 執行到第5行代碼時,count()是一個行動類型的操作,觸發真正的計算,開始實際執行從fileRDD到filterRDD的轉換操作,并把結果持久化到記憶體中,最後計算出filterRDD中包含的元素個數。
3.RDD特性
總體而言,Spark采用RDD以後能夠實作高效計算的主要原因如下:
- 高效的容錯性。現有的分布式共享記憶體、鍵值存儲、記憶體資料庫等,為了實作容錯,必須在叢集節點之間進行資料複制或者記錄日志,也就是在節點之間會發生大量的資料傳輸,這對于資料密集型應用而言會帶來很大的開銷。在RDD的設計中,資料隻讀,不可修改,如果需要修改資料,必須從父RDD轉換到子RDD,由此在不同RDD之間建立了血緣關系。是以,RDD是一種天生具有容錯機制的特殊集合,不需要通過資料備援的方式(比如檢查點)實作容錯,而隻需通過RDD父子依賴(血緣)關系重新計算得到丢失的分區來實作容錯,無需復原整個系統,這樣就避免了資料複制的高開銷,而且重算過程可以在不同節點之間并行進行,實作了高效的容錯。此外,RDD提供的轉換操作都是一些粗粒度的操作(比如map、filter和join),RDD依賴關系隻需要記錄這種粗粒度的轉換操作,而不需要記錄具體的資料和各種細粒度操作的日志(比如對哪個資料項進行了修改),這就大大降低了資料密集型應用中的容錯開銷;
- 中間結果持久化到記憶體。資料在記憶體中的多個RDD操作之間進行傳遞,不需要“落地”到磁盤上,避免了不必要的讀寫磁盤開銷;
- 存放的資料可以是Java對象,避免了不必要的對象序列化和反序列化開銷。
4. RDD之間的依賴關系
RDD中不同的操作會使得不同RDD中的分區會産生不同的依賴。RDD中的依賴關系分為窄依賴(Narrow Dependency)與寬依賴(Wide Dependency).
窄依賴表現為一個父RDD的分區對應于一個子RDD的分區(一對一),或多個父RDD的分區對應于一個子RDD的分區(多對一);
寬依賴則表現為存在一個父RDD的一個分區對應一個子RDD的多個分區。
總體而言,如果父RDD的一個分區隻被一個子RDD的一個分區所使用就是窄依賴,否則就是寬依賴。窄依賴典型的操作包括map、filter、union等,寬依賴典型的操作包括groupByKey、sortByKey等。對于連接配接(join)操作,可以分為兩種情況。
(1)對輸入進行協同劃分,屬于窄依賴(如圖(a)所示)。所謂協同劃分(co-partitioned)是指多個父RDD的某一分區的所有“鍵(key)”,落在子RDD的同一個分區内,不會産生同一個父RDD的某一分區,落在子RDD的兩個分區的情況。
(2)對輸入做非協同劃分,屬于寬依賴,如圖(b)所示。
對于窄依賴的RDD,可以以流水線的方式計算所有父分區,不會造成網絡之間的資料混合。對于寬依賴的RDD,則通常伴随着Shuffle操作,即首先需要計算好所有父分區資料,然後在節點之間進行Shuffle。
Spark的這種依賴關系設計,使其具有了天生的容錯性,大大加快了Spark的執行速度。因為,RDD資料集通過“血緣關系”記住了它是如何從其它RDD中演變過來的,血緣關系記錄的是粗顆粒度的轉換操作行為,當這個RDD的部分分區資料丢失時,它可以通過血緣關系擷取足夠的資訊來重新運算和恢複丢失的資料分區,由此帶來了性能的提升。
相對而言,在兩種依賴關系中,窄依賴的失敗恢複更為高效,它隻需要根據父RDD分區重新計算丢失的分區即可(不需要重新計算所有分區),而且可以并行地在不同節點進行重新計算。而對于寬依賴而言,單個節點失效通常意味着重新計算過程會涉及多個父RDD分區,開銷較大。此外,Spark還提供了資料檢查點和記錄日志,用于持久化中間RDD,進而使得在進行失敗恢複時不需要追溯到最開始的階段。在進行故障恢複時,Spark會對資料檢查點開銷和重新計算RDD分區的開銷進行比較,進而自動選擇最優的恢複政策。
5.階段的劃分
Spark通過分析各個RDD的依賴關系生成了DAG,再通過分析各個RDD中的分區之間的依賴關系來決定如何劃分階段,具體劃分方法是:在DAG中進行反向解析,遇到寬依賴就斷開,遇到窄依賴就把目前的RDD加入到目前的階段中;
将窄依賴盡量劃分在同一個階段中,可以實作流水線計算(具體的階段劃分算法請參見AMP實驗室發表的論文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》)。
例如,如圖所示,假設從HDFS中讀入資料生成3個不同的RDD(即A、C和E),通過一系列轉換操作後再将計算結果儲存回HDFS。對DAG進行解析時,在依賴圖中進行反向解析,由于從RDD A到RDD B的轉換以及從RDD B和F到RDD G的轉換,都屬于寬依賴,是以,在寬依賴處斷開後可以得到三個階段,即階段1、階段2和階段3。可以看出,在階段2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,比如,分區7通過map操作生成的分區9,可以不用等待分區8到分區9這個轉換操作的計算結束,而是繼續進行union操作,轉換得到分區13,這樣流水線執行大大提高了計算的效率。
5.RDD運作過程
總結一下RDD在Spark架構中的運作過程:
(1)建立RDD對象;
(2)SparkContext負責計算RDD之間的依賴關系,建構DAG;
(3)DAGScheduler負責把DAG圖分解成多個階段,每個階段中包含了多個任務,每個任務會被任務排程器分發給各個工作節點(Worker Node)上的Executor去執行。