一、彈性分布式資料集(RDD)
- 1、RDD介紹
簡介:(存放資料的集合,資料是分布式存儲的,可以儲存在記憶體或磁盤中。是spark中最基本的資料抽象)
RDD(Resilient Distributed Dataset)彈性分布式資料集。
Resilient: 彈性的(可儲存在記憶體或磁盤)
Distributed:資料是分布式存儲的,便于分布式計算
Dataset: 一個集合,存放資料的
-
2、RDD五大屬性
(1)分區清單 【存資料集的,目前rdd的上一個】
(2)作用分區中的函數
(3)RDD之前的依賴關系 :每次一個rdd通過函數操作會生成新的rdd 【spark容錯機制的依據:會根據依賴關系進行資料恢複】
(4)分區函數 * 隻有k-v類型的RDD 才有分區函數(必須産生shuffle) (2種: Range和Hash)
* 決定了資料的來源和資料處理後的去向
(5)一個清單,存儲每個Partition的最優位置
【資料本地性和資料最優 ,優先配置設定給有資料的節點上 進行計算】 —— ——移動資料 不如 移動計算
- 3、為什麼會産生RDD
解決了mapReduce 不能 循環式的資料流模型。可以把RDD的結果資料進行緩存,友善多次重用 可疊代計算,避免重複計算。提示IO操作。
- 4、RDD在Spark中的作用和地位
RDD是Spark中的底層核心,Spark是這個抽象方法的實作。 疊代運算 和 互動式
- 5、建立RDD(3種)
(1)通過sparkContext調用parallelize方法,需要一個已經存在的Scala集合建立 val rdd1 = sc.parallelize(集合/數組) (2)由外部存儲系統的檔案建立 val rdd2=sc.textFile("xxx") (3)已有的RDD經過算子轉換生成新的RDD val rdd3=rdd2.flatMap(_.split(" "))
- 6、RDD算子
- 算子分類(2類)
- Transformation(轉換): 一個rdd轉換成新的rdd;所有轉換都是延遲加載,是懶加載,隻記錄轉換,隻有遇到Action再執行。
- Action (動作): 它會觸發整個任務真正運作
-
Transformation 常見算子:
map、filter、flatMap、mapPartitions(這四個面試常問到)、mapPartitionsWithIndex、union(并集)、intersection(交集)、distinct(去重)、groupByKey、reduceByKey(聚合)、sortBy、sortByKey、join(相同key提取,v依次排在後面)、cogroup、coalesce、repartition、repartitionAndSortWithKey
mapParitions和map的差別 :一個隻作用于分區,一個作用于每個元素 ???
-
Action 常見算子:
reduce、collect、count、first、take、takeOrdered、saveAsTextFile、saveAsObjectFile、countByKey、foreach、foreachPartition
- 算子分類(2類)
** RDD常用算子操作
1)map、filter
2)flatMap
3)交集、并集
4)join、groupByKey 差別
5)cogroup 與groupByKey差別
6)reduce 聚合
7)reduceByKey、sortByKey
8)repartition、coalesce (分區)repartition 改變分區數後,最開始的rdd1 分區後不會改變的。而是生成一個新的rdd1的分區數。
差別:前者可以添加或者減少分區數。後者隻能減少,不能增加。
9)檢視分區數 調用 .partitions.size
- 7、RDD的依賴關系(寬、窄依賴;血統)
-
RDD的依賴 —— 我這裡寫的不太準确
窄依賴 —— 每一個父RDD的Partition最多被子RDD的一個Partition使用 【獨生子】
寬依賴 —— 多個子RDD的Partition會依賴同一個父RDD的Partition 【超生】
-
Lineage(血統)
記錄RDD的中繼資料資訊和轉換行為。如果目前RDD的某些分區資料丢失後,可以通過血統來恢複丢失的分區。血統應該存在Application裡面
-
- 8、RDD的緩存
- 把rdd1的資料進行緩存,以便之後對其能快速讀取。
- RDD的緩存方式
- persist方法: 可以設定豐富的存儲級别。 如:設定 序列化、記憶體、磁盤、幾份 等
- cache方法: 預設儲存在記憶體中,本質調用的是persist的預設方法 即儲存在記憶體中。
- 對需要進行緩存的RDD調用persist和cache,不會立即執行,而是 有action算子時候才真正執行緩存操作。
- spark有自帶的機制,如果資料量太小,沒有強制設定緩存在記憶體 而是 正常設定緩存在磁盤 但是依然會存在記憶體的。
- 9、DAG(有向無環圖)
簡介:原始的RDD通過一系列的轉換就形成了DAG
為什麼要劃分Stage:就是讓每一個Stage中的多任務可以并行的運作,互不幹擾,劃分給對應的task執行。
劃分Stage流程:
1、從最後一個rdd開始往前推,首先把目前rdd加入到一個stage中,這個stage就是最後一個stage
2、如果遇到窄依賴,就把目前rdd加入本stage中,如果遇到寬依賴,就從寬依賴切開,這樣最後一個stage就已經結束了。
3、從切開的地方 重複執行1、2 規則, 繼續往前推,依次推到最前面的rdd。
* 寬依賴是劃分Stage的依據
* 每一個stage裡面包含一組task,這些task都儲存在taskSet集合裡面,這些task是可以并行計算的。
stage與stage之間存在依賴關系,後面的要等它前面的都完成後才可以開始計算。
-
10、Spark任務排程
流程圖
DAGScheduler
TaskScheduler
二、checkpoint(檢查點) (RDD容錯機制之一)
* checkpoint 介紹: 為了更可靠的持久化資料,可以放在hdfs目錄裡,借用了hdfs的高容錯、高可靠。
* checkpoint與persist、cache的差別:
* persist和cache對資料持久化後,之前的血統沒有改變,如果資料丢失,是通過血統重新得到。
* checkpoint在持久化後,依賴關系會改變(就不能再通過血統得到了)。如果持久化的資料丢了,不能重新計算恢複得到,隻能再重新送出了。但是可能會浪費資源,因為新開了一個job。
* persist隻能在目前Linux下緩存,checkpoint是可以放hdfs上。
* 資料丢失恢複順序: 優先找cache;沒有再找checkpoint;還沒有再 重新計算 (從前面的rdd開始重新算)。
checkpoint 的使用和原理
* 1.sparkContext.setCheckpointDir(“hdfs上目錄”)
* 2.針對于需要緩存的rdd資料調用
* rdd.checkpoint
* 調用算法後不是立馬序列化而是觸發action算子才執行
* 3、在運作的時候首先會先執行action這樣一個job,執行完成之後,會單獨開啟一個新的job來執行checkpoint
在RDD所處的job運作結束後,會啟動一個單獨的job來将checkpoint過的資料寫入之前設定的檔案系統持久化,進行高可用。
三、Spark運作架構
* Spark運作基本流程:
各個RDD之間存在着依賴關系,這些依賴關系就形成有向無環圖DAG;DAGScheduler對這些依賴關系形成的DAG進行Stage劃分,劃分的規則很簡單,從後往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的劃分;DAGScheduler基于每個Stage生成TaskSet,并将TaskSet送出給TaskScheduler;TaskScheduler負責具體的task排程(推測機制、本地性等),最後在Worker節點上啟動task。