天天看點

【Spark】(二)RDD和算子一、RDD二、算子三、基于血統的容錯機制

一、RDD

1.1 什麼是RDD?

RDD(Resilient Distributed Dataset)叫做彈性分布式資料集,是Spark計算過程的核心,是spark計算過程中的瞬時結果,下一個RDD依賴于上一個RDD。它代表一個不可變、可分區、裡面的元素可并行計算的集合。

資料集就是由許多資料組成的集合了

RDD本身并不是分布式的,裡面的資料是分布式的。

那麼,彈性是什麼意思呢?彈性有哪些表現呢?

先看一下resilient這個單詞的英文解釋:adj. recovering readily from adversity, depression, or the like  容易地從逆境或類似的情況中恢複過來。《Learning Spark:Lightning-fast Data Analysis》一書中解釋“彈性”是指在任何時候都能進行重算。這樣當叢集中的一台機器挂掉而導緻存儲在其上的RDD丢失後,Spark還可以重新計算出這部分的分區的資料。但使用者感覺不到這部分的内容丢失過。這樣RDD資料集就像塊帶有彈性的海綿一樣,不管怎樣擠壓(分區遭到破壞)都是完整的。

RDD的彈性展現在:

    1.自動進行記憶體和磁盤切換

    2.基于lineage的高效容錯

    3.task如果失敗會特定次數的重試

    4.stage如果失敗會自動進行特定次數的重試,而且隻會隻計算失敗的分片

    5.checkpoint【每次對RDD操作都會産生新的RDD,如果鍊條比較長,計算比較笨重,就把資料放在硬碟中】和persist 【記憶體或磁盤中對資料進行複用】(檢查點、持久化)

    6.資料排程彈性:DAG TASK 和資源管理無關

    7.資料分片的高度彈性repartion

1.2 五大特性

可以看參考一下spark源碼中的注釋:

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf" target="_blank" rel="external nofollow" >Spark paper</a>
 * for more details on RDD internals.
 */
           

1.一組分區的集合

2.一個函數作用于每個分區

3.依賴于其他RDDs

4.可以重新分區

5.資料本地性:每個分區的資料優先在本地計算,但并不代表不能在别的機器上計算。

二、算子

算子是RDD中定義的函數,可以對RDD中的資料進行轉換和操作。

2.1 算子分類

● 轉換(transformation)算子 :這種變換并不觸發送出作業,完成作業中間過程處理。Transformation 操作是延遲計算的,也就是說從一個RDD 轉換生成另一個 RDD 的轉換操作不是馬上執行,需要等到有 Action 操作的時候才會真正觸發運算。

轉換算子又可以分為建立算子和緩存算子

● 行動(action)算子 :這類算子會觸發 SparkContext 送出 Job 作業。Action 算子會觸發 Spark 送出作業(Job),并将資料輸出 Spark系統。

2.2 常用算子

轉換算子:

cartesian:兩個RDD進行笛卡爾積合并

coalesce:對RDD重新進行分區,第一個參數為分區數;第二個參數為是否進行shuffle操作,預設值為false,當shuffle=false時,不能增加分區數,但不會報錯,分區個數還是原來的

cogroup:對多個RDD中的Key-Value元素按key值進行合并,每個RDD中相同key中的元素分别聚合成一個集合。

distinct:将原始RDD中重複出現的元素進行過濾,傳回一個新生成的RDD。即原RDD中每個元素在新生成的RDD中隻出現一次

filter:對元素進行過濾,對每個元素應用f函數,傳回值為true的元素在RDD中保留,傳回為false的将被過濾掉

map:和集合中的map作用相同

flatMap:和集合中的flatMap作用相同

flatMapValues:同基本轉換操作中的flatMap,隻不過是針對[K,V]中的V值進行flatMap操作

groupByKey:将Key-Value型RDD中的元素按照Key值進行彙聚,Key值相同的Value值會合并為一個序列。和cogroup差別在于cogroup可以合并多個RDD,而groupByKey則針對一個RDD

intersection:傳回兩個RDD的交集

keys:對Key-Value型RDD擷取所有的key

mapValues:同基本轉換操作中的map,針對[K,V]中的V值進行map操作

reduceByKey:将Key-Value型RDD按照key值進行聚合操作,生成新的RDD。

sample:對RDD進行抽樣,其中參數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣比例;seed為随機數種子,比如目前時間戳

sortByKey:對Key-Value型RDD按照key值進行升序或降序排列

subtract:差集,即傳回在oneRDD中出現,并且不在otherRDD中出現的元素,不去重

subtractByKey:對Key-Value型RDD按照key值做差集,即傳回key在oneRDD中出現,并且不在otherRDD中出現的元素,不去重。

union:合并兩個RDD,不去重,要求兩個RDD中的元素類型一緻

values:對Key-Value型RDD擷取所有的value

zip:用于将兩個RDD組合成Key-Value形式的RDD,這裡預設兩個RDD的partition數量以及元素數量都相同,否則會抛出異常。

行動算子:

collectAsMap:如果RDD中同一個Key中存在多個Value,那麼後面的Value将會把前面的Value覆寫,最終得到的結果就是Key唯一,而且對應一個Value

count:傳回RDD中的元素數量

countByKey:用于統計Key-Value型RDD中每個Key的數量

countByValue:用于統計Key-Value型RDD中每個元素的個數

first:傳回RDD中的第一個元素,不排序

lookUp:用于Key-Value類型的RDD,指定K值,傳回RDD中該Key對應的所有Value值

reduce:根據映射函數f對RDD中的元素進行二進制計算,傳回計算結果

saveAsTextFile:将資料集的元素,以textfile的形式儲存到本地檔案系統hdfs或者任何其他hadoop支援的檔案系統,spark将會調用每個元素的toString方法,并将它轉換為檔案中的一行文本

saveAsObjectFile:使用java的序列化方法儲存到本地檔案,可以被sparkContext.objectFile()加載

take:擷取RDD中從0到num-1下标的元素,不排序(即抽取前num個元素)

takeSample:對RDD進行抽樣,其中參數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣個數(注意這個和sample的差別);seed為随機數種子。

top:用于從RDD中,按照預設(降序)或者指定的排序規則,傳回前num個元素

takeOrdered:takeOrdered和top類似,隻不過以和top相反的順序傳回元素

三、基于血統的容錯機制

RDD 預設采用任務失敗重新計算的容錯機制,這種容錯機制本身效率并不高,但由于 RDD 為不可變資料集,并且每個 RDD 都會記錄各自的運算過程圖,是以當某個子運算産生 RDD 的過程中發生異常,會依次向上一級RDD找資料,直到找到資料,并根據此結果重新計算(這需要父級RDD持久化),最糟糕的情況就是找到HDFS整體重新計算。

如果長時間沒有結果,可以開辟一個位置從HDFS上讀取資料重新計算,哪個先計算出結果就保留哪個。

繼續閱讀