天天看點

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

圖的并行化處理一直是一個非常熱門的話題,這裡頭的重點有兩個,一是如何将圖的算法并行化,二是找到一個合适的并行化處理架構。spark作為一個非常優秀的并行處理架構,将一些并行化的算法移到其上面就成了一個很自然的事情。

graphx是一些圖的常用算法在spark上的并行化實作,同時提供了豐富的api接口。本文就graphx的代碼架構及pagerank在graphx中的具體實作做一個初步的學習。

當google還在起步的時候,在搜尋引擎領域,yahoo正如日中天,紅的發紫。顯然,在google面前的是一堵讓人幾乎沒有任何希望的牆。

但世事難料,現在“外事問谷歌”成了不争的事實,yahoo應也陪客了。

這種轉換到底是如何形成的了,有一個因素是這樣的,那就是google發明了顯著提高搜尋準确率的pagerank算法。如果說pagerank算法的提出讓谷歌牢牢站穩了搜尋引擎大戰的腳跟,這是毫不誇張的。

搜尋引擎有幾個要考慮的關鍵因素(個人觀點而已)。

要想吸引使用者,就必須要有出色的搜尋準确率

有了使用者,才能做廣告投放,提高廣告投放的針對性就可以盈利

上述兩個方面都有非常優秀的算法。

廢話少述,回到正題。pagerank算法是圖論的一個具體應用,ok, 轉到圖論。

離散數學中非常重要的一個部分就是圖論,下面是一個無向連通圖:

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

上圖中的a,b,c,d,e稱為圖的頂點。

頂點與頂點之間的連線稱之為邊。

讀大學的時候,一直沒有想明白為什麼要學勞什子的線性代數。直到這兩天看《數學之美》一書時,才發覺,線性代數在一些計算機應用領域,那簡直就是不可或缺啊。

我們比較容易了解的平面幾何和立體幾何(一個是二維,一個是三維),而線性代數解決的其實是一個高維問題,由于無法直覺的感受到,是以很難。如果想比較通俗的了解一下數學為什麼有這麼多的分支及其内在關聯,強烈推薦讀一下《數學橋 對高等數學的一次觀賞之旅》。

在數學中,用什麼來表示圖呢,答案就是線性代數裡面的矩陣,想想看,圖的關聯矩陣,圖的鄰接矩陣。總之就是矩陣啦,線性代數一下子有用了。下面是一個具體的例子。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解
Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

剛才說到圖可以用矩陣來表示,圖的并行化問題在某種程度上就被轉化為矩陣運算的并行化問題。

那麼以矩陣的乘法為例,看看其是否可以并行化處理。

以矩陣 a x b 為例,說明并行化處理過程。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

将上述的矩陣a和b劃分為四個部分,如下圖所示:

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

首次對齊之後:

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

子矩陣相乘

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

相乘之後,a的子矩陣左移,b的子矩陣上移

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

計算結果合并

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

上一節的重點有兩點:

圖用矩陣來表示,對圖的運算就是矩陣的運算

矩陣乘法運算可以并行化,動态示範其并行化的原理

你說ok,我明白了。哪有沒有一種合适的并行化處理架構可以用來進行圖的計算呢,那你肯定想到了mapreduce。

mapreduce盡管也是一個不錯的并行化處理架構,但在圖計算方面,有許多缺點,主要是計算的中間過程需要存儲到硬碟,效率很低。

google針對圖的并行處理,專門提出了一個了不起的架構pregel。其執行時的動态視圖如下所示。

pregel有如下優點

級聯可擴性好 scalability

容錯性強

能夠很好的表示各種圖的常用算法

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

計算模型如下圖所示,重要的有三個:

作用于每個頂點的處理邏輯 vertexprogram

消息發送,用于相鄰節點間的通訊 sendmessage

消息合并邏輯 messagecombining

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

非常感謝你能堅持看到現在,這篇部落格内容很多,有點難。我想還是上一幅圖将其内在邏輯整一下再繼續說下去。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

該圖要表示的意思是這樣的,graphx利用了spark這樣了一個并行處理架構來實作了圖上的一些可并行化執行的算法。

本篇部落格要表達的意思就是上面加紅的這句話,請諸位看官仔細了解。

算法是否能夠并行化與spark本身無關

算法并行化與否的本身,需要通過數學來證明

已經證明的可并行化算法,利用spark來實作會是一個錯的選擇,因為graphx支援pregel的圖計算模型

毫無疑問,圖本身是graphx中一個非常重要的概念。

graph中重要的成員變量分别為

vertices

edges

triplets

為什麼要引入triplets呢,主要是和pregel這個計算模型相關,在triplets中,同時記錄着edge和vertex. 具體代碼就不羅列了。

函數分成幾大類:

對所有頂點或邊的操作,但不改變圖結構本身,如mapedges, mapvertices

子圖,類似于集合操作中的filter subgraph

圖的分割,即paritition操作,這個對于spark計算來說,很關鍵,正是因為有了不同的partition,才有了并行處理的可能, 不同的partitionstrategy,其收益不同。最容易想到的就是利用hash來将整個圖分成多個區域。

outerjoinvertices 頂點的外連接配接操作

圖的常用算法是集中抽象到graphops這個類中,在graph裡作了隐式轉換,将graph轉換為graphops

支援的操作如下

collectneighborids

collectneighbors

collectedges

joinvertices

filter

pickrandomvertex

pregel

pagerank

staticpagerank

connectedcomponents

trianglecount

stronglyconnectedcomponents

rdd是spark體系的核心,那麼graphx中引入了哪些新的rdd呢,有倆,分别為

vertexrdd

edgerdd

較之edgerdd,vertexrdd更為重要,其上的操作也很多,主要集中于vertex之上屬性的合并,說到合并就不得不扯到關系代數和集合論,是以在vertexrdd中能看到許多類似于sql中的術語,如

leftjoin

innerjoin

至于leftjoin, innerjoin, outerjoin的差別,建議谷歌一下,不再贅述。

在進行數學計算的時候,圖用線性代數中的矩陣來表示,那麼如何進行存儲呢?

學資料結構的時候,老師肯定說過好多的辦法,不再啰嗦了。

不過在大資料的環境下,如果圖很巨大,表示頂點和邊的資料不足以放在一個檔案中怎麼辦? 用hdfs

加載的時候,一台機器的記憶體不足以容下怎麼辦? 延遲加載,在真正需要資料時,将資料分發到不同機器中,采用級聯方式。

一般來說,我們會将所有與頂點相關的内容儲存在一個檔案中vertexfile,所有與邊相關的資訊儲存在另一個檔案中edgefile。

生成某一個具體的圖時,用edge就可以表示圖中頂點的關聯關系,同時圖的結構也表示出來了。

graphloader是graphx中專門用于圖的加載和生成,最重要的函數就是edgelistfile,定義如下。

”在網際網路上,如果一個網頁被很多其它網頁所連結,說明它受到普遍的承認和依賴,那麼它的排名就很高。“  (摘自數學之美第10章)

你說這也太簡單了吧,不是跟沒說一個樣嗎,怎麼用數學來表示呢?

呵呵,起初我也這麼想的,後來多看了幾遍之後,明白了一點點。分析步驟用文字表述如下,

網頁和網頁之間的關系用圖來表示

網頁a和網頁b之間的連接配接關系表示任意一個使用者從網頁a到轉到網頁b的可能性(機率)

所有網頁的排名用一維向量來b來表示

所有網頁之間的連接配接用矩陣a來表示,所有網頁排名用b來表示。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

好了,上面的數學闡述說明了“網頁排名的計算可以最終抽象為矩陣相乘”,而在開始的時候已經證明過矩陣相乘可以并行化處理。

理論研究結束了,接下來的就是工程實作了,借用pregel模型,pagerank中定義的各主要函數分别如下。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

通過pagerank這個例子,我們能夠搞清楚如何将平素學習的數學理論用以解決實際問題。

“學習的東西總是有價值的,至于用的上用不上,全靠造化了”

本篇講來講去就在強調一個問題,spark是一個分布式并行計算架構。能不能用spark,其實大體取決于問題的數學模型本身,如果可以并行化處理,則用之,切不可削足适履。

另一個用張圖來總結一下提到的數學知識吧。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

再一次強烈推薦《數學橋》

《數學之美》

《數學橋 高等數學的觀賞之旅》

《大資料》

之是以對spark shell的内部實作産生興趣全部緣于好奇代碼的編譯加載過程,scala是需要編譯才能執行的語言,但提供的scala repl可以實作代碼的實時互動式執行,這是為什麼呢?

既然scala已經提供了repl,為什麼spark還要自己單獨搞一套spark repl,這其中的緣由到底何在?

顯然,這些都是問題,要解開這些謎團,隻有再次開啟一段源碼分析之旅了。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

上圖顯示了java源檔案從編譯到加載執行的全局視圖,整個過程中最主要的步驟是:

編譯成過程,由編譯器對java源檔案進行編譯整理,生成java bytecodes

類的加載和初始化,主要由classloader參與

執行引擎 将位元組碼翻譯成機器碼,然後排程執行

這一部分的内容,解釋的非常詳細的某過于《深入了解jvm》和撒迦的jvm分享,這裡就不班門弄斧了。

那麼講上述這些内容的目的又何在呢,我們知道scala也是需要編譯執行的,那麼編譯的結果是什麼樣呢,要符合什麼标準?在哪裡執行。

答案比較明顯,scala源檔案也需要編譯成java bytecodes,和java的編譯結果必須符合同一份标準,生成的bytecode都是由jvm的執行引擎轉換成為機器碼之後排程執行。

也就是說盡管scala和java源檔案的編譯器不同,但它們生成的結果必須符合同一标準,否則jvm無法正确了解,執行也就無從談起。至于scala的編譯器是如何實作的,文中後續章節會涉及。

”cpu是很傻的,加電後,它就會一直不斷的讀取指令,執行指令,不能停的哦。“ 如果有了這個意識,看源碼的時候你就會有無窮的疑惑,無數想不通的地方,這也能讓你不斷的進步。

再繼續講scala源檔案的編譯細節之前,我們還是來溫習一下基礎的内容,即一個efl可執行檔案是如何加載到記憶體真正運作起來的。(本篇部落格的内容相對比較底層,很費腦子的,:)

linux平台上基本采用elf作為可執行檔案的格式,java可執行檔案本身也是elf格式的,使用file指令來作檢驗。

下面是輸出的結果,從結果中可以證明java也是elf格式。

elf檔案的執行過程大緻如下

fork建立一個程序

調用execve來執行elf

elf的加載過程中會有動态加載和連結發生

全局變量的初始化,這一部分和glibc相關

執行main函數

我講述的過程非常籠統,要想更清楚的了解細節,請參閱《深入了解linux核心》中的程式的執行一章,或是《深入linux核心架構》中的啟動新程式一節。

現在打開核心中相應的源碼,看看execve函數是如何找到elf格式的處理句柄的。

在檔案$kernel_home/fs/binfmt_elf.c中,init_elf_binfmt函數就實作了注冊任務

來看一看elf_format的定義是什麼

execve是一個系統調用,核心中對應的函數是do_execve,具體代碼不再列出。

do_execve->do_execve_common->search_binary_hander

注意search_binary_handler會找到上一步中注冊的binary_handler即elf_format,找到了對應的handler之後,關鍵的一步就是load_binary了。動态連結過程調用的是load_shlib,這一部分的内容細細展開的話,夠寫幾本書了。

search_binary_handler的部分代碼

 要想對這一部分内容有個比較清楚的了解,建議看一下台灣黃敬群先生的《深入淺出helloworld》和國内出版的《程式員的自我修養》。

另外一個值得推薦的是黑客級的網站phrack.org,可惜現在不更新了。

之是以講elf的加載和運作,是因為要打通java源檔案的編譯執行過程的話,必然會步步深入到此,其實到這還不夠,再往下走就是cpu指令,隻有到達cpu指令才算真正到底。這個時候就需要去讀intel ia-64 software programmer guide了。

 源碼走讀其實隻是個形式,重要的是能理清楚其執行流程,以到達指令級的了解為最佳。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解
Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

在各位java達人面前,我就不顯示自己java水準有多爛了。隻是将兩幅最基本的圖搬出來,展示一下java類的加載過程,以及classloader的層次關系。記住這些東東會為我們在後頭讨論scala repl奠定良好基礎。

java體系中,另一個重要的基石就是類的序列化和反序列化。這裡要注意的就是當有繼承體系時,類的序列化和反序列化順序,以及類中有靜态成員變量的時候,如何處理序列化。諸如此類的文章,一搜一大把,我再多加解釋實在是畫蛇添足,列出來隻是說明其重要性罷了。

前面進行了這麼多的鋪墊之後,我想可以進入正題了。即spark-shell的執行調用路徑到底怎樣。

首次使用spark一般都是從執行spark-shell開始的,當在鍵盤上敲入spark-shell并回車時,後面究竟發生了哪些事情呢?

可以看出spark-shell其實是對spark-submit的一層封裝,但事情到這還沒有結束,畢竟還沒有找到調用java的地方,繼續往下搜尋看看spark-submit腳本的内容。

離目标越來越近了,spark-class中會調用到java程式,與java相關部分的代碼摘錄如下

sparksubmit當中定義了main函數,在它的進行中會将spark repl運作起來,spark repl能夠接收使用者的輸入,通過編譯與運作,傳回結果給使用者。這就是為什麼spark具有互動處理能力的原因所在。調用順序如下

sparksubmit

repl.main

sparkiloop

修改spark-class,使得java_opts看起來如下圖所示:

修改完上述腳本之後先啟動spark-shell,然後再啟動jvisualvm

 在java visualvm中選擇程序org.apache.spark.deploy.sparksubmit,如果已經為jvisualvm安裝了插件threads inspector,其界面将會與下圖很類似:

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

 在右側選擇“線程”這一tab頁,選擇線程main,然後可以看到該線程的thread dump資訊

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

既然scala已經提供了repl, spark還是要自己去實作一個repl,你不覺着事有可疑麼?我谷歌了好長時間,終于找到了大神的讨論文章,不容易啊,原文摘錄如下。

thanks for looping me in! just fyi, i would also be okay if instead of making the wrapper code pluggable, the repl just changed to one based on classes, as in prashant's example, rather than singleton objects. to give you background on this, the problem with the "object" wrappers is that initialization code goes into a static initializer that will have to run on all worker nodes, making the repl unusable with distributed applications. as an example, consider this: // file.txt is a local file on just the master val data = scala.io.source.fromfile("file.txt").mkstring // now we use the derived string, "data", in a closure that runs on the cluster spark.textfile.map(line => dostuff(line, data)) the current scala repl creates an object line1 whose static initializer sets data with the code above, then does import line1.data in the closure, which will cause the static initializer to run *again* on the remote node and fail. this issue definitely affects spark, but it could also affect other interesting projects that could be built on scala's repl, so it may be an interesting thing to consider supporting in the standard interpreter. matei

上述内容估計第一次看了之後,除了一頭霧水還是一頭霧水。翻譯成為白話就是利用scala原生的repl,是使用object來封裝輸入的代碼的,這有什麼不妥,“序列化和反序列化”的問題啊。反序列化的過程中,對象的構造函數會被再次調用,而這并不是我們所期望的。我們希望生成class而不是object,如果你不知道object和class的差別,沒關系,看一下scala的簡明手冊,馬上就明白了。

最重要的一點:scala repl預設輸入的代碼都是在本地執行,故使用objectbasedwraper是沒有問題的。但在spark環境下,輸入的内容有可能需要在遠端執行,這樣objectbasedwrapper的源碼生成方式經序列化反序列化會有相應的副作用,導緻出錯不可用。

再啰嗦一次,scala是需要編譯執行的,而repl給我們的錯覺是scala是解釋執行的。那我們在repl中輸入的語句是如何被真正執行的呢?

簡要的步驟是這樣的

在repl中輸入的每一行語句,都會被封裝為一個object, 這一工作主要由interpreter完成

對該object進行編譯

由classloader加載編譯後的java bytecode

執行引擎負責真正執行加載入記憶體的bytecode

那麼怎麼證明我說的是對的呢?很簡單,做個實驗,利用下述語句了啟動scala repl

 如果我們輸入這樣一條語句 val c = 10,由interpreter生成的scala源碼會如下所列

注意啰,是object哦,不是class。

那我們再看看spark repl生成的scala源碼是什麼樣子的?

啟動spark-shell之前,修改一下spark-class,在java_opts中加入如下内容

啟動spark-shell,輸入val b = 10,生成的scala源碼如下所示

注意到與scala repl中的差異了麼,此處是class而非object

是什麼導緻有上述的差異的呢?我們可以下載下傳scala的源碼,對是scala本身的源碼在github上可以找到。interpreter中代碼生成部分的處理邏輯主要是在imain.scala,在spark中是sparkimain.scala。

比較兩個檔案的異同。

gvimdiff是個好工具,兩個檔案的差異一目了然,emacs和vim總要有一樣玩的轉才行啊。來個螢幕截圖吧,比較炫吧。

Apache Spark源碼走讀(八)Graphx實作剖析&spark repl實作詳解

注:spark開發團隊似乎給scala的開發小組提了一個case,在最新的scala中似乎已經支援classbasedwrapper,可以通過現應的選項來設定來選擇classbasedwraper和objectbasedwrapper.

下述代碼見最新版scala,scala-2.12.x中的imain.scala:

scala實作了自己的編譯器,處理邏輯的代碼實作見scala源碼中的src/compiler目錄下的源檔案。有關其處理步驟不再贅述,請參考ref3,ref4中的描述。

有一點想要作個小小提醒的時,當你看到sparkimain.scala中有new run的語句卻不知道這個run在哪的時候,兄弟跟你講在scala中的global.scala裡可以找到, :)

編譯和加載是一個非常有意思的話題,即可以說是很基礎也可以說很冷門,有無動力就這部分進行深究,就看個人的興趣了。

<a href="http://phrack.org/issues/58/5.html">http://phrack.org/issues/58/5.html</a>

<a href="http://blog.linux.org.tw/~jserv/archives/002125.html">http://blog.linux.org.tw/~jserv/archives/002125.html</a>

<a href="http://lampwww.epfl.ch/~magarcia/scalacompilercornerreloaded/2012q2/genasm.pdf">http://lampwww.epfl.ch/~magarcia/scalacompilercornerreloaded/2012q2/genasm.pdf</a>

<a href="https://wiki.scala-lang.org/display/siw/overview+of+compiler+phases">https://wiki.scala-lang.org/display/siw/overview+of+compiler+phases</a>

《深入了解linux核心》

《深入linux核心架構》

《深入了解java虛拟機》<b></b>