天天看點

Spark的RDD原理以及2.0特性的介紹(轉)

spark 是什麼 

spark 是 apache 頂級項目裡面最火的大資料處理的計算引擎,它目前是負責大資料計算的工作。包括離線計算或互動式查詢、資料挖掘算法、流式計算以及圖計算等。全世界有許多公司群組織使用或給社群貢獻代碼,社群的活躍度見 www.github.com/apache/spark。

2013 年開始 spark開發團隊成立 databricks,來對 spark 進行運作和管理,并提供 cloud 服務。spark 社群基本保持一個季度一個版本,不出意外的話 spark 2.0 将在五月底釋出。

與 mapreduce 相比,spark 具備 dag 執行引擎以及基于記憶體的多輪疊代計算等優勢,在sql 層面上,比 hive/pig 相比,引入關系資料庫的許多特性,以及記憶體管理技術。另外在 spark 上所有的計算模型最終都統一基于 rdd 之上運作執行,包括流式和離線計算。spark 基于磁盤的性能是 mr 的 10 倍,基于記憶體的性能是 mr 的 100 倍 。

spark 提供 sql、機器學習庫 mllib、流計算 streaming 和圖計算 graphx,同時也支援 scala、java、python 和 r 語言開發的基于 api 的應用程式。

Spark的RDD原理以及2.0特性的介紹(轉)

rdd 的原理

rdd,英文全稱叫 resilient distributed datasets。

an rdd is a read-only, partitioned collection of records. 字面意思是隻讀的分布式資料集。

但其實個人覺得可以把 rdd 了解為關系資料庫 裡的一個個操作,比如 map,filter,join 等。在 spark 裡面實作了許多這樣的 rdd 類,即可以看成是操作類。當我們調用一個 map 接口,底層實作是會生成一個 mappartitionsrdd 對象,當 rdd 真正執行時,會調用 mappartitionsrdd 對象裡面的 compute 方法來執行這個操作的計算邏輯。但是不同的是,rdd 是 lazy 模式,隻有像 count,saveastext 這種 action 動作被調用後再會去觸發 runjob 動作。

rdd 分為二類:transformation 和 action。

transformation 是從一個 rdd 轉換為一個新的 rdd 或者從資料源生成一個新的 rdd;

action 是觸發 job 的執行。所有的 transformation 都是 lazy 執行,隻有在 action 被送出的時候才觸發前面整個 rdd 的執行圖。如下

val file = sc.textfile(args(0))

val words = file.flatmap(line => line.split(" "))

val wordcounts = words.map(x => (x, 1)).reducebykey(_ + _, 2) wordcounts.saveastextfile(args(1))

這段代碼生成的 rdd 的執行樹是如下圖所示:

Spark的RDD原理以及2.0特性的介紹(轉)

最終在 saveastextfile 方法時才會将整個 rdd 的執行圖送出給 dag 執行引擎,根據相關資訊切分成一個一個 stage,每個 stage 去執行多個 task,最終完成整個 job 的執行。

還有一個差別就是,rdd 計算後的中間結果是可以被持久化,當下一次需要使用時,可以直接使用之前持久化好的結果,而不是重新計算,并且這些結果被存儲在各個結點的 executor 上。下一次使用時,排程器可以直接把 task 分發到存儲持久化資料的結點上,減少資料的網絡傳輸開稍。這種場景在資料挖掘疊代計算是經常出現。如下代碼

val links = spark.textfile(...).map(...).persist() var ranks = // rdd of (url, rank) pairs

for (i <- 1 to iterations) {

// build an rdd of (targeturl, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatmap {

(url, (links, rank)) =>

links.map(dest => (dest, rank/links.size)) }

// sum contributions by url and get new ranks

ranks = contribs.reducebykey((x,y) => x+y)

.mapvalues(sum => a/n + (1-a)*sum) }

以上代碼生成的 rdd 執行樹如下圖所示:

Spark的RDD原理以及2.0特性的介紹(轉)

計算 contribs-0 時需要使用 links 的計算邏輯,當 links 每個分片計算完後,會将這個結果儲存到本地記憶體或磁盤上,下一次 contribs-1 計算要使用 links 的資料時,直接從上一次儲存的記憶體和磁盤上讀取就可以了。這個持久化系統叫做 blockmanager,類似于在内部再建構了一個 kv 系統,k 表示每個分區 id 号,v 表示這個分區計算後的結果。

另外在 streaming 計算時,每個 batch 會去消息隊列上拉取這個時間段的資料,每個 recevier 接收過來資料形成 block 塊并存放到 blockmanager 上,為了可靠性,這個 block 塊可以遠端備份,後續的 batch 計算就直接在之前已讀取的 block 塊上進行計算,這樣不斷循環疊代來完成流處理。

一個 rdd 一般會有以下四個函數組成。

1. 操作算子的實體執行邏輯

定義為:

def compute(split: partition, context: taskcontext): iterator[t]

如在 mappartitionsrdd 裡的實作是如下:

override def compute(split: partition, context: taskcontext): iterator[u] = f(context, split.index, firstparent[t].iterator(split, context))

函數定義

f: (taskcontext, int, iterator[t]) => iterator[u]

2. 擷取分片資訊

protected def getpartitions: array[partition] 

即這個操作的資料劃分為多少個分 區。跟 mapreduce 裡的 map 上的 split 類似的。

3. 擷取父 rdd 的依賴關系

protected def getdependencies: seq[dependency[_]] 

依賴分二種:如果 rdd 的每個分區最多隻能被一個 child rdd 的一個分區使用,則稱之為 narrow dependency;若依賴于多個 child rdd 分區,則稱之為 wide dependency。不同的操作根據其特性,可能會産生不同的依賴 。如下圖所示

Spark的RDD原理以及2.0特性的介紹(轉)

map 操作前後二個 rdd 操作之間的分區是一對一的關系,故産生 narrow dependency,而 join 操作的分區分别對應于它的二個子操作相對應的分區,故産生 wide dependency。當最後要生成具體的 task 運作時,就需要利用這個依賴關系也生成 stage 的 dag 圖。

4. 擷取該操作對應資料的存放位置資訊,主要是針對 hdfs 這類有資料源的 rdd。

protected def getpreferredlocations(split: partition): seq[string]

spark 的執行模式

spark 的執行模式有 local、yarn、standalone、mesos 四類。後面三個分别有 cluster 和 client 二種。client 和 cluster 的差別就是指 driver 是在程式送出用戶端還是在叢集的 am 上。 比如常見的 yarn-cluster 模式如下圖所示:

Spark的RDD原理以及2.0特性的介紹(轉)

一般來說,運作簡單測試或 ut 用的是 local 模式運作,其實就是用多線程模似分布式執行。 如果業務部門較少且不需要對部門或組之間的資源做劃分和優先級排程的話,可以使用 standalone 模式來部署。

當如果有多個部門或組,且希望每個組織可以限制固定運作的最大資源,另外組或者任務需要有優先級執行的話,可以選擇 yarn 或 mesos。

spark 2.0 的特性

unifying dataframes and datasets in scala/java

dataframe 和 dataset 的功能是什麼?

它們都是提供給使用者使用,包括各類操作接口的 api。1.3 版本引入 dataframe,1.6 版本引入 dataset,2.0 提供的功能是将二者統一,即保留 dataset,而把 dataframe 定義為 dataset[row],即是 dataset 裡的元素對象為 row 的一種(spark-13485)。

dataframe,它就是提供了一系列操作 api,與 rdd api 相比較,dataframe 裡操作的資料都是帶有 schema 資訊,是以 dataframe 裡的所有操作是可以享受 spark sql catalyst optimizer 帶來的性能提升,比如 code generation 以及 tungsten等。執行過程如下圖所示

Spark的RDD原理以及2.0特性的介紹(轉)

但是 dataframe 出來後發現有些情況下 rdd 可以表達的邏輯用 dataframe 無法表達。比如 要對 group by 或 join 後的結果用自定義的函數,可能用 sql 是無法表達的。如下代碼:

case class classdata(a: string, b: int)

case class classnullabledata(a: string, b: integer)

val ds = seq(classdata("a", 1), classdata("a", 2)).tods()

val agged = ds.groupbykey(d => classnullabledata(d.a, null))

.mapgroups {

case (key, values) => key.a + values.map(_.b).sum

}

中間處理過程的資料是自定義的類型,并且 groupby 後的聚合邏輯也是自定義的,故用 sql 比較難以表達,是以提出了 dataset api。dataset api 擴充 dataframe api 支援靜态類型和運作已經存在的 scala 或 java 語言的使用者自定義函數。同時 dataset 也能享受 spark sql 裡所有性能 帶來的提升。

那麼後面發現 dataset 是包含了 dataframe 的功能,這樣二者就出現了很大的備援,故在 2.0 時将二者統一,保留 dataset api,把 dataframe 表示為 dataset[row],即 dataset 的子集。

是以我們在使用 api 時,優先選擇 dataframe & dataset,因為它的性能很好,而且以後的優化它都可以享受到,但是為了相容早期版本的程式,rdd api 也會一直保留着。後續 spark 上層的庫将全部會用 dataframe,比如 mllib、streaming、graphx 等。

whole-stage code generation

其中一個例子:

select count(*) from store_sales where ss_item_sk = 1000

那麼在翻譯成計算引擎的執行計劃如下圖:

Spark的RDD原理以及2.0特性的介紹(轉)

而通常實體計劃的代碼是這樣實作的:

class filter {

def next(): boolean = {

var found = false

while (!found && child.next()) {

found = predicate(child.fetch())

return found

def fetch(): internalrow = {

child.fetch()

}...

但是真正如果我們用 hard code 寫的話,代碼是這樣的:

var count = 0

for (ss_item_sk in store_sales) {

if (ss_item_sk == 1000) {

count += 1

發現二者相關如下圖所示:

Spark的RDD原理以及2.0特性的介紹(轉)

那麼如何使得計算引擎的實體執行速度能達到 hard code 的性能呢?這就提出了 whole-stage code generation,即對實體執行的多次調用轉換為代碼 for 循環,類似 hard code 方式,減少中間執行的函數調用次數,當資料記錄多時,這個調用次數是很大。 最後這個優化帶來的性能提升如下圖所示:

Spark的RDD原理以及2.0特性的介紹(轉)

從 benchmark 的結果可以看出,使用了該特性後各操作的性能都有很大的提升。

structured streaming

spark streaming 是把流式計算看成一個一個的離線計算來完成流式計算,提供了一套 dstream 的流 api,相比于其他的流式計算,spark streaming 的優點是容錯性和吞吐量上要有優勢,關于 spark streaming 的詳細設計思想和分析,可以到 https://github.com/lw-lin/coolplayspark 進行詳細學習和了解。

在 2.0 以前的版本,使用者在使用時,如果有流計算,又有離線計算,就需要用二套 api 去編寫程式,一套是 rdd api,一套是 dstream api。而且 dstream api 在易用性上遠不如 sql 或 dataframe。

為了真正将流式計算和離線計算在程式設計 api 上統一,同時也讓 streaming 作業能夠享受 dataframe/dataset 上所帶來的優勢:性能提升和 api 易用,于是提出了 structured streaming。最後我們隻需要基于 dataframe/dataset 可以開發離線計算和流式計算的程式,很容易使得 spark 在 api 跟業界所說的 dataflow 來統一離線計算和流式計算效果一樣。

比如在做 batch aggregation 時我們可以寫成下面的代碼

Spark的RDD原理以及2.0特性的介紹(轉)

那麼對于流式計算時,我們僅僅是調用了 dataframe/dataset 的不同函數代碼,如下:

Spark的RDD原理以及2.0特性的介紹(轉)

最後,在 dataframe/dataset 這個 api 上可以完成如下圖所示的所有應用:

Spark的RDD原理以及2.0特性的介紹(轉)

其他主要性能提升

采用 vectorized parquet decoder 讀取 parquet 上資料。以前是一行一行的讀取,然後處理。現在改為一次讀取 4096 行記錄,不需要每處理一行記錄去調用一次 parquet 擷取記錄的方法,而是改為一批去調用一次(spark-12854)。加上 parquet 本身是列式存儲,這個優化使得 parquet 讀取速度提高 3 倍。

采有 radix sort 來提高 sort 的性能(spark-14724)。在某些情況下排序性能可以提高 10-20 倍。

使用 vectorizedhashmap 來代替 java 的 hashmap 加速 group by 的執行(spark-14319)。

将 hive 中的 window 函數用 native spark window 實作,因為 native spark window 在記憶體管理上有優勢(spark-8641)。

避免複雜語句中的邏輯相同部分在執行時重複計算(spark-13523)。

壓縮算法預設使用 lz4(spark-12388)。

語句的增強

建立新的文法解析(spark-12362)滿足所有的 sql 文法,這樣即合并 hive 和标準 sql 的語句解析,同時不依賴 hive 的文法解析 jar(spark-14776)。之前版本二者的文法解析是獨立的,這樣導緻在标準 sql 中無法使用視窗函數或者 hive 的文法,而在使用 hive 文法時無法使用标準 sql 的文法,比如 in/exists 子句等。在 sql 編寫時,沒法在一個 context 把二者的範圍全部支援,然而有了這個特性後,使得 sql 語句表達更強大,後續要增加任何文法,隻需要維護這一個文法解析即可。當然缺點是後續 hive 版本的新文法,需要手動添加進來。

支援 intersect/except(spark-12542)。如 select * from t1 except select * from t2 或者 select * from t1 intersect select * from t2。

支援 uncorrelated scalar subquery(spark-13417)。如 select (select min(value) from testdata where key = (select max(key) from testdata) - 1)。

支援 ddl/dml(spark-14118)。之前 ddl/dml 語句是調用 hive 的 ddl/dml 語句指令來完成,而現在是直接在 spark sql 上就可以完成。

支援 multi-insert(spark-13924)。

支援 exist(spark-12545)和 not exists(spark-10600),如 select * from (select 1 as a union all select 2 as a) t where exists (select * from (select 1 as b) t2 where b = a and b < 2)。

支援 subqueries 帶有 in/not in 子句(spark-4226),如 select * from (select 1 as a union all select 2 as a) t where a in (select b as a from t2 where b < 2)。

支援 select/where/having 中使用 subquery(spark-12543),如 select * from t where a = (select max(b) from t2) 或 select max(a) as ma from t having ma = (select max(b) from t2)。

支援 leftsemi/leftanti(spark-14853)。

支援在條件表達式 in/not in 裡使用子句(spark-14781),如 select * from l where l.a in (select c from r) or l.a in (select c from r where l.b < r.d)。

支援所有的 tpcds 語句(spark-12540)。

與以前版本相容(spark-11806)

不支援運作在 hadoop 版本 < 2.2 上(spark-11807)。

去掉 httpbroadcast(spark-12588)。

去掉 hashshufflemanager(spark-14667)。

去掉 akka rpc。

簡化與完善 accumulators and task metrics(spark-14626)。

将 hive 文法解析以及文法移至 core 裡(spark-14825),在沒有 hive 中繼資料庫和 hive 依賴包時,我們可以像之前版本使用标準 sql 一樣去使用 hiveql 語句。

1.6 版本嚴重問題的解決

在 http://geek.csdn.net/news/detail/70162 提到的 1.6 問題中 spillable 集合記憶體溢出問題在 spark-4452 裡已解決,blockmanager 死鎖問題在 spark-12757 裡已解決。

最後 2.0 版本還有一些其他的特性,如:

用 sparksession 替換掉原來的 sqlcontext and hivecontext。

mllib 裡的計算用 dataframe-based api 代替以前的 rdd 計算邏輯。

提供更多的 r 語言算法。

預設使用 scala 2.11 編譯與運作。

繼續閱讀