一、RDD
1.1 什麼是RDD?
RDD(Resilient Distributed Dataset)叫做彈性分布式資料集,是Spark計算過程的核心,是spark計算過程中的瞬時結果,下一個RDD依賴于上一個RDD。它代表一個不可變、可分區、裡面的元素可并行計算的集合。
資料集就是由許多資料組成的集合了
RDD本身并不是分布式的,裡面的資料是分布式的。
那麼,彈性是什麼意思呢?彈性有哪些表現呢?
先看一下resilient這個單詞的英文解釋:adj. recovering readily from adversity, depression, or the like 容易地從逆境或類似的情況中恢複過來。《Learning Spark:Lightning-fast Data Analysis》一書中解釋“彈性”是指在任何時候都能進行重算。這樣當叢集中的一台機器挂掉而導緻存儲在其上的RDD丢失後,Spark還可以重新計算出這部分的分區的資料。但使用者感覺不到這部分的内容丢失過。這樣RDD資料集就像塊帶有彈性的海綿一樣,不管怎樣擠壓(分區遭到破壞)都是完整的。
RDD的彈性展現在:
1.自動進行記憶體和磁盤切換
2.基于lineage的高效容錯
3.task如果失敗會特定次數的重試
4.stage如果失敗會自動進行特定次數的重試,而且隻會隻計算失敗的分片
5.checkpoint【每次對RDD操作都會産生新的RDD,如果鍊條比較長,計算比較笨重,就把資料放在硬碟中】和persist 【記憶體或磁盤中對資料進行複用】(檢查點、持久化)
6.資料排程彈性:DAG TASK 和資源管理無關
7.資料分片的高度彈性repartion
1.2 五大特性
可以看參考一下spark源碼中的注釋:
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf" target="_blank" rel="external nofollow" >Spark paper</a>
* for more details on RDD internals.
*/
1.一組分區的集合
2.一個函數作用于每個分區
3.依賴于其他RDDs
4.可以重新分區
5.資料本地性:每個分區的資料優先在本地計算,但并不代表不能在别的機器上計算。
二、算子
算子是RDD中定義的函數,可以對RDD中的資料進行轉換和操作。
2.1 算子分類
● 轉換(transformation)算子 :這種變換并不觸發送出作業,完成作業中間過程處理。Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。
轉換算子又可以分為建立算子和緩存算子
● 行動(action)算子 :這類算子會觸發 SparkContext 送出 Job 作業。Action 算子會觸發 Spark 送出作業(Job),并将資料輸出 Spark系統。
2.2 常用算子
轉換算子:
cartesian:兩個RDD進行笛卡爾積合并
coalesce:對RDD重新進行分區,第一個參數為分區數;第二個參數為是否進行shuffle操作,預設值為false,當shuffle=false時,不能增加分區數,但不會報錯,分區個數還是原來的
cogroup:對多個RDD中的Key-Value元素按key值進行合并,每個RDD中相同key中的元素分别聚合成一個集合。
distinct:将原始RDD中重複出現的元素進行過濾,傳回一個新生成的RDD。即原RDD中每個元素在新生成的RDD中隻出現一次
filter:對元素進行過濾,對每個元素應用f函數,傳回值為true的元素在RDD中保留,傳回為false的将被過濾掉
map:和集合中的map作用相同
flatMap:和集合中的flatMap作用相同
flatMapValues:同基本轉換操作中的flatMap,隻不過是針對[K,V]中的V值進行flatMap操作
groupByKey:将Key-Value型RDD中的元素按照Key值進行彙聚,Key值相同的Value值會合并為一個序列。和cogroup差別在于cogroup可以合并多個RDD,而groupByKey則針對一個RDD
intersection:傳回兩個RDD的交集
keys:對Key-Value型RDD擷取所有的key
mapValues:同基本轉換操作中的map,針對[K,V]中的V值進行map操作
reduceByKey:将Key-Value型RDD按照key值進行聚合操作,生成新的RDD。
sample:對RDD進行抽樣,其中參數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣比例;seed為随機數種子,比如目前時間戳
sortByKey:對Key-Value型RDD按照key值進行升序或降序排列
subtract:差集,即傳回在oneRDD中出現,并且不在otherRDD中出現的元素,不去重
subtractByKey:對Key-Value型RDD按照key值做差集,即傳回key在oneRDD中出現,并且不在otherRDD中出現的元素,不去重。
union:合并兩個RDD,不去重,要求兩個RDD中的元素類型一緻
values:對Key-Value型RDD擷取所有的value
zip:用于将兩個RDD組合成Key-Value形式的RDD,這裡預設兩個RDD的partition數量以及元素數量都相同,否則會抛出異常。
行動算子:
collectAsMap:如果RDD中同一個Key中存在多個Value,那麼後面的Value将會把前面的Value覆寫,最終得到的結果就是Key唯一,而且對應一個Value
count:傳回RDD中的元素數量
countByKey:用于統計Key-Value型RDD中每個Key的數量
countByValue:用于統計Key-Value型RDD中每個元素的個數
first:傳回RDD中的第一個元素,不排序
lookUp:用于Key-Value類型的RDD,指定K值,傳回RDD中該Key對應的所有Value值
reduce:根據映射函數f對RDD中的元素進行二進制計算,傳回計算結果
saveAsTextFile:将資料集的元素,以textfile的形式儲存到本地檔案系統hdfs或者任何其他hadoop支援的檔案系統,spark将會調用每個元素的toString方法,并将它轉換為檔案中的一行文本
saveAsObjectFile:使用java的序列化方法儲存到本地檔案,可以被sparkContext.objectFile()加載
take:擷取RDD中從0到num-1下标的元素,不排序(即抽取前num個元素)
takeSample:對RDD進行抽樣,其中參數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣個數(注意這個和sample的差別);seed為随機數種子。
top:用于從RDD中,按照預設(降序)或者指定的排序規則,傳回前num個元素
takeOrdered:takeOrdered和top類似,隻不過以和top相反的順序傳回元素
三、基于血統的容錯機制
RDD 預設采用任務失敗重新計算的容錯機制,這種容錯機制本身效率并不高,但由于 RDD 為不可變資料集,并且每個 RDD 都會記錄各自的運算過程圖,是以當某個子運算産生 RDD 的過程中發生異常,會依次向上一級RDD找資料,直到找到資料,并根據此結果重新計算(這需要父級RDD持久化),最糟糕的情況就是找到HDFS整體重新計算。
如果長時間沒有結果,可以開辟一個位置從HDFS上讀取資料重新計算,哪個先計算出結果就保留哪個。