rdd是spark最基本,也是最根本的資料抽象。http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 是關于rdd的論文。如果覺得英文閱讀太費時間,可以看這篇譯文:http://shiyanjun.cn/archives/744.html
本文也是基于這篇論文和源碼,分析rdd的實作。
第一個問題,rdd是什麼?resilient distributed datasets(rdd,) 彈性分布式資料集。rdd是隻讀的、分區記錄的集合。rdd隻能基于在穩定實體存儲中的資料集和其他已有的rdd上執行确定性操作來建立。這些确定性操作稱之為轉換,如map、filter、groupby、join(轉換不是程開發人員在rdd上執行的操作)。
rdd不需要物化。rdd含有如何從其他rdd衍生(即計算)出本rdd的相關資訊(即lineage),據此可以從實體存儲的資料計算出相應的rdd分區。
看一下内部實作對于rdd的概述:
internally, each rdd is characterized by five main properties:
*
* - a list of partitions
* - a function for computing each split
* - a list of dependencies on other rdds
* - optionally, a partitioner for key-value rdds (e.g. to say that the rdd is hash-partitioned)
* - optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an hdfs file)
每個rdd有5個主要的屬性:
一組分片(partition),即資料集的基本組成機關
一個計算每個分片的函數
對parent rdd的依賴,這個依賴描述了rdd之間的lineage
對于key-value的rdd,一個partitioner
一個清單,存儲存取每個partition的preferred位置。對于一個hdfs檔案來說,存儲每個partition所在的塊的位置。
org.apache.spark.rdd.rdd是一個抽象類,定義了rdd的基本操作和屬性。這些基本操作包括map,filter和persist。另外,org.apache.spark.rdd.pairrddfunctions定義了key-value類型的rdd的操作,包括groupbykey,join,reducebykey,countbykey,saveashadoopfile等。org.apache.spark.rdd.sequencefilerddfunctions包含了所有的rdd都适用的saveassequencefile。
rdd支援兩種操作:轉換(transformation)從現有的資料集建立一個新的資料集;而動作(actions)在資料集上運作計算後,傳回一個值給驅動程式。 例如,map就是一種轉換,它将資料集每一個元素都傳遞給函數,并傳回一個新的分布資料集表示結果。另一方面,reduce是一種動作,通過一些函數将所有的元素疊加起來,并将最終結果傳回給driver程式。(不過還有一個并行的reducebykey,能傳回一個分布式資料集)
spark中的所有轉換都是惰性的,也就是說,他們并不會直接計算結果。相反的,它們隻是記住應用到基礎資料集(例如一個檔案)上的這些轉換動作。隻有當發生一個要求傳回結果給driver的動作時,這些轉換才會真正運作。這個設計讓spark更加有效率的運作。例如,我們可以實作:通過map建立的一個新資料集,并在reduce中使用,最終隻傳回reduce的結果給driver,而不是整個大的新資料集。
預設情況下,每一個轉換過的rdd都會在你在它之上執行一個動作時被重新計算。不過,你也可以使用persist(或者cache)方法,持久化一個rdd在記憶體中。在這種情況下,spark将會在叢集中,儲存相關元素,下次你查詢這個rdd時,它将能更快速通路。在磁盤上持久化資料集,或在叢集間複制資料集也是支援的。
下表列出了spark中的rdd轉換和動作。每個操作都給出了辨別,其中方括号表示類型參數。前面說過轉換是延遲操作,用于定義新的rdd;而動作啟動計算操作,并向使用者程式傳回值或向外部存儲寫資料。
表1 spark中支援的rdd轉換和動作
轉換
map(f : t ) u) : rdd[t] ) rdd[u]
filter(f : t ) bool) : rdd[t] ) rdd[t]
flatmap(f : t ) seq[u]) : rdd[t] ) rdd[u]
sample(fraction : float) : rdd[t] ) rdd[t] (deterministic sampling)
groupbykey() : rdd[(k, v)] ) rdd[(k, seq[v])]
reducebykey(f : (v; v) ) v) : rdd[(k, v)] ) rdd[(k, v)]
union() : (rdd[t]; rdd[t]) ) rdd[t]
join() : (rdd[(k, v)]; rdd[(k, w)]) ) rdd[(k, (v, w))]
cogroup() : (rdd[(k, v)]; rdd[(k, w)]) ) rdd[(k, (seq[v], seq[w]))]
crossproduct() : (rdd[t]; rdd[u]) ) rdd[(t, u)]
mapvalues(f : v ) w) : rdd[(k, v)] ) rdd[(k, w)] (preserves partitioning)
sort(c : comparator[k]) : rdd[(k, v)] ) rdd[(k, v)]
partitionby(p : partitioner[k]) : rdd[(k, v)] ) rdd[(k, v)]
動作
count() : rdd[t] ) long
collect() : rdd[t] ) seq[t]
reduce(f : (t; t) ) t) : rdd[t] ) t
lookup(k : k) : rdd[(k, v)] ) seq[v] (on hash/range partitioned rdds)
save(path : string) : outputs rdd to a storage system, e.g., hdfs
注意,有些操作隻對鍵值對可用,比如join。另外,函數名與scala及其他函數式語言中的api比對,例如map是一對一的映射,而flatmap是将每個輸入映射為一個或多個輸出(與mapreduce中的map類似)。
除了這些操作以外,使用者還可以請求将rdd緩存起來。而且,使用者還可以通過partitioner類擷取rdd的分區順序,然後将另一個rdd按照同樣的方式分區。有些操作會自動産生一個哈希或範圍分區的rdd,像groupbykey,reducebykey和sort等。
下面的例子摘自rdd的論文,實作了處理一個hdfs日志檔案中錯誤日志的邏輯。
spark是一個org.apache.spark.sparkcontext的執行個體,基本上spark的應用都是以定義一個sparkcontext開始的。textfile的定義如下:
hadoopfile建立了一個org.apache.spark.rdd.hadooprdd,而在hadooprdd上調用map則生成了一個mappedrdd:
errors.cache()并不會立即執行,它的作用是在rdd的計算完成後,将結果cache起來,以供以後的計算使用,這樣的話可以加快以後運算的速度。
errors.count() 就觸發了一個action,這個時候就需要向叢集送出job了:
送出後,sparkcontext會将runjob送出到dagscheduler,dagscheduler會将目前的dag劃分成stage,然後生成taskset後通過taskscheduler的submittasks送出tasks,而這又會調用schedulerbackend,schedulerbackend會将這些任務發送到executor去執行。
如何劃分stage?如何生成tasks?接下來會進行解析。明天要上班了,今天早點休息吧。