天天看點

Apache Spark源碼走讀(六)Task運作期之函數調用關系分析 &存儲子系統分析

本篇主要闡述在taskrunner中執行的task其業務邏輯是如何被調用到的,另外試圖講清楚運作着的task其輸入的資料從哪擷取,處理的結果傳回到哪裡,如何傳回。

spark已經安裝完畢

spark運作在local mode或local-cluster mode

local-cluster模式也稱為僞分布式,可以使用如下指令運作

[1,2,1024] 分别表示,executor number, core number和記憶體大小,其中記憶體大小不應小于預設的512m

sparkcontext.scala       整個初始化過程的入口

sparkenv.scala          建立blockmanager, mapoutputtrackermaster, connectionmanager, cachemanager

dagscheduler.scala       任務送出的入口,即将job劃分成各個stage的關鍵

taskschedulerimpl.scala 決定每個stage可以運作幾個task,每個task分别在哪個executor上運作

schedulerbackend

最簡單的單機運作模式的話,看localbackend.scala

如果是叢集模式,看源檔案sparkdeployschedulerbackend

<b>步驟1</b>: 根據初始化入參生成sparkconf,再根據sparkconf來建立sparkenv, sparkenv中主要包含以下關鍵性元件 1. blockmanager 2. mapoutputtracker 3. shufflefetcher 4. connectionmanager

<b>步驟2</b>:建立taskscheduler,根據spark的運作模式來選擇相應的schedulerbackend,同時啟動taskscheduler,這一步至為關鍵

taskscheduler.start目的是啟動相應的schedulerbackend,并啟動定時器進行檢測

<b>步驟3</b>:以上一步中建立的taskscheduler執行個體為入參建立dagscheduler并啟動運作

<b>步驟4</b>:啟動web ui

還是以最簡單的wordcount為例說明rdd的轉換過程

上述一行簡短的代碼其實發生了很複雜的rdd轉換,下面仔細解釋每一步的轉換過程和轉換結果

textfile先是生成hadooprdd,然後再通過map操作生成mappedrdd,如果在spark-shell中執行上述語句,得到的結果可以證明所做的分析

flatmap将原來的mappedrdd轉換成為flatmappedrdd

利用word生成相應的鍵值對,上一步的flatmappedrdd被轉換成為mappedrdd

步驟2,3中使用到的operation全部定義在rdd.scala中,而這裡使用到的reducebykey卻在rdd.scala中見不到蹤迹。reducebykey的定義出現在源檔案pairrddfunctions.scala

細心的你一定會問reducebykey不是mappedrdd的屬性和方法啊,怎麼能被mappedrdd調用呢?其實這背後發生了一個隐式的轉換,該轉換将mappedrdd轉換成為pairrddfunctions

這種隐式的轉換是scala的一個文法特征,如果想知道的更多,請用關鍵字"scala implicit method"進行查詢,會有不少的文章對此進行詳盡的介紹。

接下來再看一看reducebykey的定義:

reducebykey最終會調用combinebykey, 在這個函數中pairedrddfunctions會被轉換成為shufflerdd,當調用mappartitionswithcontext之後,shufflerdd被轉換成為mappartitionsrdd

log輸出能證明我們的分析

小結一下整個rdd轉換過程

hadooprdd-&gt;mappedrdd-&gt;flatmappedrdd-&gt;mappedrdd-&gt;pairrddfunctions-&gt;shufflerdd-&gt;mappartitionsrdd

整個轉換過程好長啊,這一切的轉換都發生在任務送出之前。

在對任務運作過程中的函數調用關系進行分析之前,我們也來探讨一個偏理論的東西,作用于rdd之上的transformantion為什麼會是這個樣子?

對這個問題的解答和數學搭上關系了,從理論抽象的角度來說,任務處理都可歸結為“input-&gt;processing-&gt;output"。input和output對應于資料集dataset.

在此基礎上作一下簡單的分類

one-one 一個dataset在轉換之後還是一個dataset,而且dataset的size不變,如map

one-one 一個dataset在轉換之後還是一個dataset,但size發生更改,這種更改有兩種可能:擴大或縮小,如flatmap是size增大的操作,而subtract是size變小的操作

many-one 多個dataset合并為一個dataset,如combine, join

one-many 一個dataset分裂為多個dataset, 如groupby

task的送出過程參考本系列中的第二篇文章。本節主要講解當task在運作期間是如何一步步調用到作用于rdd上的各個operation

taskrunner.run

task.run

task.runtask (task是一個基類,有兩個子類,分别為shufflemaptask和resulttask)

rdd.iterator

rdd.computeorreadcheckpoint

rdd.compute 

或許當看到rdd.compute函數定義時,還是覺着f沒有被調用,以mappedrdd的compute定義為例

注意,這裡最容易産生錯覺的地方就是map函數,這裡的map不是rdd中的map,而是scala中定義的iterator的成員函數map, 請自行參考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.iterator

compute的計算過程對于shufflemaptask比較複雜,繞的圈圈比較多,對于resulttask就直接許多。

上面的分析知道,wordcount這個job在最終送出之後,被dagscheduler分為兩個stage,第一個stage是shufflemaptask,第二個stage是resulttask.

那麼shufflemaptask的計算結果是如何被resulttask取得的呢?這個過程簡述如下

shffulemaptask将計算的狀态(注意不是具體的資料)包裝為mapstatus傳回給dagscheduler

dagscheduler将mapstatus儲存到mapoutputtrackermaster中

resulttask在執行到shufflerdd時會調用blockstoreshufflefetcher的fetch方法去擷取資料

第一件事就是咨詢mapoutputtrackermaster所要取的資料的location

根據傳回的結果調用blockmanager.getmultiple擷取真正的資料

blockstoreshufflefetcher的fetch函數僞碼

注意上述代碼中的getserverstatuses及getmultiple,一個是詢問資料的位置,一個是去擷取真正的資料。

spark計算速度遠勝于hadoop的原因之一就在于中間結果是緩存在記憶體而不是直接寫入到disk,本文嘗試分析spark中存儲子系統的構成,并以資料寫入和資料讀取為例,講述清楚存儲子系統中各部件的互動關系。

Apache Spark源碼走讀(六)Task運作期之函數調用關系分析 &amp;存儲子系統分析

上圖是spark存儲子系統中幾個主要子產品的關系示意圖,現簡要說明如下

cachemanager  rdd在進行計算的時候,通過cachemanager來擷取資料,并通過cachemanager來存儲計算結果

blockmanager   cachemanager在進行資料讀取和存取的時候主要是依賴blockmanager接口來操作,blockmanager決定資料是從記憶體(memorystore)還是從磁盤(diskstore)中擷取

memorystore   負責将資料儲存在記憶體或從記憶體讀取

diskstore        負責将資料寫入磁盤或從磁盤讀入

blockmanagerworker  資料寫入本地的memorystore或diskstore是一個同步操作,為了容錯還需要将資料複制到别的計算結點,以防止資料丢失的時候還能夠恢複,資料複制的操作是異步完成,由blockmanagerworker來處理這一部分事情

connectionmanager 負責與其它計算結點建立連接配接,并負責資料的發送和接收

blockmanagermaster 注意該子產品隻運作在driver application所在的executor,功能是負責記錄下所有blockids存儲在哪個slaveworker上,比如rdd task運作在機器a,所需要的blockid為3,但在機器a上沒有blockid為3的數值,這個時候slave worker需要通過blockmanager向blockmanagermaster詢問資料存儲的位置,然後再通過connectionmanager去擷取,具體參看“資料遠端擷取一節”

由于blockmanager起到實際的存儲管控作用,是以在講支援的操作的時候,以blockmanager中的public api為例

put  資料寫入

get      資料讀取

remoterdd 資料删除,一旦整個job完成,所有的中間計算結果都可以删除

上述的各個子產品由sparkenv來建立,建立過程在sparkenv.create中完成

這段代碼容易讓人疑惑,看起來像是在所有的cluster node上都建立了blockmanagermasteractor,其實不然,仔細看registerorlookup函數的實作。如果目前節點是driver則建立這個actor,否則建立到driver的連接配接。

初始化過程中一個主要的動作就是blockmanager需要向blockmanagermaster發起注冊

Apache Spark源碼走讀(六)Task運作期之函數調用關系分析 &amp;存儲子系統分析

<b>資料寫入的簡要流程:</b>

rdd.iterator是與storage子系統互動的入口

cachemanager.getorcompute調用blockmanager的put接口來寫入資料

資料優先寫入到memorystore即記憶體,如果memorystore中的資料已滿則将最近使用次數不頻繁的資料寫入到磁盤

通知blockmanagermaster有新的資料寫入,在blockmanagermaster中儲存中繼資料

将寫入的資料與其它slave worker進行同步,一般來說在本機寫入的資料,都會另先一台機器來進行資料的備份,即replicanumber=1

寫入的具體内容可以是序列化之後的bytes也可以是沒有序列化的value. 此處有一個對scala的文法中either, left, right關鍵字的了解。

首先在查詢本機的memorystore和diskstore中是否有所需要的block資料存在,如果沒有則發起遠端資料擷取。

遠端擷取調用路徑, getremote-&gt;dogetremote, 在dogetremote中最主要的就是調用blockmanagerworker.syncgetblock來從遠端獲得資料

上述這段代碼中最有意思的莫過于sendmessagereliablysync,遠端資料讀取毫無疑問是一個異步i/o操作,這裡的代碼怎麼寫起來就像是在進行同步的操作一樣呢。也就是說如何知道對方發送回來響應的呢?

别急,繼續去看看sendmessagereliablysync的定義

要是我說秘密在這裡,你肯定會說我在扯淡,但确實在此處。注意到關鍵字promise和future沒。

如果這個future執行完畢,傳回s.ackmessage。我們再看看這個ackmessage是在什麼地方被寫入的呢。看一看connectionmanager.handlemessage中的代碼片段

注意,此處的所調用的sentmessagestatus.markdone就會調用在sendmessagereliablysync中定義的promise.success. 不妨看看messagestatus的定義。

我想至此調用關系搞清楚了,scala中的future和promise了解起來還有有點費勁。

 在spark的最新源碼中,storage子系統引入了tachyonstore. tachyonstore是在記憶體中實作了hdfs檔案系統的接口,主要目的就是盡可能的利用記憶體來作為資料持久層,避免過多的磁盤讀寫操作。

一點點疑問,目前在spark的存儲子系統中,通信子產品裡傳遞的資料即有“心跳檢測消息”,“資料同步的消息”又有“資料擷取之類的資訊流”。如果可能的話,要将心跳檢測與資料同步即資料擷取所使用的網卡分離以提高可靠性。

繼續閱讀