💫上次寫完rdd的介紹,有同學強烈介意用一些代碼來展示一下rdd,好今天我們就如你所願,我們今天就來以代碼的方式給大家講解一下rdd吧,對以往内容感興趣的同學可以檢視下面👇:
- 連結: Spark之處理布爾、數值和字元串類型的資料.
- 連結: Spark之Dataframe基本操作.
- 連結: Spark之處理布爾、數值和字元串類型的資料.
- 連結: Spark之核心架構.
- 連結: Spark之RDD算子.
💐今天主要講解一下rdd的大緻情況,以及目前的使用場景,然後就是掩飾怎麼使用python操作rdd的幾種方式。
目錄
- 1. 低級API——RDD
- 2. 建立RDD
-
- 2.1 使用進階api轉換
- 2.2 從本地建立rdd
- 2.3 從資料源建立
- 3. 操作RDD
-
- 轉換操作
-
- 3.01 dinstinct
- 3.02 filter
- 3.03 map
- 3.04 flatmap
- 3.05 sortBy
- 動作操作
-
- 3.06 reduce
- 3.07 count
- 3.08 countByValue
- 3.09 first
- 3.10 max 和 mix
- 3.11 take
- 4.參考文獻
1. 低級API——RDD
其實對于初學者,我不會很建議你從這部分開始,因為這部分了解起來很困難,還不如從進階的api開始學,之是以說rdd低級,是因為我們在幾乎所有的場景都能應該使用結構化的api,隻有當我們遇到一些很不常見的功能你才會使用rdd(當然還有其他的場景),低級的api有兩種:
- rdd 彈性分布式資料集
- 廣播變量和累加器 分發和處理分布式的共享變量。
rdd在spark的1.X版本是主要的api,在2.X版本仍然可以使用,但不常用,我們目前使用的是3.X版本,幾乎不用原始的rdd了。但rdd的概念是一直存在的,無論是我們的dataframe和dataset(scala),運作所有的spark代碼都将編譯成rdd。
rdd雖然有很強大的可塑性,可是在完成一些操作時的優化遠遠沒有結構化的api好,是以說結構化的api更高效。
2. 建立RDD
低級api使用spark.sparkContext來調用即可。但我們這裡講解幾種建立方式。
2.1 使用進階api轉換
這裡的進階api主要是指利用DataFrame和Dataset來建立rdd。
- 使用dataframe建立rdd
#使用dataframe建立rdd
a=spark.range(10).rdd
a.collect()
結果如下:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiNx8FesU2cfdGLwczX0xiRGZkRGZ0Xy9GbvNGL0EzXlpXazxCeHpkNQNkY1oXLwVTQClGVF5UMR9Fd4VGdsATNfd3bkFGazxycykFaKdkYzZUbapXNXlleSdVY2pESa9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1M2M4ImN5EWOkhjZ3EDO4IWYmRzYyMDM1kjZ5EmN3Q2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
- 使用rdd建立dataframe
#使用rdd建立dataframe
b=spark.range(10).rdd.toDF()
b.show()
結果如下:
2.2 從本地建立rdd
從集合中建立rdd需要使用sparkContext中的parallelize方法,該方法會将位于單個節點的資料集合轉換成一個并行集合。在建立該并行集合時,還可以顯示指定并行集合的分片集合。下面我以2個分片作為例子:
#建立單詞rdd
c='hello spark , hello hadoop'.split(" ")
words=spark.sparkContext.parallelize(c,2)
words.collect()
結果如下:
2.3 從資料源建立
從資料源或者文本檔案中都能建立rdd,但最好還是采用結構化的方式來讀取資料。
#從文本中讀取資料
d=spark.sparkContext.textFile("/FileStore/tables/2010_12_01.csv")
d.collect()
結果如下:
3. 操作RDD
轉換操作
3.01 dinstinct
在rdd上調用distinct方法用于删除rdd中的重複項
c='hello spark , hello hadoop'.split(" ")
words=spark.sparkContext.parallelize(c,2)
words.distinct().count()
結果如下:
3.02 filter
過濾器filter類似于sql中的where子句。
#建構一個過濾條件h開頭
def startwith(ones):
return ones.startswith("h")
#filter過濾
words.filter(lambda x:startwith(x)).collect()
結果如下:
3.03 map
map操作,指定一個函數,将給定的資料一條一條地輸入該函數處理以得到你期望的結果,一對一是map最大的特點。
#将單個字元變成一個三元組
words.map(lambda word : (word,word[0],word.startswith('h'))).collect()
結果如下:
3.04 flatmap
flatmap函數是對map函數的拓展,目前行會映射為多行。該函數最大的特點就是一行映射多行。
#将一個一單詞拆成一個個字元
words.flatMap(lambda a:list(a)).take(10)
結果如下:
3.05 sortBy
排序操作
#将單詞按照字元長度從短到長排列
words.sortBy(lambda a:len(a)).take(4)
結果如下:
動作操作
3.06 reduce
該函數指定一個函數将rdd中的任何類型的值“規約”成一個值
#求1-20數字之和(這裡實作的方式,是通過22相加最後求的總和)
spark.sparkContext.parallelize(range(1,21)).reduce(lambda x,y:x+y)
結果如下:
3.07 count
用與統計元素個數
#統計單詞個數
words.count()
結果如下:
3.08 countByValue
根據value值進行計數
結果如下:
3.09 first
傳回資料集中第一個值
#傳回資料集中第一個值
words.first()
結果如下:
3.10 max 和 mix
max和min方法分别傳回最大值和最小值
#最大值
spark.sparkContext.parallelize(range(1,11)).max()
結果如下:
#最小值
spark.sparkContext.parallelize(range(1,11)).min()
結果如下:
3.11 take
take 和它的派生方法是從rdd中擷取一定資料的值
4.參考文獻
《spark權威指南》
《pysaprk教程》
《pyspark實戰》