天天看點

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

引言:spark由amplab實驗室開發,其本質是基于記憶體的快速疊代架構,“疊代”是機器學習最大的特點,是以非常适合做機器學習。得益于在資料科學中強大的表現,python語言的粉絲遍布天下,如今又遇上強大的分布式記憶體計算架構spark,兩個領域的強者走到一起,自然能碰出更加強大的火花(spark可以翻譯為火花),是以本文主要講述了pyspark。 本文選自《全棧資料之門》。

  spark由amplab實驗室開發,其本質是基于記憶體的快速疊代架構,“疊代”是機器學習最大的特點,是以非常适合做機器學習。

  架構由scala語言開發,原生提供4種api,scala、java、python以及最近版本開始支援的r。python不是spark的“親兒子”,在支援上要略差一些,但基本上常用的接口都支援。得益于在資料科學中強大的表現,python語言的粉絲遍布天下,如今又遇上強大的分布式記憶體計算架構spark,兩個領域的強者走到一起,自然能碰出更加強大的火花(spark可以翻譯為火花),是以pyspark是本節的主角。

  在hadoop發行版中,cdh5和hdp2都已經內建了spark,隻是內建的版本比官方的版本要略低一些。目前最新的hdp2.4已經內建了1.6.1(官方最新為2.0),可以看出,hortonworks的更新速度非常快,緊跟上遊的步伐。

  除hadoop的map-reduce計算架構之外,spark能異軍突起,而且慢慢地建立自己的全棧生态,那還真得了解下spark到底提供了哪些全棧的技術。spark目前主要提供了以下6大功能。

spark core: rdd及其算子。

spark-sql: dataframe與sql。

spark ml(mllib): 機器學習架構。

spark streaming: 實時計算架構。

spark graphx: 圖計算架構。

pyspark(sparkr): spark之上的python與r架構。

從rdd的離線計算到streaming的實時計算;從dataframe及sql的支援,到mllib機器學習架構;從graphx的圖計算到對統計學家最愛的r的支援,可以看出spark在建構自己的全棧資料生态。從目前學術界與工業界的回報來看,spark也已經做到了。

  是騾子是馬,拉出來遛一遛就知道了。要嘗試使用spark是非常簡單的事情,一台機器就可以做測試和開發了。

  假設解壓到目錄/opt/spark,那麼在$home目錄的.bashrc檔案中添加一個path:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  記得source一下.bashrc檔案,讓環境變量生效:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  接着執行指令pyspark或者spark-shell,如果看到了spark那帥帥的文本logo和相應的指令行提示符>>>,則說明成功進入互動式界面,即配置成功。

  pyspark與spark-shell都能支援互動式測試,此時便可以進行測試了。相比于hadoop來說,基本上是零配置即可以開始測試。

  spark-shell測試:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  pyspark測試:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  上面的環境測試成功,證明spark的開發與測試環境已經配置好了。但是說好的分布式呢?我把别人的庫都拖下來了,就是想嘗試spark的分布式環境,你就給我看這個啊?

  上面說的是單機的環境部署,可用于開發與測試,隻是spark支援的部署方式的其中一種。這種是local方式,好處是用一台筆記本電腦就可以運作程式并在上面進行開發。雖然是單機,但有一個非常有用的特性,那就是可以實作多程序,比如8核的機器,隻需要運作代碼的時候指定--master local[],就可以用8個程序的方式運作程式。代表使用全部cpu核心,也可以使用如local[4],意為隻使用4個核心。

  單機的local模式寫的代碼,隻需要做少量的修改即可運作在分布式環境中。spark的分布式部署支援好幾種方式,如下所示。

  standalone:本身自帶的叢集(友善測試和spark本身架構的推廣)。

  mesos:一個新的資源管理架構。

  yarn:hadoop上新生的資源與計算管理架構,可以了解為hadoop的作業系統,

  可以支援各種不同的計算架構。

  ec2:亞馬遜的機器環境的部署。

  從難易程度上來說,standalone分布式最簡單,直接把解壓好的包複制到各台機器上去,配置好master檔案和slave檔案,訓示哪台機器做master,哪些機器做salve。然後在master機器上,通過自帶的腳本啟動叢集即可。

  從使用率上來說,應該是yarn被使用得最多,因為通常是直接使用發行版本中的spark內建套件,cdh和hdp中都已經把spark和yarn內建了,不用特别關注。

  分布式的優勢在于多cpu與更大的記憶體,從cpu的角度再來看spark的三種方式。

本機單cpu:“local”,資料檔案在本機。

本機多cpu:“local[4]”,資料檔案在本機。

standalone叢集多cpu:“spark://master-ip:7077”,需要每台機器都能通路資料檔案。

  

  yarn叢集多cpu:使用“yarn-client”送出,需要每台機器都能通路到資料檔案。

  互動式環境的部署也與上面的部署有關系,直接使用spark-shell或者pyspark是local的方式啟動,如果需要啟動單機多核或者叢集模式,需要指定--master參數,如下所示。

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  如果使用pyspark,并且習慣了ipython的互動式風格,還可以加上環境變量來啟動ipython的互動式,或者使用ipython提供的notebook:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  ipython風格如下所示:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  環境部署是新手最頭痛的問題,前面環境已經部署好了,接下來才是正題。因為scala較python複雜得多,是以先學習使用pyspark來寫程式。

  spark有兩個最基礎的概念,sc與rdd。sc是sparkcontext的縮寫,顧名思義,就是spark上下文語境,sc連接配接到叢集并做相應的參數配置,後面所有的操作都在這個上下文語境中進行,是一切spark的基礎。在啟動互動式界面的時候,注意有一句提示:

sparkcontext available as sc, hivecontext available as sqlcontext.

  意思是,sc這個變量代表了sparkcontext上下文,可以直接使用,在啟動互動式的時候,已經初始化好了。

如果是非互動式環境,需要在自己的代碼中進行初始化:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  rdd是resilient distributed datasets(彈性分布式資料集)的縮寫,是spark中最主要的資料處理對象。生成rdd的方式有很多種,其中最主要的一種是通過讀取檔案來生成:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  讀取joy.txt檔案後,就是一個rdd,此時的rdd的内容就是一個字元串,包含了檔案的全部内容。

  還記得前面使用python來編寫的wordcount代碼嗎?通過hadoop的streaming接口提到map-reduce計算架構上執行,那段代碼可不太好了解,現在簡單的版本來了。

  wordcount例子的代碼如下所示:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  在上面的代碼中,我個人喜歡用括号的閉合來進行分行,而不是在行尾加上續行符。

  pyspark中大量使用了匿名函數lambda,因為通常都是非常簡單的處理。核心代碼解讀如下。

flatmap:對lines資料中的每行先選擇map(映射)操作,即以空格分割成一系列單詞形成一個清單。然後執行flat(展開)操作,将多行的清單展開,形成一個大清單。此時的資料結構為:['one','two','three',...]。

map:對清單中的每個元素生成一個key-value對,其中value為1。此時的資料結構為:[('one', 1), ('two',1), ('three',1),...],其中的'one'、'two'、'three'這樣的key,可能會出現重複。

reducebykey:将上面清單中的元素按key相同的值進行累加,其資料結構為:[('one', 3), ('two', 8),

('three', 1), ...],其中'one', 'two','three'這樣的key不會出現重複。

最後使用了wc.collect()函數,它告訴spark需要取出所有wc中的資料,将取出的結果當成一個包含元組的清單來解析。

相比于用python手動實作的版本,spark實作的方式不僅簡單,而且很優雅。

  spark的基礎上下文語境為sc,基礎的資料集為rdd,剩下的就是對rdd所做的操作了。

  對rdd所做的操作有transform與action,也稱為rdd的兩個基本算子。

  transform是轉換、變形的意思,即将rdd通過某種形式進行轉換,得到另外一個rdd,比如對清單中的資料使用map轉換,變成另外一個清單。

  當然,spark能在hadoop的map-reduce模型中脫穎而出的一個重要因素就是其強大的算子。spark并沒有強制将其限定為map和reduce模型,而是提供了更加強大的變換能力,使得其代碼簡潔而優雅。

  下面列出了一些常用的transform。

map(): 映射,類似于python的map函數。

filter(): 過濾,類似于python的filter函數。

reducebykey(): 按key進行合并。

groupbykey(): 按key進行聚合。

rdd一個非常重要的特性是惰性(lazy)原則。在一個rdd上執行一個transform後,并不立即運作,而是遇到action的時候,才去一層層建構運作的dag圖,dag圖也是spark之是以快的原因。

first(): 傳回rdd裡面的第一個值。

take(n): 從rdd裡面取出前n個值。

collect(): 傳回全部的rdd元素。

sum(): 求和。

count(): 求個數。

回到前面的wordcount例子,程式隻有在遇到wc.collect()這個需要取全部資料的action時才執行前面rdd的各種transform,通過建構執行依賴的dag圖,也保證了運作效率。

  初始的資料為一個清單,清單裡面的每一個元素為一個元組,元組包含三個元素,分别代表id、name、age字段。rdd正是對這樣的基礎且又複雜的資料結構進行處理,是以可以使用pprint來列印結果,友善更好地了解資料結構,其代碼如下:

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  parallelize這個算子将一個python的資料結構序列化成一個rdd,其接受一個清單參數,還支援在序列化的時候将資料分成幾個分區(partition)。分區是spark運作時的最小粒度結構,多個分區會在叢集中進行分布式并行計算。

  使用python的type方法列印資料類型,可知base為一個rdd。在此rdd之上,使用了一個map算子,将age增加3歲,其他值保持不變。map是一個高階函數,其接受一個函數作為參數,将函數應用于每一個元素之上,傳回應用函數用後的新元素。此處使用了匿名函數lambda,其本身接受一個參數v,将age字段v[2]增加3,其他字段原樣傳回。從結果來看,傳回一個pipelinerdd,其繼承自rdd,可以簡單了解成是一個新的rdd結構。

  要列印rdd的結構,必須用一個action算子來觸發一個作業,此處使用了collect來擷取其全部的資料。

  接下來的操作,先使用map取出資料中的age字段v[2],接着使用一個reduce算子來計算所有的年齡之和。reduce的參數依然為一個函數,此函數必須接受兩個參數,分别去疊代rdd中的元素,進而聚合出結果。效果與python中的reduce相同,最後隻傳回一個元素,此處使用x+y計算其age之和,是以傳回為一個數值,執行結果如下圖所示。

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  他們的目的就是bdas(berkeley data analytics stack),基于記憶體的全棧大資料分析。前面介紹過的mesos是叢集資料總管。還有tachyon,是基于記憶體的分布式檔案系統,類似于hadoop的hdfs檔案系統,而spark streaming則類似于storm實時計算。

  強大的全棧式spark,撐起了大資料的半壁江山。

                    

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心

  想及時獲得更多精彩文章,可在微信中搜尋“博文視點”或者掃描下方二維碼并關注。

                       

強者聯盟——Python語言結合Spark架構全棧架構環境搭建分布式部署示例分析兩類算子map與reduceAMPLab的野心