天天看點

spark快速大資料分析之讀書筆記

RDD程式設計

1、Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運作在叢集中的不同節點上。

2、使用者可以使用兩種方法建立RDD:讀取一個外部資料集,以及在驅動器程式中對一個集合進行并行化(比如list和set)。

建立RDD最簡單的方式就是把程式中一個已有的集合傳給SparkContext的parallelize()方法。

val lines = sc.textFile("README.md")
val linesP = sc.parallelize(List("pandas","i like pandas"))
           

建立出來後,RDD支援兩種類型的操作:轉化操作(transformation)和行動操作(action)。轉化操作會由一個RDD生成一個新的RDD。行動操作會對RDD計算出一個結果,并把結果傳回到驅動器程式中,或把結果存儲到外部存儲系統(如HDFS)中

3、轉化操作和行動操作的差別在于spark計算RDD的方式不同。如果對于一個特定的函數是屬于轉化操作還是行動操作感到困惑,可以看看它的傳回值類型:轉化操作傳回的是RDD,而行動操作傳回的是其他的資料類型。

雖然你可以在任何時候定義新的RDD,但spark隻會惰性計算這些RDD。它們隻有第一次在一個行動操作中用到時,才會真正計算。預設情況下,Spark的RDD會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個RDD,可以使用RDD.persist(),這裡也可以使用RDD.cache(),讓spark把這個RDD緩存下來。

來看兩個例子:

轉化操作

val inputRDD = sc.textFile("log.txt")
val errorRDD = inputRDD.filter(line=>line.contains("error"))
           

行動操作

errorRDD.count()
for(String line:errorRDD.take(10)){
    System.out.println(line)
}
           

這裡take()擷取了RDD中的少量元素。除此這外,RDD中還有一個擷取元素的方法,collect()函數,但是隻有當整個資料集能在單台機器的記憶體中放得下時,才能使用collect(),是以,collect()不能用在大規模資料集上。

4、之是以把RDD稱為彈性的,是因為在任何時候都能進行重算。當儲存RDD資料的一台機器失敗時,Spark還可以使用這種特性來重算出丢掉的分區。

5、總結一下,每個spark程式或shell對話都按如下方式工作:

(1)從外部資料建立出輸入RDD。

(2)使用諸如filter()這樣的轉化操作對RDD進行轉化,以定義新的RDD。

(3)告訴spark對需要被重用的中間結果RDD執行persist()操作。

(4)使用行動操作(例如count()和first()等)來觸發一次并行計算,spark會對計算進行優化後再執行。

版權聲明:本文為CSDN部落客「weixin_34413802」的原創文章,遵循CC 4.0 BY-SA版權協定,轉載請附上原文出處連結及本聲明。

原文連結:https://blog.csdn.net/weixin_34413802/article/details/91705756