天天看點

Spark原理及應用

作者:勇者熱情生活家

Apache Spark是通用的分布式大資料計算引擎。Spark是UC Berkeley AMPLab(美國加州大學伯克利分校的AMP實驗室)開源的通用并行架構。Spark擁有Hadoop MapReduce所具有的優點,但不同于Hadoop MapReduce的是,Hadoop每次經過Job執行的中間結果都存儲到HDFS等磁盤上,而Spark的Job中間輸出結果可以儲存在記憶體中,而不再需要讀寫HDFS。因為記憶體的讀寫速度與磁盤的讀寫速度不在一個數量級上,是以Spark利用記憶體中的資料能更快速地完成資料的處理。Spark啟用了彈性分布式資料集(Resilient Distributed Dataset,RDD),除了能夠提高互動式查詢效率,還可以優化疊代器的工作負載。由于彈性分布式資料集的存在,使得資料挖掘與機器學習等需要疊代的MapReduce的算法更容易實作。

Spark的原理

Spark的特點

1.計算速度快

Spark将每個任務都構造成一個DAG(Directed Acyclic Graph,有向無環圖)來執行,其内部計算過程基于彈性分布式資料集在記憶體中對資料進行疊代計算,是以其運作效率很高。官方資料表明,如果計算的資料從磁盤上讀取,則Spark的速度是Hadoop MapReduce的10倍以上;如果計算的資料從記憶體中讀取,則Spark的計算速度是Hadoop MapReduce的100倍以上。

2.易于使用

Spark提供了80多個進階運算操作,支援豐富的算子,開發人員隻需要按照其封裝好的API實作即可,不需要關心Spark的底層架構。同時,Spark支援多種語言開發,包括Java、Scala、Python。

3.通用大資料架構

Spark提供了多種類型的開發庫,包括Spark Core、Spark SQL(即時查詢)、Spark Streaming(實時流處理)、Spark MLlib、GraphX(圖計算),使得開發人員可以在同一個應用程式中無縫組合使用這些庫,而不用像傳統的大資料方案那樣将離線任務放在Hadoop MapReduce上運作,将實時流計算任務放在Storm上運作,并維護多個平台。Spark提供了從實時流計算、MapReduce離線計算、SQL計算、機器學習到圖計算的一站式整體解決方案。

4.支援多種資料總管

Spark支援單機、Standalone、Hadoop YARN、Apache Mesos等多種資料總管,使用者可以根據現有的大資料平台靈活地選擇運作模式。

5.Spark生态圈豐富

Spark生态圈以Spark Core為核心,支援從HDFS、S3、HBase等多種持久化層讀取資料。同時,Spark支援以Hadoop YARN、Apache Mesos和Standalone為資料總管排程Job,完成Spark應用程式的計算。Spark應用程式可以基于不同的元件實作,如Spark Shell、Spark Submit、Spark Streaming、Spark SQL、BlinkDB(權衡查詢)、MLlib/MLbase(機器學習)、GraphX和SparkR(數學計算)等。Spark生态圈已經從大資料計算和資料挖掘擴充到機器學習、自然語言處理和語音識别等領域。

Spark的子產品

Spark基于Spark Core建立了Spark SQL、Spark Streaming、MLlib、GraphX、SparkR核心元件,基于不同元件可以實作不同的計算任務,這些計算任務的運作模式有:本地模式、獨立模式、Mesos模式、YARN模式。Spark任務的計算可以從HDFS、S3、Hypertable、HBase或Cassandra等多種資料源中存取資料。

Spark原理及應用

Spark Core

Spark的核心功能實作包括基礎設施、存儲系統、排程系統和計算引擎。

(1)基礎設施:Spark中有很多基礎設施,這些基礎設施被Spark中的各種元件廣泛使用,包括SparkConf(配置資訊)、SparkContext(Spark上下文)、Spark RPC(遠端過程調用)、ListenerBus(事件總線)、MetricsSystem(度量系統)、SparkEnv(環境變量)等。

①SparkConf:SparkConf用于定義Spark應用程式的配置資訊。

②SparkContext:SparkContext是Spark應用程式的入口,Spark應用程式的送出與執行離不開SparkContext。SparkContext隐藏了網絡通信、分布式部署、消息通信、存儲體系、計算引擎、度量系統、檔案服務等内容,開發人員隻需要使用SparkContext提供的API完成功能開發即可。

③Spark RPC:Spark元件之間的網絡通信依賴Spark RPC架構。Spark RPC基于Netty實作,使用中分同步和異步兩種方式。

④ListenerBus:ListenerBus即事件總線,主要用于SparkContext内部各元件之間的事件互動。ListenerBus屬于監聽者模式,采用異步調用的方式實作。

⑤MetricsSystem:MetricsSystem為度量系統,用于整個Spark叢集中各個元件運作狀态的監控。度量系統由多種度量源和多種度量輸出組成。

⑥SparkEnv:SparkEnv為Spark的執行環境,SparkEnv内部封裝了RpcEnv(RPC環境)、序列化管理器、BroadcastManager(廣播管理器)、MapOutputTracker(Map任務輸出跟蹤器)、存儲系統、MetricsSystem(度量系統)、OutputCommitCoordinator(輸出送出協調器)等Spark程式運作所需要的基礎環境元件。

(2)存儲系統:Spark存儲系統用于管理Spark運作過程中依賴的資料的存儲方式和存儲位置。Spark存儲系統首先考慮在各節點的記憶體中存儲資料,當記憶體不足時會将資料存儲到磁盤上,這種記憶體優先的存儲政策使得Spark的計算性能無論在實時流計算還是在批量計算的場景下都表現很好。Spark的記憶體存儲空間和執行存儲空間之間的邊界可以靈活控制。

(3)排程系統:Spark排程系統主要由DAGScheduler和TaskScheduler組成。DAGScheduler負責建立Job、将DAG中的RDD劃分到不同Stage中、為Stage建立對應的Task、批量送出Task等。TaskScheduler負責按照FIFO(First Input First Output,先進先出)或者FAIR(公平排程)等排程算法對Task進行批量排程。

(4)計算引擎:計算引擎由記憶體管理器、任務管理器、Task、Shuffle管理器等組成。

Spark SQL

Spark SQL提供基于SQL的資料處理方式,使得分布式資料的處理變得更加簡單。此外,Spark提供了對Hive SQL的支援。

Spark Streaming

Spark Streaming提供流計算能力,支援Kafka、Flume、Kinesis和TCP等多種流式資料源。此外,Spark Streaming提供了基于時間視窗的批量流操作,用于對一定時間周期内的流資料執行批量處理。

GraphX

GraphX用于分布式圖計算。通過Pregel提供的API可以快速解決圖計算中的常見問題。

Spark MLlib

Spark MLlib為Spark的機器學習庫。Spark MLlib提供了統計、分類、回歸等多種機器學習算法的實作。其簡單易用的API接口降低了機器學習的門檻。

SparkR

SparkR是一個R語言包,提供了輕量級的基于R語言使用Spark的方式。SparkR實作了分布式的資料框,支援類似查詢、過濾及聚合的操作(類似R語言中的資料框包dplyr),使得基于R語言能夠更友善地處理大規模的資料集。同時,SparkR支援基于Spark MLlib進行機器學習。

Spark的運作原理

Spark的運作模式

Spark的運作模式主要包括Local模式、Standalone模式、On YARN、On Mesos和運作在AWS等公有雲平台上

Spark原理及應用

Spark的叢集架構

Spark的叢集架構主要由Cluster Manager(管理器)、Worker(工作節點)、Executor(執行器)、Driver(驅動器)、Application(應用程式)5部分組成

Spark原理及應用

(1)Cluster Manager:Spark叢集管理器,主要用于整個叢集資源的管理和配置設定。根據部署模式的不同,可以分為Local、Standalone、YARN、Mesos和AWS。

(2)Worker:Spark的工作節點,用于執行送出的任務。Worker的工作職責如下。

①通過注冊機制向Cluster Manager彙報自身的CPU和記憶體等資源使用資訊。

②在Master的訓示下建立并啟動Executor,Executor是真正的計算單元。

③将資源和任務進一步配置設定給Executor并運作。

④同步資源資訊和Executor狀态資訊給Cluster Manager。

(3)Executor:真正執行計算任務的元件,是某個Application運作在Worker上的一個程序。該程序負責Task的運作并且将運作的結果資料儲存到記憶體或磁盤上。Task是運作在Executor上的任務單元,Spark應用程式最終被劃分為經過優化的多個Task的集合。

(4)Driver:Application的驅動程式,可以了解為驅動程式運作中的main()函數,Driver在運作過程中會建立SparkContext。Application通過Driver與Cluster Manager和Executor進行通信。Driver可以運作在Application上,也可以由Application送出給Cluster Manager,再由Cluster Manager安排Worker運作。Driver的主要職責如下。

①運作應用程式的main()函數。

②建立SparkContext。

③劃分RDD并生成DAG。

④建構Job并将每個Job都拆分為多個Task,這些Task的集合被稱為Stage。各個Stage互相獨立,由于Stage由多個Task構成,是以也被稱為Task Set。Job是由多個Task建構的并行計算任務,具體為Spark中的Action操作(例如collect、save等)。

⑤與Spark中的其他元件進行資源協調。

⑥生成并發送Task到Executor。

(5)Application:基于Spark API編寫的應用程式,其中包括實作Driver功能的代碼和在叢集中多個節點上運作的Executor代碼。Application通過Spark API建立RDD、對RDD進行轉換、建立DAG、通過Driver将Application注冊到Cluster Manager。

Spark的運作流程

Spark的資料計算主要通過RDD的疊代完成,RDD是彈性分布式資料集,可以看作是對各種資料計算模型的統一抽象。在RDD的疊代計算過程中,其資料被分為多個分區并行計算,分區數量取決于應用程式設定的Partition數量,每個分區的資料都隻會在一個Task上計算。所有分區可以在多個機器節點的Executor上并行執行。

Spark原理及應用

(1)建立RDD對象,計算RDD之間的依賴關系,并将RDD生成一個DAG。

(2)DAGScheduler将DAG劃分為多個Stage,并将Stage對應的Task Set送出到叢集管理中心。劃分Stage的一個主要依據是目前計算因子的輸入是否确定。如果确定,則将其分到同一個Stage中,避免多個Stage之間傳遞消息産生的系統資源開銷。

(3)TaskScheduler通過叢集管理中心為每個Task都申請系統資源,并将Task送出到Worker。

(4)Worker的Executor執行具體的Task。

Spark的使用

Spark被廣泛應用于大資料行業的各個領域,包括實時流計算、曆史資料分析、機器學習、圖計算等。本節将從Spark的安裝、Spark RDD的使用、Spark Streaming的使用和Spark SQL的使用等方面來介紹Spark各個元件的特性。

Spark的安裝

這裡以Linux系統單機版為例介紹Spark的安裝方式,具體步驟如下。

(1)到官網下載下傳最新的Spark安裝包,注意這裡下載下傳Spark編譯好的帶Hadoop的版本,即spark-2.4.3-bin-hadoop2.7.tgz。

Spark原理及應用

(2)将安裝包複制到安裝目錄下,執行以下指令解壓Spark安裝包。

Spark原理及應用

(3)配置系統的Spark環境變量。

①執行以下指令編譯profile檔案。

Spark原理及應用

②在profile檔案的最後加上以下内容來設定Spark環境變量。

Spark原理及應用

③鍵盤按下“Esc”,輸入冒号“:”加“wq”儲存,退出vim編輯模式。

④執行以下指令使檔案修改立刻生效。

Spark原理及應用

(4)建立spark-env.sh配置檔案。

①執行以下指令,進入Spark的conf目錄。

Spark原理及應用

②執行以下指令,根據Spark提供的spark-env.sh.template模闆檔案複制一份新的名為spark-env.sh的配置檔案。

Spark原理及應用

③執行以下指令打開spark-env.sh檔案,輸入vim進入編輯模式。

Spark原理及應用

④在spark-env.sh檔案的最後加上以下内容。

Spark原理及應用
Spark原理及應用

⑤鍵盤按下“Esc”,輸入冒号“:”加“wq”儲存,退出vim編輯模式。

(5)建立slaves配置檔案。

①執行以下指令,進入Spark的conf目錄。

Spark原理及應用

②執行以下指令,根據Spark提供的slaves.template模闆檔案複制一份新的名為slaves的配置檔案。

Spark原理及應用

③執行以下指令打開slaves檔案,輸入vim進入編輯模式。

Spark原理及應用

④在slaves檔案的最後加上以下内容,表示在Localhost伺服器上有一個Slave角色。

Spark原理及應用

⑤鍵盤按下“Esc”,輸入冒号“:”加“wq”儲存,退出vim編輯模式。

(6)啟動Spark。

①執行以下指令,進入Spark的sbin目錄。

Spark原理及應用

②執行以下指令,啟動Spark,在啟動過程中會要求輸入Linux的登入密碼,按照提示輸入即可。

Spark原理及應用

③在啟動後,控制台會列印出以下日志,提示日志檔案的目錄。

Spark原理及應用

④檢視Master日志:在Spark啟動後會看到Master的核心日志如下。

Spark原理及應用

⑤檢視Worker日志:在Spark啟動後會看到Worker的核心日志如下。

Spark原理及應用

⑥Jps檢視程序。

Spark原理及應用

⑦在浏覽器位址欄中輸入http://192.168.2.103:8080檢視Master頁面。注意,192.168.2.103是筆者目前的伺服器IP位址

Spark原理及應用

⑧在浏覽器位址欄中輸入http://192.168.2.103:8081檢視Worker頁面

Spark原理及應用

(7)執行Spark預設example:進入Spark安裝目錄,執行以下指令,啟動Spark示例中的SparkPi任務。

Spark原理及應用

Spark RDD的使用

1.RDD的介紹

RDD是Spark中最基本的資料抽象,代表一個不可變、可分區、元素可并行計算的集合。RDD具有自動容錯、位置感覺性排程和可伸縮等特點。RDD允許使用者在執行多個查詢時顯式地将資料集緩存在記憶體中,後續查詢能夠重用該資料集,這極大地提升了查詢效率。

2.RDD的核心結構及概念

(1)Partition:RDD内部的資料集在邏輯上和實體上都被劃分為多個分區(Partition)以提高運作的效率,分區數量決定了計算的并行度,每一個分區内的資料都在一個單獨的任務中被執行,如果在計算過程中沒有指定分區數,那麼Spark會采用預設分區數量。預設分區數量為程式運作配置設定到的CPU核數。

(2)Partitioner:Partitioner是RDD的分區函數。分區函數不但決定了RDD本身的分區數量,也決定了其父RDD Shuffle輸出時的分區數量。Spark實作了基于Hash(HashPartitioner)和基于範圍(RangePartitioner)的兩種分區函數。

注意:隻有對于Key-Value的RDD才會有Partitioner,而非Key-Value的RDD的Parititioner值是None。

(3)RDD的依賴關系:RDD的每次轉換都會生成一個新的RDD,是以RDD之間會有前後依賴關系。當在計算過程中出現異常情況導緻部分分區資料丢失時,Spark可以通過依賴關系從父RDD中重新計算丢失的分區資料,而不需要對RDD上的所有分區全部重新計算。RDD的依賴分為窄依賴和寬依賴。

◎窄依賴:如果父RDD的每個分區最多隻能被子RDD的一個分區使用,則稱之為窄依賴。

◎寬依賴:如果父RDD的每個分區都可以被子RDD的多個分區使用,則稱之為寬依賴。

窄依賴的每個子RDD的Partition的生成操作都是可以并行的,而寬依賴則需要所有父Partition Shuffle結果完成後再被執行

Spark原理及應用

4)Stage:Stage是由一組RDD組成的可進行優化的執行計劃。如果RDD的依賴關系為窄依賴,則可放在同一個Stage中運作;若RDD的依賴關系為寬依賴,則要劃分到不同Stage中。這樣,當Spark執行作業時,會按照Stage劃分不同的RDD,生成一個完整的最優的執行計劃,使每個Stage内的RDD都盡可能在各個節點上并行地被執行

Spark原理及應用

(5)PreferredLocation:PreferredLocation是一個用于存儲每個Partition的優先位置的清單。對于每個HDFS檔案來說,這個清單儲存的是每個Partition所在的塊的位置,也就是該HDFS檔案的“劃分點”。

(6)CheckPoint:CheckPoint是Spark提供的一種基于快照的緩存機制。當需要計算的RDD過多時,為了避免任務執行失敗後重新計算之前的RDD,可以對RDD做快照(CheckPoint)處理,檢查RDD是否被計算,并将結果持久化到磁盤或HDFS上。此外,Spark提供另一種緩存機制Cache,Cache緩存資料由Executor管理,當Executor消失時,Cache緩存的資料将被清除,而CheckPoint将資料儲存到永久性磁盤或HDFS,當計算出現運作錯誤時,Job可以從CheckPoint點繼續計算。

3.建立一個RDD應用

在建立RDD應用前,首先需要建立一個Spark項目,下面以Java基于Maven的項目為例介紹RDD的建立。

(1)建構Maven項目。按照編譯器提示建構一個簡單的Maven項目,并打開pom.xml檔案,将Spark依賴加入項目中。需要注意的是,Spark不再對Java 1.7進行維護,是以必須指明Maven源碼編譯和Target編譯均使用Java 1.8。

在pom.xml檔案中加入Maven的編輯插件,具體代碼如下。

Spark原理及應用

(2)建立RDD類。

通過上述代碼已經建構了一個簡單的Spark應用項目,在項目的Java目錄下建立一個名為RDDSimple的Java類,并在類中輸入下面代碼來建構一個簡單的RDD。

Spark原理及應用

上述代碼定義了一個簡單的RDD,具體過程為:首先定義SparkConf執行個體conf,然後調用sc.parallelize方法将一個數組轉換為一組名為distDatardd的Spark RDD,最後通過調用distDatardd.reduce方法對distDatardd進行操作。上述代碼實作了對RDD中的資料進行加和操作然後輸出。

利用JavaSparkContext的parallelize方法将已經存在的一個集合轉換為RDD,集合中的資料将被複制到RDD并參與并行計算。并行集合的一個重要參數是分區數量(将資料集切割為多份)。Spark将為叢集的每個分區都運作一個任務。一般希望叢集中的每個CPU都有2~4個分區,這樣既能良好地利用CPU,又不至于任務太多導緻任務阻塞等待。通常Spark會嘗試根據叢集的CPU核數自動設定分區數量,也可以手動設定分區大小。設定代碼如下。

Spark原理及應用

(3)打包和作業送出

在項目的根目錄下輸入如下指令對項目進行打包,打包後的程式在Target目錄下。

Spark原理及應用

Spark的作業送出很簡單,隻需要調用spark-submit指令指定主函數入口類并将編譯好的JAR包送出到叢集即可,叢集會自動為程式配置設定資源并執行。在送出的時候,首先需要通過--class指定Spark程式的入口,然後通過--master指定送出給哪個叢集,最後跟上JAR包路徑即可。送出指令如下。

Spark原理及應用

在作業送出後,通過控制台能看到程式輸出如下結果。

Spark原理及應用

4.利用外部資料集生成RDD

Spark可以從Hadoop或者其他外部存儲系統建立RDD,包括本地檔案系統、HDFS、Cassandra、HBase、S3等。Spark RDD支援多種檔案格式,包括文本檔案、SequenceFiles、JSON檔案和任何其他Hadoop InputFormat。通過SparkContext的textFile方法讀取文本檔案建立RDD的代碼如下。

Spark原理及應用

在上述代碼中,textFile()方法的URI參數可以是本地檔案、本地路徑、HDFS路徑、S3路徑等。如果該參數的輸入值是具體的檔案,則Spark會讀取參數中的檔案;如果是路徑,則Spark會讀取該路徑下的所有檔案,并最終将其作為資料源加載到記憶體生成對應的RDD。

Spark加載資料的注意事項包括以下幾個方面。

(1)如果通路的是本地檔案路徑,則必須可以在工作節點上以相同路徑通路該檔案。一般做法是将資料檔案遠端複制到所有工作節點對應的路徑下或使用共享檔案系統實作。

(2)除了支援基于檔案名的方式加載檔案,Spark還支援基于目錄、壓縮檔案和通配符的方式加載檔案。例如,可以使用textFile("/my/directory")表示加載“/my/directory”路徑下的所有檔案;使用textFile("/my/directory/*.txt")表示加載“/my/directory”路徑下所有以.txt為字尾名的檔案;使用textFile("/my/directory/*.gz")表示加載并解壓“/my/directory”目錄下所有以.gz為字尾名的檔案。

(3)Spark加載檔案時可以設定分區數。Spark在預設情況下為每個檔案塊都建立一個分區(HDFS中預設檔案塊大小為128MB),也可以通過傳遞更大值來設定更多分區。需要注意的是,分區數不能小于檔案塊的數量。

(4)除了加載文本檔案,Spark的Java API還支援其他多種資料格式。

①wholeTextFiles:JavaSparkContext.wholeTextFiles允許用戶端程式讀取包含多個小文本檔案的目錄,并将每個檔案都以<檔案名,内容>的鍵值對傳回。該方法與textFile不同,textFile是将每個檔案中的每行都作為一條記錄傳回的。

②SequenceFiles:加載SequenceFiles需要使用SparkContext的sequenceFile[Key,Value]方法實作,其中,Key和Value是檔案中鍵和值的類型。這些鍵值對對應的是Hadoop的Writable接口的子類,比如IntWritable和Text。

③Hadoop InputFormat:對于Hadoop InputFormat類型的資料,可以使用JavaSparkContext.hadoopRDD方法或JavaSparkContext.newAPIHadoopRDD加載并生成RDD。

④檔案儲存和序列化:JavaRDD.saveAsObjectFile方法和JavaSparkContext.objectFile方法采用Java預設序列化的方式将資料序列化并儲存到RDD。同時,使用者可以在儲存資料的時候,使用其他更高效的序列化方法(例如Avro、Kyro等)。

5.RDD的轉換和操作

RDD支援兩種類型的操作:轉換(Transformation)和操作(Action)。轉換指從現有RDD建立新RDD,操作指在RDD上運作計算并将計算結果傳回驅動程式。例如,map是一個轉換:它将RDD的所有元素都調用map函數進行轉換處理,傳回一個表示轉換結果的新RDD;reduce是一個操作:它将RDD的所有元素都調用reduce函數進行聚合操作,将最終計算結果傳回驅動程式。表示聚合處理的函數還有reduceByKey、reduceBy等。

Spark中的所有轉換(Transformation)都是懶加載的,即不會立即執行轉換操作,而是先記錄RDD之間的轉換關系,僅當觸發操作(Action)時才會執行RDD的轉換操作,并将計算結果傳回驅動程式。這種懶加載的設計使Spark能夠更加高效地運作。具體代碼如下。

Spark原理及應用

上述代碼通過sc.textFile()方法定義了名為lines的RDD,此時檔案并沒有加載到記憶體中,僅僅是指向檔案的位置。通過lines.map()方法定義了名為lineLengths的map轉換,同樣由于懶加載機制,lineLengths不會立即執行計算。最終,當運作reduce操作時,Spark将RDD計算分解為不同Stage在不同機器上運作任務,每台機器都運作部分map資料集并将運作結果儲存為本地的reduce,在各個節點都運算完成後将reduce結果傳回driver程式并進行結果的合并。

6.RDD持久化的概念、級别和原則

Spark可以跨節點在記憶體中持久化RDD。當持久化RDD時,每個節點都會在記憶體中緩存計算後的分區資料,當其他操作需要使用該RDD時,可以直接重用該緩存資料,這使得之後的RDD計算速度更快(通常超過10倍)。緩存是疊代計算和互動式計算的關鍵。

應用程式可以使用persist()或cache()标記要緩存的RDD,當調用操作(Action)執行計算時,計算結果将被緩存在節點的記憶體中。Spark緩存具有容錯性,如果RDD的某個分區丢失,則該RDD将被自動重新計算。

每個持久化RDD都可以使用不同存儲級别進行存儲,Spark允許将資料集存儲在磁盤上或記憶體中。Spark将需要緩存的資料序列化為Java對象(序列化可以節省磁盤或記憶體空間),然後跨節點複制到其他節點上,以便其他節點重用該資料。Spark中緩存持久化級别是通過StorageLevel來設定的。具體代碼如下。

(1)Spark持久化的級别

①MEMORY_ONLY:使用未經過序列化的Java對象在記憶體中存儲RDD。當記憶體不夠時,将不會進行持久化;當下次需要該RDD時,再從源頭處重新計算。該政策是預設的持久化政策,當使用cache()時,使用的是該持久化政策。

②MEMORY_AND_DISK:使用未經過序列化的Java對象存儲RDD,優先嘗試将RDD儲存在記憶體中。如果記憶體不夠,則會将RDD寫入磁盤檔案;當下次需要該RDD時,從持久化的磁盤檔案中讀取該RDD即可。

③MEMORY_ONLY_SER:MEMORY_ONLY_SER的含義與MEMORY_ONLY類似,唯一差別是MEMORY_ONLY_SER會将RDD中的資料進行序列化。在序列化過程中,RDD的每個Partition都将會被序列化成一個位元組數組,這種方式更加節省記憶體,進而避免持久化的RDD占用過多記憶體導緻JVM頻繁GC。

④MEMORY_AND_DISK_SER:MEMORY_AND_DISK_SER的含義與MEMORY_AND_DISK類似。唯一差別是MEMORY_AND_DISK_SER會将RDD中的資料進行序列化。在序列化過程中,RDD的每個Partition都會被序列化成一個位元組數組。這種方式更加節省記憶體,進而避免持久化的RDD占用過多記憶體導緻頻繁GC。

⑤DISK_ONLY:使用未序列化的Java對象将RDD全部寫入磁盤檔案。

⑥MEMORY_ONLY_2和MEMORY_AND_DISK_2:對于上述任意一種持久化政策,如果加上字尾_2,代表的是将每個持久化的資料都複制一份副本,并将副本儲存到其他節點上。這種基于副本的持久化機制主要用于容錯。假如某個節點挂掉,節點的記憶體或磁盤中的持久化資料丢失了,那麼後續對RDD計算時還可以使用該資料在其他節點上的副本。如果沒有副本,則隻能将這些資料從頭重新計算一遍。

⑦OFF_HEAP:OFF_HEAP與MEMORY_ONLY_SER類似,但OFF_HEAP将資料存儲在堆外記憶體中。該參數需要Spark啟用堆外記憶體。

(2)持久化的原則

Spark提供了豐富的存儲級别,旨在通過不同存儲級别的設定實作記憶體和CPU的最佳使用,具體開發中該如何選擇持久化方案呢?以下為Spark官方提供的緩存持久化的選擇流程。

①如果RDD在預設存儲級别(MEMORY_ONLY)下運作良好,則建議使用MEMORY_ONLY。該級别是CPU效率最高的類型,基于CPU快速計算可以使RDD上的操作盡可能快地運作。

②如果系統顯示記憶體使用過高,則嘗試使用MEMORY_ONLY_SER,并選擇更快速的序列化庫,以加快序列化時間和節省對象的存儲空間。

③如果要快速恢複故障,則建議使用副本存儲級别。其他存儲級别需要通過重新計算丢失的資料來保障緩存的完整性,而副本存儲級别可以在其緩存對應的副本節點上直接執行任務,不用等待重新計算丢失的分區資料。

(3)删除持久化緩存

Spark會自動監視每個節點上的緩存使用情況,并以LRU方式删除舊的資料分區。如果想手動删除RDD,則可通過RDD.unpersist()方法完成。

繼續閱讀