天天看點

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

💫上次寫完rdd的介紹,有同學強烈介意用一些代碼來展示一下rdd,好今天我們就如你所願,我們今天就來以代碼的方式給大家講解一下rdd吧,對以往内容感興趣的同學可以檢視下面👇:

  • 連結: Spark之處理布爾、數值和字元串類型的資料.
  • 連結: Spark之Dataframe基本操作.
  • 連結: Spark之處理布爾、數值和字元串類型的資料.
  • 連結: Spark之核心架構.
  • 連結: Spark之RDD算子.

💐今天主要講解一下rdd的大緻情況,以及目前的使用場景,然後就是掩飾怎麼使用python操作rdd的幾種方式。

目錄

  • 1. 低級API——RDD
  • 2. 建立RDD
    • 2.1 使用進階api轉換
    • 2.2 從本地建立rdd
    • 2.3 從資料源建立
  • 3. 操作RDD
    • 轉換操作
      • 3.01 dinstinct
      • 3.02 filter
      • 3.03 map
      • 3.04 flatmap
      • 3.05 sortBy
    • 動作操作
      • 3.06 reduce
      • 3.07 count
      • 3.08 countByValue
      • 3.09 first
      • 3.10 max 和 mix
      • 3.11 take
  • 4.參考文獻

1. 低級API——RDD

其實對于初學者,我不會很建議你從這部分開始,因為這部分了解起來很困難,還不如從進階的api開始學,之是以說rdd低級,是因為我們在幾乎所有的場景都能應該使用結構化的api,隻有當我們遇到一些很不常見的功能你才會使用rdd(當然還有其他的場景),低級的api有兩種:

  • rdd 彈性分布式資料集
  • 廣播變量和累加器 分發和處理分布式的共享變量。

rdd在spark的1.X版本是主要的api,在2.X版本仍然可以使用,但不常用,我們目前使用的是3.X版本,幾乎不用原始的rdd了。但rdd的概念是一直存在的,無論是我們的dataframe和dataset(scala),運作所有的spark代碼都将編譯成rdd。

rdd雖然有很強大的可塑性,可是在完成一些操作時的優化遠遠沒有結構化的api好,是以說結構化的api更高效。

2. 建立RDD

低級api使用spark.sparkContext來調用即可。但我們這裡講解幾種建立方式。

2.1 使用進階api轉換

這裡的進階api主要是指利用DataFrame和Dataset來建立rdd。

  • 使用dataframe建立rdd
#使用dataframe建立rdd
a=spark.range(10).rdd
a.collect()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻
  • 使用rdd建立dataframe
#使用rdd建立dataframe
b=spark.range(10).rdd.toDF()
b.show()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

2.2 從本地建立rdd

從集合中建立rdd需要使用sparkContext中的parallelize方法,該方法會将位于單個節點的資料集合轉換成一個并行集合。在建立該并行集合時,還可以顯示指定并行集合的分片集合。下面我以2個分片作為例子:

#建立單詞rdd
c='hello spark , hello hadoop'.split(" ")
words=spark.sparkContext.parallelize(c,2)
words.collect()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

2.3 從資料源建立

從資料源或者文本檔案中都能建立rdd,但最好還是采用結構化的方式來讀取資料。

#從文本中讀取資料
d=spark.sparkContext.textFile("/FileStore/tables/2010_12_01.csv")
d.collect()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3. 操作RDD

轉換操作

3.01 dinstinct

在rdd上調用distinct方法用于删除rdd中的重複項

c='hello spark , hello hadoop'.split(" ")
words=spark.sparkContext.parallelize(c,2)
words.distinct().count()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.02 filter

過濾器filter類似于sql中的where子句。

#建構一個過濾條件h開頭
def startwith(ones):
   return ones.startswith("h")
#filter過濾
words.filter(lambda x:startwith(x)).collect()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.03 map

map操作,指定一個函數,将給定的資料一條一條地輸入該函數處理以得到你期望的結果,一對一是map最大的特點。

#将單個字元變成一個三元組
words.map(lambda word : (word,word[0],word.startswith('h'))).collect()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.04 flatmap

flatmap函數是對map函數的拓展,目前行會映射為多行。該函數最大的特點就是一行映射多行。

#将一個一單詞拆成一個個字元
words.flatMap(lambda a:list(a)).take(10)
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.05 sortBy

排序操作

#将單詞按照字元長度從短到長排列
words.sortBy(lambda a:len(a)).take(4)
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

動作操作

3.06 reduce

該函數指定一個函數将rdd中的任何類型的值“規約”成一個值

#求1-20數字之和(這裡實作的方式,是通過22相加最後求的總和)
spark.sparkContext.parallelize(range(1,21)).reduce(lambda x,y:x+y)
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.07 count

用與統計元素個數

#統計單詞個數
words.count()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.08 countByValue

根據value值進行計數

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.09 first

傳回資料集中第一個值

#傳回資料集中第一個值
words.first()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.10 max 和 mix

max和min方法分别傳回最大值和最小值

#最大值
spark.sparkContext.parallelize(range(1,11)).max()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻
#最小值
spark.sparkContext.parallelize(range(1,11)).min()
           

結果如下:

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

3.11 take

take 和它的派生方法是從rdd中擷取一定資料的值

Spark之RDD的使用(pyspark版)1. 低級API——RDD2. 建立RDD3. 操作RDD4.參考文獻

4.參考文獻

《spark權威指南》

《pysaprk教程》

《pyspark實戰》