天天看點

RDD:基于記憶體的叢集計算容錯抽象

本文提出了分布式記憶體抽象的概念——彈性分布式資料集(rdd,resilient distributed datasets),它具備像mapreduce等資料流模型的容錯特性,并且允許開發人員在大型叢集上執行基于記憶體的計算。現有的資料流系統對兩種應用的處理并不高效:一是疊代式算法,這在圖應用和機器學習領域很常見;二是互動式資料挖掘工具。這兩種情況下,将資料儲存在記憶體中能夠極大地提高性能。為了有效地實作容錯,rdd提供了一種高度受限的共享記憶體,即rdd是隻讀的,并且隻能通過其他rdd上的批量操作來建立。盡管如此,rdd仍然足以表示很多類型的計算,包括mapreduce和專用的疊代程式設計模型(如pregel)等。我們實作的rdd在疊代計算方面比hadoop快20多倍,同時還可以在5-7秒内互動式地查詢1tb資料集。

無論是工業界還是學術界,都已經廣泛使用進階叢集程式設計模型來處理日益增長的資料,如mapreduce和dryad。這些系統将分布式程式設計簡化為自動提供位置感覺性排程、容錯以及負載均衡,使得大量使用者能夠在商用叢集上分析超大資料集。

大多數現有的叢集計算系統都是基于非循環的資料流模型。從穩定的實體存儲(如分布式檔案系統)中加載記錄,記錄被傳入由一組确定性操作構成的dag,然後寫回穩定存儲。dag資料流圖能夠在運作時自動實作任務排程和故障恢複。

盡管非循環資料流是一種很強大的抽象方法,但仍然有些應用無法使用這種方式描述。我們就是針對這些不太适合非循環模型的應用,它們的特點是在多個并行操作之間重用工作資料集。這類應用包括:(1)機器學習和圖應用中常用的疊代算法(每一步對資料執行相似的函數);(2)互動式資料挖掘工具(使用者反複查詢一個資料子集)。基于資料流的架構并不明确支援工作集,是以需要将資料輸出到磁盤,然後在每次查詢時重新加載,這帶來較大的開銷。

我們提出了一種分布式的記憶體抽象,稱為彈性分布式資料集(rdd,resilient distributed datasets)。它支援基于工作集的應用,同時具有資料流模型的特點:自動容錯、位置感覺排程和可伸縮性。rdd允許使用者在執行多個查詢時顯式地将工作集緩存在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

rdd提供了一種高度受限的共享記憶體模型,即rdd是隻讀的記錄分區的集合,隻能通過在其他rdd執行确定的轉換操作(如map、join和group by)而建立,然而這些限制使得實作容錯的開銷很低。與分布式共享記憶體系統需要付出高昂代價的檢查點和復原機制不同,rdd通過lineage來重建丢失的分區:一個rdd中包含了如何從其他rdd衍生所必需的相關資訊,進而不需要檢查點操作就可以重構丢失的資料分區。盡管rdd不是一個通用的共享記憶體抽象,但卻具備了良好的描述能力、可伸縮性和可靠性,但卻能夠廣泛适用于資料并行類應用。

第一個指出非循環資料流存在不足的并非是我們,例如,google的pregel[21],是一種專門用于疊代式圖算法的程式設計模型;twister[13]和haloop[8],是兩種典型的疊代式mapreduce模型。但是,對于一些特定類型的應用,這些系統提供了一個受限的通信模型。相比之下,rdd則為基于工作集的應用提供了更為通用的抽象,使用者可以對中間結果進行顯式的命名和物化,控制其分區,還能執行使用者選擇的特定操作(而不是在運作時去循環執行一系列mapreduce步驟)。rdd可以用來描述pregel、疊代式mapreduce,以及這兩種模型無法描述的其他應用,如互動式資料挖掘工具(使用者将資料集裝入記憶體,然後執行ad-hoc查詢)。

spark是我們實作的rdd系統,在我們内部能夠被用于開發多種并行應用。spark采用scala語言[5]實作,提供類似于dryadlinq的內建語言程式設計接口[34],使使用者可以非常容易地編寫并行任務。此外,随着scala新版本解釋器的完善,spark還能夠用于互動式查詢大資料集。我們相信spark會是第一個能夠使用有效、通用程式設計語言,并在叢集上對大資料集進行互動式分析的系統。

我們通過微基準和使用者應用程式來評估rdd。實驗表明,在處理疊代式應用上spark比hadoop快高達20多倍,計算資料分析類報表的性能提高了40多倍,同時能夠在5-7秒的延時内互動式掃描1tb資料集。此外,我們還在spark之上實作了pregel和haloop程式設計模型(包括其位置優化政策),以庫的形式實作(分别使用了100和200行scala代碼)。最後,利用rdd内在的确定性特性,我們還建立了一種spark調試工具rddbg,允許使用者在任務期間利用lineage重建rdd,然後像傳統調試器那樣重新執行任務。

本文首先在第2部分介紹了rdd的概念,然後第3部分描述spark api,第4部分解釋如何使用rdd表示幾種并行應用(包括pregel和haloop),第5部分讨論spark中rdd的表示方法以及任務排程器,第6部分描述具體實作和rddbg,第7部分對rdd進行評估,第8部分給出了相關研究工作,最後第9部分總結。

本部分描述rdd和程式設計模型。首先讨論設計目标(2.1),然後定義rdd(2.2),讨論spark的程式設計模型(2.3),并給出一個示例(2.4),最後對比rdd與分布式共享記憶體(2.5)。

我們的目标是為基于工作集的應用(即多個并行操作重用中間結果的這類應用)提供抽象,同時保持mapreduce及其相關模型的優勢特性:即自動容錯、位置感覺性排程和可伸縮性。rdd比資料流模型更易于程式設計,同時基于工作集的計算也具有良好的描述能力。

在這些特性中,最難實作的是容錯性。一般來說,分布式資料集的容錯性有兩種方式:即資料檢查點和記錄資料的更新。我們面向的是大規模資料分析,資料檢查點操作成本很高:需要通過資料中心的網絡連接配接在機器之間複制龐大的資料集,而網絡帶寬往往比記憶體帶寬低得多,同時還需要消耗更多的存儲資源(在記憶體中複制資料可以減少需要緩存的資料量,而存儲到磁盤則會拖慢應用程式)。是以,我們選擇記錄更新的方式。但是,如果更新太多,那麼記錄更新成本也不低。是以,rdd隻支援粗粒度轉換,即在大量記錄上執行的單個操作。将建立rdd的一系列轉換記錄下來(即lineage),以便恢複丢失的分區。

雖然隻支援粗粒度轉換限制了程式設計模型,但我們發現rdd仍然可以很好地适用于很多應用,特别是支援資料并行的批量分析應用,包括資料挖掘、機器學習、圖算法等,因為這些程式通常都會在很多記錄上執行相同的操作。rdd不太适合那些異步更新共享狀态的應用,例如并行web爬行器。是以,我們的目标是為大多數分析型應用提供有效的程式設計模型,而其他類型的應用交給專門的系統。

rdd是隻讀的、分區記錄的集合。rdd隻能基于在穩定實體存儲中的資料集和其他已有的rdd上執行确定性操作來建立。這些确定性操作稱之為轉換,如map、filter、groupby、join(轉換不是程開發人員在rdd上執行的操作)。

rdd不需要物化。rdd含有如何從其他rdd衍生(即計算)出本rdd的相關資訊(即lineage),據此可以從實體存儲的資料計算出相應的rdd分區。

在spark中,rdd被表示為對象,通過這些對象上的方法(或函數)調用轉換。

定義rdd之後,程式員就可以在動作中使用rdd了。動作是向應用程式傳回值,或向存儲系統導出資料的那些操作,例如,count(傳回rdd中的元素個數),collect(傳回元素本身),save(将rdd輸出到存儲系統)。在spark中,隻有在動作第一次使用rdd時,才會計算rdd(即延遲計算)。這樣在建構rdd的時候,運作時通過管道的方式傳輸多個轉換。

程式員還可以從兩個方面控制rdd,即緩存和分區。使用者可以請求将rdd緩存,這樣運作時将已經計算好的rdd分區存儲起來,以加速後期的重用。緩存的rdd一般存儲在記憶體中,但如果記憶體不夠,可以寫到磁盤上。

另一方面,rdd還允許使用者根據關鍵字(key)指定分區順序,這是一個可選的功能。目前支援哈希分區和範圍分區。例如,應用程式請求将兩個rdd按照同樣的哈希分區方式進行分區(将同一機器上具有相同關鍵字的記錄放在一個分區),以加速它們之間的join操作。在pregel和haloop中,多次疊代之間采用一緻性的分區置換政策進行優化,我們同樣也允許使用者指定這種優化。

本部分我們通過一個具體示例來闡述rdd。假定有一個大型網站出錯,操作員想要檢查hadoop檔案系統(hdfs)中的日志檔案(tb級大小)來找出原因。通過使用spark,操作員隻需将日志中的錯誤資訊裝載到一組節點的記憶體中,然後執行互動式查詢。首先,需要在spark解釋器中輸入如下scala指令:

<code>1</code>

<code>2</code>

<code>errors</code><code>=</code> <code>lines.filter(</code><code>_</code><code>.startswith(</code><code>"error"</code><code>))</code>

<code>3</code>

<code>errors.cache()</code>

第1行從hdfs檔案定義了一個rdd(即一個文本行集合),第2行獲得一個過濾後的rdd,第3行請求将errors緩存起來。注意在scala文法中filter的參數是一個閉包。

這時叢集還沒有開始執行任何任務。但是,使用者已經可以在這個rdd上執行對應的動作,例如統計錯誤消息的數目:

<code>errors.count()</code>

使用者還可以在rdd上執行更多的轉換操作,并使用轉換結果,如:

<code>// count errors mentioning mysql:</code>

<code>errors.filter(</code><code>_</code><code>.contains(</code><code>"mysql"</code><code>)).count()</code>

<code>// return the time fields of errors mentioning</code>

<code>4</code>

<code>// hdfs as an array (assuming time is field</code>

<code>5</code>

<code>// number 3 in a tab-separated format):</code>

<code>6</code>

<code>errors.filter(</code><code>_</code><code>.contains(</code><code>"hdfs"</code><code>))</code>

<code>7</code>

<code></code><code>.map(</code><code>_</code><code>.split(</code><code>'\t'</code><code>)(</code><code>3</code><code>))</code>

<code>8</code>

<code></code><code>.collect()</code>

使用errors的第一個action運作以後,spark會把errors的分區緩存在記憶體中,極大地加快了後續計算速度。注意,最初的rdd lines不會被緩存。因為錯誤資訊可能隻占原資料集的很小一部分(小到足以放入記憶體)。

最後,為了說明模型的容錯性,圖1給出了第3個查詢的lineage圖。在lines rdd上執行filter操作,得到errors,然後再filter、map後得到新的rdd,在這個rdd上執行collect操作。spark排程器以流水線的方式執行後兩個轉換,向擁有errors分區緩存的節點發送一組任務。此外,如果某個errors分區丢失,spark隻在相應的lines分區上執行filter操作來重建該errors分區。

RDD:基于記憶體的叢集計算容錯抽象

圖1 示例中第三個查詢的lineage圖。(方框表示rdd,箭頭表示轉換)

為了進一步了解rdd是一種分布式的記憶體抽象,表1列出了rdd與分布式共享記憶體(dsm,distributed shared memory)[24]的對比。在dsm系統中,應用可以向全局位址空間的任意位置進行讀寫操作。(注意這裡的dsm,不僅指傳統的共享記憶體系統,還包括那些通過分布式哈希表或分布式檔案系統進行資料共享的系統,比如piccolo[28])dsm是一種通用的抽象,但這種通用性同時也使得在商用叢集上實作有效的容錯性更加困難。

rdd與dsm主要差別在于,不僅可以通過批量轉換建立(即“寫”)rdd,還可以對任意記憶體位置讀寫。也就是說,rdd限制應用執行批量寫操作,這樣有利于實作有效的容錯。特别地,rdd沒有檢查點開銷,因為可以使用lineage來恢複rdd。而且,失效時隻需要重新計算丢失的那些rdd分區,可以在不同節點上并行執行,而不需要復原整個程式。

表1 rdd與dsm對比<b>對比項目</b><b>rdd</b><b>分布式共享記憶體(dsm)</b>讀批量或細粒度操作細粒度操作寫批量轉換操作細粒度操作一緻性不重要(rdd是不可更改的)取決于應用程式或運作時容錯性細粒度,低開銷(使用lineage)需要檢查點操作和程式復原落後任務的處理任務備份很難處理任務安排基于資料存放的位置自動實作取決于應用程式(通過運作時實作透明性)如果記憶體不夠與已有的資料流系統類似性能較差(交換?)

注意,通過備份任務的拷貝,rdd還可以處理落後任務(即運作很慢的節點),這點與mapreduce[12]類似。而dsm則難以實作備份任務,因為任務及其副本都需要讀寫同一個記憶體位置。

與dsm相比,rdd模型有兩個好處。第一,對于rdd中的批量操作,運作時将根據資料存放的位置來排程任務,進而提高性能。第二,對于基于掃描的操作,如果記憶體不足以緩存整個rdd,就進行部分緩存。把記憶體放不下的分區存儲到磁盤上,此時性能與現有的資料流系統差不多。

最後看一下讀操作的粒度。rdd上的很多動作(如count和collect)都是批量讀操作,即掃描整個資料集,可以将任務配置設定到距離資料最近的節點上。同時,rdd也支援細粒度操作,即在哈希或範圍分區的rdd上執行關鍵字查找。

spark用scala[5]語言實作了rdd的api。scala是一種基于jvm的靜态類型、函數式、面向對象的語言。我們選擇scala是因為它簡潔(特别适合互動式使用)、有效(因為是靜态類型)。但是,rdd抽象并不局限于函數式語言,也可以使用其他語言來實作rdd,比如像hadoop[2]那樣用類表示使用者函數。

要使用spark,開發者需要編寫一個driver程式,連接配接到叢集以運作worker,如圖2所示。driver定義了一個或多個rdd,并調用rdd上的動作。worker是長時間運作的程序,将rdd分區以java對象的形式緩存在記憶體中。

RDD:基于記憶體的叢集計算容錯抽象

圖2 spark的運作時。使用者的driver程式啟動多個worker,worker從分布式檔案系統中讀取資料塊,并将計算後的rdd分區緩存在記憶體中。

再看看2.4中的例子,使用者執行rdd操作時會提供參數,比如map傳遞一個閉包(closure,函數式程式設計中的概念)。scala将閉包表示為java對象,如果傳遞的參數是閉包,則這些對象被序列化,通過網絡傳輸到其他節點上進行裝載。scala将閉包内的變量儲存為java對象的字段。例如,var x = 5; rdd.map(_ + x) 這段代碼将rdd中的每個元素加5。總的來說,spark的語言內建類似于dryadlinq。

rdd本身是靜态類型對象,由參數指定其元素類型。例如,rdd[int]是一個整型rdd。不過,我們舉的例子幾乎都省略了這個類型參數,因為scala支援類型推斷。

雖然在概念上使用scala實作rdd很簡單,但還是要處理一些scala閉包對象的反射問題。如何通過scala解釋器來使用spark還需要更多工作,這點我們将在第6部分讨論。不管怎樣,我們都不需要修改scala編譯器。

表2列出了spark中的rdd轉換和動作。每個操作都給出了辨別,其中方括号表示類型參數。前面說過轉換是延遲操作,用于定義新的rdd;而動作啟動計算操作,并向使用者程式傳回值或向外部存儲寫資料。

表3 spark中支援的rdd轉換和動作轉換map(f : t ) u) : rdd[t] ) rdd[u]

filter(f : t ) bool) : rdd[t] ) rdd[t]

flatmap(f : t ) seq[u]) : rdd[t] ) rdd[u]

sample(fraction : float) : rdd[t] ) rdd[t] (deterministic sampling)

groupbykey() : rdd[(k, v)] ) rdd[(k, seq[v])]

reducebykey(f : (v; v) ) v) : rdd[(k, v)] ) rdd[(k, v)]

union() : (rdd[t]; rdd[t]) ) rdd[t]

join() : (rdd[(k, v)]; rdd[(k, w)]) ) rdd[(k, (v, w))]

cogroup() : (rdd[(k, v)]; rdd[(k, w)]) ) rdd[(k, (seq[v], seq[w]))]

crossproduct() : (rdd[t]; rdd[u]) ) rdd[(t, u)]

mapvalues(f : v ) w) : rdd[(k, v)] ) rdd[(k, w)] (preserves partitioning)

sort(c : comparator[k]) : rdd[(k, v)] ) rdd[(k, v)]

partitionby(p : partitioner[k]) : rdd[(k, v)] ) rdd[(k, v)]動作count() : rdd[t] ) long

collect() : rdd[t] ) seq[t]

reduce(f : (t; t) ) t) : rdd[t] ) t

lookup(k : k) : rdd[(k, v)] ) seq[v] (on hash/range partitioned rdds)

save(path : string) : outputs rdd to a storage system, e.g., hdfs

注意,有些操作隻對鍵值對可用,比如join。另外,函數名與scala及其他函數式語言中的api比對,例如map是一對一的映射,而flatmap是将每個輸入映射為一個或多個輸出(與mapreduce中的map類似)。

除了這些操作以外,使用者還可以請求将rdd緩存起來。而且,使用者還可以通過partitioner類擷取rdd的分區順序,然後将另一個rdd按照同樣的方式分區。有些操作會自動産生一個哈希或範圍分區的rdd,像groupbykey,reducebykey和sort等。

現在我們講述如何使用rdd表示幾種基于資料并行的應用。首先讨論一些疊代式機器學習應用(4.1),然後看看如何使用rdd描述幾種已有的叢集程式設計模型,即mapreduce(4.2),pregel(4.3),和hadoop(4.4)。最後讨論一下rdd不适合哪些應用(4.5)。

很多機器學習算法都具有疊代特性,運作疊代優化方法來優化某個目标函數,例如梯度下降方法。如果這些算法的工作集能夠放入記憶體,将極大地加速程式運作。而且,這些算法通常采用批量操作,例如映射和求和,這樣更容易使用rdd來表示。

例如下面的程式是邏輯回歸[15]的實作。邏輯回歸是一種常見的分類算法,即尋找一個最佳分割兩組點(即垃圾郵件和非垃圾郵件)的超平面w。算法采用梯度下降的方法:開始時w為随機值,在每一次疊代的過程中,對w的函數求和,然後朝着優化的方向移動w。

<code>val</code> <code>points</code><code>=</code> <code>spark.textfile(...)</code>

<code></code><code>.map(parsepoint).persist()</code>

<code>var</code> <code>w</code><code>=</code> <code>// random initial vector</code>

<code>for</code> <code>(i &lt;-</code><code>1</code> <code>to iterations) {</code>

<code></code><code>val</code> <code>gradient</code><code>=</code> <code>points.map{ p</code><code>=</code><code>&gt;</code>

<code></code><code>p.x * (</code><code>1</code><code>/(</code><code>1</code><code>+exp(-p.y*(w dot p.x)))-</code><code>1</code><code>)*p.y</code>

<code></code><code>}.reduce((a,b)</code><code>=</code><code>&gt; a+b)</code>

<code></code><code>w -</code><code>=</code> <code>gradient</code>

<code>9</code>

<code>}</code>

首先定義一個名為points的緩存rdd,這是在文本檔案上執行map轉換之後得到的,即将每個文本行解析為一個point對象。然後在points上反複執行map和reduce操作,每次疊代時通過對目前w的函數進行求和來計算梯度。7.1小節我們将看到這種在記憶體中緩存points的方式,比每次疊代都從磁盤檔案裝載資料并進行解析要快得多。

已經在spark中實作的疊代式機器學習算法還有:kmeans(像邏輯回歸一樣每次疊代時執行一對map和reduce操作),期望最大化算法(em,兩個不同的map/reduce步驟交替執行),交替最小二乘矩陣分解和協同過濾算法。chu等人提出疊代式mapreduce也可以用來實作常用的學習算法[11]。

mapreduce模型[12]很容易使用rdd進行描述。假設有一個輸入資料集(其元素類型為t),和兩個函數mymap: t =&gt; list[(ki, vi)] 和 myreduce: (ki; list[vi]) ) list[r],代碼如下:

<code>data.flatmap(mymap)</code>

<code></code><code>.groupbykey()</code>

<code></code><code>.map((k, vs)</code><code>=</code><code>&gt; myreduce(k, vs))</code>

如果任務包含combiner,則相應的代碼為:

<code></code><code>.reducebykey(mycombiner)</code>

<code></code><code>.map((k, v)</code><code>=</code><code>&gt; myreduce(k, v))</code>

reducebykey操作在mapper節點上執行部分聚集,與mapreduce的combiner類似。

pregel[21]是面向圖算法的基于bsp範式[32]的程式設計模型。程式由一系列超步(superstep)協調疊代運作。在每個超步中,各個頂點執行使用者函數,并更新相應的頂點狀态,變異圖拓撲,然後向下一個超步的頂點集發送消息。這種模型能夠描述很多圖算法,包括最短路徑,雙邊比對和pagerank等。

以pagerank為例介紹一下pregel的實作。目前pagerank[7]記為r,頂點表示狀态。在每個超步中,各個頂點向其所有鄰居發送貢獻值r/n,這裡n是鄰居的數目。下一個超步開始時,每個頂點将其分值(rank)更新為 α/n + (1 - α) * Σci,這裡的求和是各個頂點收到的所有貢獻值的和,n是頂點的總數。

pregel将輸入的圖劃分到各個worker上,并存儲在其記憶體中。在每個超步中,各個worker通過一種類似mapreduce的shuffle操作交換消息。

pregel的通信模式可以用rdd來描述,如圖3。主要思想是:将每個超步中的頂點狀态和要發送的消息存儲為rdd,然後根據頂點id分組,進行shuffle通信(即cogroup操作)。然後對每個頂點id上的狀态和消息應用使用者函數(即mapvalues操作),産生一個新的rdd,即(vertexid, (newstate, outgoingmessages))。然後執行map操作分離出下一次疊代的頂點狀态和消息(即mapvalues和flatmap操作)。代碼如下:

<code>val</code> <code>vertices</code><code>=</code> <code>// rdd of (id, state) pairs</code>

<code>val</code> <code>messages</code><code>=</code> <code>// rdd of (id, message) pairs</code>

<code>val</code> <code>grouped</code><code>=</code> <code>vertices.cogroup(messages)</code>

<code>val</code> <code>newdata</code><code>=</code> <code>grouped.mapvalues {</code>

<code></code><code>(vert, msgs)</code><code>=</code><code>&gt; userfunc(vert, msgs)</code>

<code></code><code>// returns (newstate, outgoingmsgs)</code>

<code>}.cache()</code>

<code>val</code> <code>newverts</code><code>=</code> <code>newdata.mapvalues((v,ms)</code><code>=</code><code>&gt; v)</code>

<code>val</code> <code>newmsgs</code><code>=</code> <code>newdata.flatmap((id,(v,ms))</code><code>=</code><code>&gt; ms)</code>

RDD:基于記憶體的叢集計算容錯抽象

圖3 使用rdd實作pregel時,一步疊代的資料流。(方框表示rdd,箭頭表示轉換)

需要注意的是,這種實作方法中,rdd grouped,newdata和newverts的分區方法與輸入rdd vertices一樣。是以,頂點狀态一直存在于它們開始執行的機器上,這跟原pregel一樣,這樣就減少了通信成本。因為cogroup和mapvalues保持了與輸入rdd相同的分區方法,是以分區是自動進行的。

完整的pregel程式設計模型還包括其他工具,比如combiner,附錄a讨論了它們的實作。下面将讨論pregel的容錯性,以及如何在實作相同容錯性的同時減少需要執行檢查點操作的資料量。

我們差不多用了100行scala代碼在spark上實作了一個類pregel的api。7.2小節将使用pagerank算法評估它的性能。

目前,pregel基于檢查點機制來為頂點狀态及其消息實作容錯[21]。然而作者是這樣描述的:通過在其它的節點上記錄已發消息日志,然後單獨重建丢失的分區,隻需要恢複局部資料即可。上面提到這兩種方式,rdd都能夠很好地支援。

通過4.3小節的實作,spark總是能夠基于lineage實作頂點和消息rdd的重建,但是由于過長的lineage鍊,恢複可能會付出高昂的代價。因為疊代rdd依賴于上一個rdd,對于部分分區來說,節點故障可能會導緻這些分區狀态的所有疊代版本丢失,這就要求使用一種“級聯-重新執行”[20]的方式去依次重建每一個丢失的分區。為了避免這個問題,使用者可以周期性地在頂點和消息rdd上執行save操作,将狀态資訊儲存到持久存儲中。然後,spark能夠在失敗的時候自動地重新計算這些丢失的分區(而不是復原整個程式)。

最後,我們意識到,rdd也能夠實作檢查點資料的reduce操作,這要求通過一種高效的檢查點方案來表達檢查點資料。在很多pregel作業中,頂點狀态都包括可變與不可變的元件,例如,在pagerank中,與一個頂點相鄰的頂點清單是不可變的,但是它們的排名是可變的,在這種情況下,我們可以使用一個來自可變資料的單獨rdd來替換不可變rdd,基于這樣一個較短的lineage鍊,檢查點僅僅是可變狀态,圖4解釋了這種方式。

RDD:基于記憶體的叢集計算容錯抽象

圖4 經過優化的pregel使用rdd的資料流。可變狀态rdd必須設定檢查點,不可變狀态才可被快速重建。

在pagerank中,不可變狀态(相鄰頂點清單)遠大于可變狀态(浮點值),是以這種方式能夠極大地降低開銷。

haloop[8]是hadoop的一個擴充版本,它能夠改善具有疊代特性的mapreduce程式的性能。基于haloop程式設計模型的應用,使用reduce階段的輸出作為map階段下一輪疊代的輸入。它的循環感覺任務排程器能夠保證,在每一輪疊代中處理同一個分區資料的連續map和reduce任務,一定能夠在同一台實體機上執行。確定疊代間locality特性,reduce資料在實體節點之間傳輸,并且允許資料緩存在本地磁盤而能夠被後續疊代重用。

使用rdd來優化haloop,我們在spark上實作了一個類似haloop的api,這個庫隻使用了200行scala代碼。通過partitionby能夠保證跨疊代的分區的一緻性,每一個階段的輸入和輸出被緩存以用于後續疊代。

在2.1節我們讨論過,rdd适用于具有批量轉換需求的應用,并且相同的操作作用于資料集的每一個元素上。在這種情況下,rdd能夠記住每個轉換操作,對應于lineage圖中的一個步驟,恢複丢失分區資料時不需要寫日志記錄大量資料。rdd不适合那些通過異步細粒度地更新來共享狀态的應用,例如web應用中的存儲系統,或者增量抓取和索引web資料的系統,這樣的應用更适合使用一些傳統的方法,例如資料庫、ramcloud[26]、percolator[27]和piccolo[28]。我們的目标是,面向批量分析應用的這類特定系統,提供一種高效的程式設計模型,而不是一些異步應用程式。

我們希望在不修改排程器的前提下,支援rdd上的各種轉換操作,同時能夠從這些轉換擷取lineage資訊。為此,我們為rdd設計了一組小型通用的内部接口。

簡單地說,每個rdd都包含:(1)一組rdd分區(partition,即資料集的原子組成部分);(2)對父rdd的一組依賴,這些依賴描述了rdd的lineage;(3)一個函數,即在父rdd上執行何種計算;(4)中繼資料,描述分區模式和資料存放的位置。例如,一個表示hdfs檔案的rdd包含:各個資料塊的一個分區,并知道各個資料塊放在哪些節點上。而且這個rdd上的map操作結果也具有同樣的分區,map函數是在父資料上執行的。表3總結了rdd的内部接口。

表3 spark中rdd的内部接口<b>操作</b><b>含義</b>partitions()傳回一組partition對象preferredlocations(p)根據資料存放的位置,傳回分區p在哪些節點通路更快dependencies()傳回一組依賴iterator(p, parentiters)按照父分區的疊代器,逐個計算分區p的元素partitioner()傳回rdd是否hash/range分區的中繼資料資訊

設計接口的一個關鍵問題就是,如何表示rdd之間的依賴。我們發現rdd之間的依賴關系可以分為兩類,即:(1)窄依賴(narrow dependencies):子rdd的每個分區依賴于常數個父分區(即與資料規模無關);(2)寬依賴(wide dependencies):子rdd的每個分區依賴于所有父rdd分區。例如,map産生窄依賴,而join則是寬依賴(除非父rdd被哈希分區)。另一個例子見圖5。

RDD:基于記憶體的叢集計算容錯抽象

圖5 窄依賴和寬依賴的例子。(方框表示rdd,實心矩形表示分區)

區分這兩種依賴很有用。首先,窄依賴允許在一個叢集節點上以流水線的方式(pipeline)計算所有父分區。例如,逐個元素地執行map、然後filter操作;而寬依賴則需要首先計算好所有父分區資料,然後在節點之間進行shuffle,這與mapreduce類似。第二,窄依賴能夠更有效地進行失效節點的恢複,即隻需重新計算丢失rdd分區的父分區,而且不同節點之間可以并行計算;而對于一個寬依賴關系的lineage圖,單個節點失效可能導緻這個rdd的所有祖先丢失部分分區,因而需要整體重新計算。

通過rdd接口,spark隻需要不超過20行代碼實作便可以實作大多數轉換。5.1小節給出了例子,然後我們讨論了怎樣使用rdd接口進行排程(5.2),最後讨論一下基于rdd的程式何時需要資料檢查點操作(5.3)。

hdfs檔案:目前為止我們給的例子中輸入rdd都是hdfs檔案,對這些rdd可以執行:partitions操作傳回各個資料塊的一個分區(每個partition對象中儲存資料塊的偏移),preferredlocations操作傳回資料塊所在的節點清單,iterator操作對資料塊進行讀取。

map:任何rdd上都可以執行map操作,傳回一個mappedrdd對象。該操作傳遞一個函數參數給map,對父rdd上的記錄按照iterator的方式執行這個函數,并傳回一組符合條件的父rdd分區及其位置。

union:在兩個rdd上執行union操作,傳回兩個父rdd分區的并集。通過相應父rdd上的窄依賴關系計算每個子rdd分區(注意union操作不會過濾重複值,相當于sql中的union all)。

sample:抽樣與映射類似,但是sample操作中,rdd需要存儲一個随機數産生器的種子,這樣每個分區能夠确定哪些父rdd記錄被抽樣。

join:對兩個rdd執行join操作可能産生窄依賴(如果這兩個rdd擁有相同的哈希分區或範圍分區),可能是寬依賴,也可能兩種依賴都有(比如一個父rdd有分區,而另一父rdd沒有)。

排程器根據rdd的結構資訊為每個動作确定有效的執行計劃。排程器的接口是runjob函數,參數為rdd及其分區集,和一個rdd分區上的函數。該接口足以表示spark中的所有動作(即count、collect、save等)。

總的來說,我們的排程器跟dryad類似,但我們還考慮了哪些rdd分區是緩存在記憶體中的。排程器根據目标rdd的lineage圖建立一個由stage構成的無回路有向圖(dag)。每個stage内部盡可能多地包含一組具有窄依賴關系的轉換,并将它們流水線并行化(pipeline)。stage的邊界有兩種情況:一是寬依賴上的shuffle操作;二是已緩存分區,它可以縮短父rdd的計算過程。例如圖6。父rdd完成計算後,可以在stage内啟動一組任務計算丢失的分區。

RDD:基于記憶體的叢集計算容錯抽象

圖6 spark怎樣劃分任務階段(stage)的例子。實線方框表示rdd,實心矩形表示分區(黑色表示該分區被緩存)。要在rdd g上執行一個動作,排程器根據寬依賴建立一組stage,并在每個stage内部将具有窄依賴的轉換流水線化(pipeline)。 本例不用再執行stage 1,因為b已經存在于緩存中了,是以隻需要運作2和3。

排程器根據資料存放的位置配置設定任務,以最小化通信開銷。如果某個任務需要處理一個已緩存分區,則直接将任務配置設定給擁有這個分區的節點。否則,如果需要處理的分區位于多個可能的位置(例如,由hdfs的資料存放位置決定),則将任務配置設定給這一組節點。

對于寬依賴(例如需要shuffle的依賴),目前的實作方式是,在擁有父分區的節點上将中間結果物化,簡化容錯處理,這跟mapreduce中物化map輸出很像。

如果某個任務失效,隻要stage中的父rdd分區可用,則隻需在另一個節點上重新運作這個任務即可。如果某些stage不可用(例如,shuffle時某個map輸出丢失),則需要重新送出這個stage中的所有任務來計算丢失的分區。

最後,lookup動作允許使用者從一個哈希或範圍分區的rdd上,根據關鍵字讀取一個資料元素。這裡有一個設計問題。driver程式調用lookup時,隻需要使用目前排程器接口計算關鍵字所在的那個分區。當然任務也可以在叢集上調用lookup,這時可以将rdd視為一個大的分布式哈希表。這種情況下,任務和被查詢的rdd之間的并沒有明确的依賴關系(因為worker執行的是lookup),如果所有節點上都沒有相應的緩存分區,那麼任務需要告訴排程器計算哪些rdd來完成查找操作。

盡管rdd中的lineage資訊可以用來故障恢複,但對于那些lineage鍊較長的rdd來說,這種恢複可能很耗時。例如4.3小節中的pregel任務,每次疊代的頂點狀态和消息都跟前一次疊代有關,是以lineage鍊很長。如果将lineage鍊存到實體存儲中,再定期對rdd執行檢查點操作就很有效。

一般來說,lineage鍊較長、寬依賴的rdd需要采用檢查點機制。這種情況下,叢集的節點故障可能導緻每個父rdd的資料塊丢失,是以需要全部重新計算[20]。将窄依賴的rdd資料存到實體存儲中可以實作優化,例如前面4.1小節邏輯回歸的例子,将資料點和不變的頂點狀态存儲起來,就不再需要檢查點操作。

目前spark版本提供檢查點api,但由使用者決定是否需要執行檢查點操作。今後我們将實作自動檢查點,根據成本效益分析确定rdd lineage圖中的最佳檢查點位置。

值得注意的是,因為rdd是隻讀的,是以不需要任何一緻性維護(例如寫複制政策,分布式快照或者程式暫停等)帶來的開銷,背景執行檢查點操作。

我們使用10000行scala代碼實作了spark。系統可以使用任何hadoop資料源(如hdfs,hbase)作為輸入,這樣很容易與hadoop環境內建。spark以庫的形式實作,不需要修改scala編譯器。

這裡讨論關于實作的三方面問題:(1)修改scala解釋器,允許互動模式使用spark(6.1);(2)緩存管理(6.2);(3)調試工具rddbg(6.3)。

像ruby和python一樣,scala也有一個互動式shell。基于記憶體的資料可以實作低延時,我們希望允許使用者從解釋器互動式地運作spark,進而在大資料集上實作大規模并行資料挖掘。

scala解釋器通常根據将使用者輸入的代碼行,來對類進行編譯,接着裝載到jvm中,然後調用類的函數。這個類是一個包含輸入行變量或函數的單例對象,并在一個初始化函數中運作這行代碼。例如,如果使用者輸入代碼var x = 5,接着又輸入println(x),則解釋器會定義一個包含x的line1類,并将第2行編譯為println(line1.getinstance().x)。

在spark中我們對解釋器做了兩點改動:

類傳輸:解釋器能夠支援基于http傳輸類位元組碼,這樣worker節點就能擷取輸入每行代碼對應的類的位元組碼。

改進的代碼生成邏輯:通常每行上建立的單态對象通過對應類上的靜态方法進行通路。也就是說,如果要序列化一個閉包,它引用了前面代碼行中變量,比如上面的例子line1.x,java不會根據對象關系傳輸包含x的line1執行個體。是以worker節點不會收到x。我們将這種代碼生成邏輯改為直接引用各個行對象的執行個體。圖7說明了解釋器如何将使用者輸入的一組代碼行解釋為java對象。

RDD:基于記憶體的叢集計算容錯抽象
RDD:基于記憶體的叢集計算容錯抽象

圖8 首輪疊代後hadoop、hadoopbinmen、spark運作時間對比

後續疊代。圖9顯示了後續疊代的平均耗時,圖8對比了不同聚類大小條件下耗時情況,我們發現在100個節點上運作logistic回歸程式,spark比hadoop、hadoopbinmem分别快25.3、20.7倍。從圖8(b)可以看到,spark僅僅比hadoop、hadoopbinmem分别快1.9、3.2倍,這是因為k-means程式的開銷取決于計算(用更多的節點有助于提高計算速度的倍數)。

後續疊代中,hadoop仍然從hdfs讀取文本資料作為輸入,是以從首輪疊代開始hadoop的疊代時間并沒有明顯的改善。使用預先轉換的sequencefile檔案(hadoop内建的二進制檔案格式),hadoopbinmem在後續疊代中節省了解析的代價,但是仍然帶來的其他的開銷,如從hdfs讀sequencefile檔案并轉換成java對象。因為spark直接讀取緩存于rdd中的java對象,随着聚類尺寸的線性增長,疊代時間大幅下降。

RDD:基于記憶體的叢集計算容錯抽象

圖9:首輪及其後續疊代平均時間對比

了解速度提升。我們非常驚奇地發現,spark甚至勝過了基于記憶體存儲二進制資料的hadoop(hadoopbinmem),幅度高達20倍之多,hadoop運作慢是由于如下幾個原因:

hadoop軟體棧的最小開銷

讀資料時hdfs棧的開銷

将二進制記錄轉換成記憶體java對象的代價

為了估測1,我們運作空的hadoop作業,僅僅執行作業的初始化、啟動任務、清理工作就至少耗時25秒。對于2,我們發現為了服務每一個hdfs資料塊,hdfs進行了多次複制以及計算校驗和操作。

為了估測3,我們在單個節點上運作了微基準程式,在輸入的256m資料上計算logistic回歸,結果如表5所示。首先,在記憶體中的hdfs檔案和本地檔案的不同導緻通過hdfs接口讀取耗時2秒,甚至資料就在本地記憶體中。其次,文本和二進制格式輸入的不同造成了解析耗時7秒的開銷。最後,預解析的二進制檔案轉換為記憶體中的java對象,耗時3秒。每個節點處理多個塊時這些開銷都會累積起來,然而通過緩存rdd作為記憶體中的java對象,spark隻需要耗時3秒。

表5 logistic回歸疊代時間

<b>記憶體中的hdfs檔案</b><b>記憶體中的本地檔案</b><b>緩存的rdd</b>文本輸入

二進制輸入15.38 (0.26)

8.38 (0.10)13.13 (0.26)

6.86 (0.02)2.93 (0.31)

2.93 (0.31)

通過使用存儲在hdfs上的49g wikipedia導出資料,我們比較了使用rdd實作的pregel與使用hadoop計算pagerank的性能。pagerank算法通過10輪疊代處理了大約400萬文章的連結圖資料,圖10顯示了在30個節點上,spark處理速度是hadoop的2倍多,改進後對輸入進行hash分區速度提升到2.6倍,使用combiner後提升到3.6倍,這些結果資料也随着節點擴充到60個時同步放大。

RDD:基于記憶體的叢集計算容錯抽象

圖10 疊代時間對比

基于k-means算法應用程式,我們評估了在單點故障(spof)時使用lneage資訊建立rdd分區的開銷。圖11顯示了,k-means應用程式運作在75個節點的叢集中進行了10輪疊代,我們在正常操作和進行第6輪疊代開始時一個節點發生故障的情況下對耗時進行了對比。沒有任何失敗,每輪疊代啟動了400個任務處理100g資料。

RDD:基于記憶體的叢集計算容錯抽象

圖11 spof時k-means應用程式疊代時間

第5輪疊代結束時大約耗時58秒,第6輪疊代時kill掉一個節點,該節點上的任務都被終止(包括緩存的分區資料)。spark排程器排程這些任務在其他節點上重新并行運作,并且重新讀取基于lineage資訊重建的rdd輸入資料并進行緩存,這使得疊代計算耗時增加到80秒。一旦丢失的rdd分區被重建,平均疊代時間又回落到58秒。

到現在為止,我們能保證叢集中的每個節點都有足夠的記憶體去緩存疊代過程中使用的rdd,如果沒有足夠的記憶體來緩存一個作業的工作集,spark又是如何運作的呢?在實驗中,我們通過在每個節點上限制緩存rdd所需要的記憶體資源來配置spark,在不同的緩存配置條件下執行logistic回歸,結果如圖12。我們可以看出,随着緩存的減小,性能平緩地下降。

RDD:基于記憶體的叢集計算容錯抽象

圖12 spark上運作logistic回歸的性能表現

in-memory分析。視訊分發公司conviva使用spark極大地提升了為客戶處理分析報告的速度,以前基于hadoop使用大約20個hive[3]查詢來完成,這些查詢作用在相同的資料子集上(滿足使用者提供的條件),但是在不同分組的字段上執行聚合操作(sum、avg、count distinct等)需要使用單獨的mapreduce作業。該公司使用spark隻需要将相關資料加載到記憶體中一次,然後運作上述聚合操作,在hadoop叢集上處理200g壓縮資料并生成報耗時20小時,而使用spark基于96g記憶體的2個節點耗時30分鐘即可完成,速度提升40倍,主要是因為不需要再對每個作業重複地執行解壓縮和過濾操作。

城市交通模組化。在berkeley的mobile millennium項目[17]中,基于一系列分散的汽車gps監測資料,研究人員使用并行化機器學習算法來推算公路交通擁堵狀況。資料來自市區10000個互聯的公路線路網,還有600000個由汽車gps裝置采集到的樣本資料,這些資料記錄了汽車在兩個地點之間行駛的時間(每一條路線的行駛時間可能跨多個公路線路網)。使用一個交通模型,通過推算跨多個公路網行駛耗時預期,系統能夠估算擁堵狀況。研究人員使用spark實作了一個可疊代的em算法,其中包括向worker節點廣播路線網絡資訊,在e和m階段之間執行reducebykey操作,應用從20個節點擴充到80個節點(每個節點4核),如圖13(a)所示:

RDD:基于記憶體的叢集計算容錯抽象

圖13 每輪疊代運作時間(a)交通模組化應用程式(b)基于spark的社交網絡的spam分類

社交網絡spam分類。berkeley的monarch項目[31]使用spark識别twitter消息上的spam連結。他們在spark上實作了一個類似7.1小節中示例的logistic回歸分類器,不同的是使用分布式的reducebykey操作并行對梯度向量求和。圖13(b)顯示了基于50g資料子集訓練訓練分類器的結果,整個資料集是250000的url、至少10^7個與網絡相關的特征/次元,内容、詞性與通路一個url的頁面相關。随着節點的增加,這并不像交通應用程式那樣近似線性,主要是因為每輪疊代的固定通信代價較高。

為了展示spark互動式處理大資料集的能力,我們在100個m2.4xlarge ec2執行個體(8核68g記憶體)上使用spark分析1tb從2008-10到2009-4這段時間的wikipedia頁面浏覽日志資料,在整個輸入資料集上簡單地查詢如下内容以擷取頁面浏覽總數:(1)全部頁面;(2)頁面的标題能精确比對給定的關鍵詞;(3)頁面的标題能部分比對給定的關鍵詞。

RDD:基于記憶體的叢集計算容錯抽象

圖14 顯示了分别在整個、1/2、1/10的資料上查詢的響應時間,甚至1tb資料在spark上查詢僅耗時5-7秒,這比直接操作磁盤資料快幾個數量級。例如,從磁盤上查詢1tb資料耗時170秒,這表明了rdd緩存使得spark成為一個互動式資料挖掘的強大工具。

分布式共享記憶體(dsm)。rdd可以看成是一個基于dsm研究[24]得到的抽象。在2.5節我們讨論過,rdd提供了一個比dsm限制更嚴格的程式設計模型,并能在節點失效時高效地重建資料集。dsm通過檢查點[19]實作容錯,而spark使用lineage重建rdd分區,這些分區可以在不同的節點上重新并行處理,而不需要将整個程式回退到檢查點再重新運作。rdd能夠像mapreduce一樣将計算推向資料[12],并通過推測執行來解決某些任務計算進度落後的問題,推測執行在一般的dsm系統上是很難實作的。

in-memory叢集計算。piccolo[28]是一個基于可變的、in-memory的分布式表的叢集程式設計模型。因為piccolo允許讀寫表中的記錄,它具有與dsm類似的恢複機制,需要檢查點和復原,但是不能推測執行,也沒有提供類似groupby、sort等更進階别的資料流算子,使用者隻能直接讀取表單中繼資料來實作。可見,piccolo是比spark更低級别的程式設計模型,但是比dsm要進階。

ramclouds[26]适合作為web應用的存儲系統,它同樣提供了細粒度讀寫操作,是以需要通過記錄日志來實作容錯。

資料流系統。rdd借鑒了dryadlinq[34]、pig[25]和flumejava[9]的“并行收集”程式設計模型,通過允許使用者顯式地将未序列化的對象儲存在記憶體中,以此來控制分區和基于key随機查找,進而有效地支援基于工作集的應用。rdd保留了那些資料流系統更進階别的程式設計特性,這對那些開發人員來說也比較熟悉,而且,rdd也能夠支援更多類型的應用。rdd新增的擴充,從概念上看很簡單,其中spark是第一個使用了這些特性的系統,類似dryadlinq程式設計模型,能夠有效地支援基于工作集的應用。

面向基于工作集的應用,已經開發了一些專用系統,像twister[13]、haloop[8]實作了一個支援疊代的mapreduce模型;pregel[21],支援圖應用的bsp計算模型。rdd是一個更通用的抽象,它能夠描述支援疊代的mapreduce、pregel,還有現有一些系統未能處理的應用,如互動式資料挖掘。特别地,它能夠讓開發人員動态地選擇操作來運作在rdd上(如檢視查詢的結果以決定下一步運作哪個查詢),而不是提供一系列固定的步驟去執行疊代,rdd還支援更多類型的轉換。

最後,dremel[22]是一個低延遲查詢引擎,它面向基于磁盤存儲的大資料集,這類資料集是把嵌套記錄資料生成基于列的格式。這種格式的資料也能夠儲存為rdd并在spark系統中使用,但spark也具備将資料加載到記憶體來實作快速查詢的能力。

lineage。我們通過參考[6]到[10]做過調研,在科學計算和資料庫領域,對于一些應用,如需要解釋結果以及允許被重新生成、工作流中發現了bug或者資料集丢失需要重新處理資料,表示資料的lineage和原始資訊一直以來都是一個研究課題。rdd提供了一個受限的程式設計模型,在這個模型中使用細粒度的lineage來表示是非常容易的,是以它可以被用于容錯。

緩存系統。nectar[14]能夠通過識别帶有程式分析的子表達式,跨dryadlinq作業重用中間結果,如果将這種能力加入到基于rdd的系統會非常有趣。但是nectar并沒有提供in-memory緩存,也不能夠讓使用者顯式地控制應該緩存那個資料集,以及如何對其進行分區。ciel[23]同樣能夠記住任務結果,但不能提供in-memory緩存并顯式控制它。

語言疊代。dryadlinq[34]能夠使用linq擷取到表達式樹然後在叢集上運作,spark系統的語言內建與它很類似。不像dryadlinq,spark允許使用者顯式地跨查詢将rdd存儲到記憶體中,并通過控制分區來優化通信。spark支援互動式處理,但dryadlinq卻不支援。

關系資料庫。從概念上看,rdd類似于資料庫中的視圖,緩存rdd類似于物化視圖[29]。然而,資料庫像dsm系統一樣,允許典型地讀寫所有記錄,通過記錄操作和資料的日志來實作容錯,還需要花費額外的開銷來維護一緻性。rdd程式設計模型通過增加更多限制來避免這些開銷。

我們提出的rdd是一個面向,運作在普通商用機叢集之上并行資料處理應用的分布式記憶體抽象。rdd廣泛支援基于工作集的應用,包括疊代式機器學習和圖算法,還有互動式資料挖掘,然而它保留了資料流模型中引人注目的特點,如自動容錯恢複,處理執行進度落後的任務,以及感覺排程。它是通過限制程式設計模型,進而允許高效地重建rdd分區來實作的。rdd實作處理疊代式作業的速度超過hadoop大約20倍,而且還能夠互動式查詢數百g資料。

首先感謝spark使用者,包括timothy hunter、lester mackey、dilip joseph、jibin zhan和teodor moldovan,他們在真實的應用中使用spark,提出了寶貴的建議,同時也發現了一些新的研究挑戰。這次研究離不開以下組織或團體的大力支援:berkeley amp lab創立贊助者google和sap,amp lab贊助者amazon web services、cloudera、huawei、ibm、intel、microsoft、nec、netapp和vmware,國家配套資金加州micro項目(助學金 06-152,07-010),國家自然科學基金 (準許 cns-0509559),加州大學工業/大學合作研究項目 (uc discovery)授予的com07-10240,以及自然科學和加拿大工程研究理事會。

繼續閱讀