天天看點

PySpark進階--深入剖析wordcount.py

在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark内部各種概念的面紗。我們再次回顧wordcount.py代碼來回答如下問題

對于大多數語言的Hello Word示例,都有main()函數, wordcount.py的main函數,或者說調用Spark的main() 在哪裡

資料的讀入,各個RDD資料如何轉換

map與flatMap的工作機制,以及差別

reduceByKey的作用

WordCount.py 的代碼如下:

Spark2.0中引入了SparkSession的概念,它為使用者提供了一個統一的切入點來使用Spark的各項功能,這邊不妨對照Http Session, 在此Spark就在充當Web service的角色,程式調用Spark功能的時候需要先建立一個Session。是以看到getOrCreate()就很容易了解了, 表明可以視情況建立session或利用已有的session。

既然将Spark 想象成一個Web server, 也就意味着可能用多個通路在進行,為了便于監控管理, 對應用命名一個恰當的名稱是個好辦法。Web UI并不是本文的重點,有興趣的同學可以參考  Spark Application’s Web Console

在建立SparkSession之後, 就是讀入資料并寫入到Dateset中。

為了更好的分解執行過程,是時候借助PySpark了, PySpark是python調用Spark的 API,它可以啟動一個互動式Python Shell。為了友善腳本調試,暫時切換到Linux執行

互動式Shell的好處是可以友善的檢視變量内容和類型。此刻檔案a.txt已經加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式資料集的執行個體。

RDD在記憶體中的結構可以參考論文, 了解RDD有兩點比較重要:

一是RDD一種隻讀、隻能由已存在的RDD變換而來的共享記憶體,然後将所有資料都加載到記憶體中,友善進行多次重用。

二是RDD的資料預設情況下存放在叢集中不同節點的記憶體中,本身提供了容錯性,可以自動從節點失敗中恢複過來。即如果某個節點上的RDD partition,因為節點故障,導緻資料丢了,那麼RDD會自動通過自己的資料來源重新計算該partition。

為了探究RDD内部的資料内容,可以利用collect()函數, 它能夠以數組的形式,傳回RDD資料集的所有元素。

lines存儲的是Row object類型,而我們希望的是對String類型進行處理,是以需要利用map api進一步轉換RDD

為了統計每個單詞的出現頻率,需要對每個單詞分别統計,那麼第一步需要将上面的字元串以空格作為分隔符将單詞提取出來,并為每個詞設定一個計數器。比如 These出現次數是1, 我們期望的資料結構是['There', 1]。但是如何将包含字元串的RDD轉換成元素為類似 ['There', 1] 的RDD呢?

下圖簡要的講述了flatMap 和 map的轉換過程。

PySpark進階--深入剖析wordcount.py

transfrom.png

不難看出,map api隻是為所有出現的單詞初始化了計數器為1,并沒有統計相同詞,接下來這個任務由reduceByKey()來完成。在rdd_map 中,所有的詞被視為一個key,而key相同的value則執行reduceByKey内的算子操作,因為統計相同key是累加操作,是以可以直接add操作。

根據a.txt 的内容,可知隻有 of 和 the 兩個單詞出現了兩次,符合預期。

以上的分解步驟,可以幫我們了解RDD的操作,需要提示的是,RDD将操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,隻有當action操作被執行時,運算才會觸發。也就是說,上面所有的RDD都是通過collect()觸發的, 那麼如果将上述的transformation放入一條簡練語句中, 則展現為原始wordcount.py的書寫形式。

而真正的action 則是由collect()完成。

至此,已經完成了對wordcount.py的深入剖析,但是有意的忽略了一些更底層的執行過程,比如DAG, stage, 以及Driver程式。

作者:或然子

連結:https://www.jianshu.com/p/067907b23546

來源:簡書

簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。

繼續閱讀