天天看點

颠覆大資料分析之Spark彈性分布式資料集

颠覆大資料分析之Spark彈性分布式資料集
颠覆大資料分析之Spark彈性分布式資料集

圖2.3  spark中進行疊代式計算的資料共享

spark的彈性分布式資料集

rdd這個概念跟我們讨論到的spark的動機有關——就是能讓使用者操作分布式系統上的scala集合。spark中的這個重要的集合就是rdd。rdd可以通過在其它rdd或者穩态存儲中的資料(比如說,hdfs中的檔案)上執行确定性操作來進行建立。建立rdd的另一種方式就是将scala集合并行化。rdd的建立也就是spark中的轉換操作。rdd上除了轉換操作,還有其它的一些操作,比如說動作(action)。像map, filter以及join這些都是常見的轉換操作。rdd有意思的一點在于它可以将自己的世系或者說建立它所需的轉換序列,以及它上面的動作給存儲起來。這意味着spark程式隻能擁有一個rdd引用——它知道自己的世系,包括它是如何建立的,上面執行過哪些操作。世系為rdd提供了容錯性——即使它丢失了,隻要世系本身被持久化或者複制了,就仍能重建整個rdd。rdd的持久化以及分塊可以由程式員來指定。比如說,你可以基于記錄的主鍵來進行分塊。

在rdd上可以執行許多操作。包括count,collect以及save,它們分别可以用來統計元素總數,傳回記錄,以及儲存到磁盤或者hdfs中。世系圖中存儲了rdd的轉換以及動作。表2.1中列舉了一系列的轉換及動作。

表2.1

轉換

描述

map(function f1)

把rdd中的每個元素并行地傳遞給f1,并傳回結果的rdd

filter(function f2)

選取出那些傳遞給函數f2并傳回true的rdd元素

flatmap(function f3)

和map類似,但f3傳回的是一個序列,它能将單個輸入映射成多個輸出。

union(rdd r1)

傳回rdd r1和自身的并集

sample(flag, p, seed)

傳回rdd的百分之p的随機采樣(使用種子seed)

動作

groupbykey(notasks)

隻能在鍵值對資料上進行調用——傳回的資料按值進行分組。并行任務的數量通過一個參數來指定(預設是8)

reducebykey(function f4,notasks)

對相同key元素上應用函數f4的結果進行聚合。第二個參數是并行的任務數

join(rdd r2, notasks)

将rdd r2和對象自身進行連接配接——計算出指定key的所有可能的組合

groupwith(rdd r3, notasks)

将rdd r3與對象自身進行連接配接,并按key進行分組

sortbykey(flag)

根據标記值将rdd自身按升序或降序來進行排序

reduce(function f5)

使用函數f5來對rdd的所有元素進行聚合

collect()

将rdd的所有元素作為一個數組來傳回

count()

計算rdd的元素總數

take(n)

擷取rdd的第n個元素

first()

等價于take(1)

saveastextfile(path)

将rdd持久化成hdfs或者其它hadoop支援的檔案系統中路徑為path的一個檔案

saveassequencefile(path)

将rdd持久化為hadoop的一個序列檔案。隻能在實作了hadoop寫接口或類似接口的鍵值對類型的rdd上進行調用。

foreach(function f6)

并行地在rdd的元素上運作函數f6

下面将通過一個例子來介紹下如何在spark環境中進行rdd的程式設計。這裡是一個呼叫資料記錄(cdr)——基于影響力分析的應用程式——通過cdr來建構使用者的關系圖,并識别出影響力最大的k個使用者。cdr結構包括id,調用方,接收方,計劃類型,呼叫類型,持續時長,時間,日期。具體做法是從hdfs中擷取cdr檔案,接着建立出rdd對象并過濾記錄,然後再在上面執行一些操作,比如說通過查詢提取出特定的字段,或者執行諸如count的聚合操作。最終寫出的spark代碼如下:

val spark = new sparkcontext();

call_record_lines = spark.textfile(“hdfs://….”);

plan_a_users = call_record_lines.filter(_.

contains(“plana”)); // rdd上的過濾操作.

plan_a_users.cache(); // 告訴spark運作時,如果仍有空間,就将這個rdd緩存到記憶體裡plan_a_users.count();

%% 呼叫資料集進行中.

rdd可以表示成一張圖,這樣跟蹤rdd在不同轉換/動作間的世系變化會簡單一些。rdd接口由五部分資訊組成,詳見表2.2。

表2.2  rdd接口

資訊

hadooprdd

filteredrdd

joinedrdd

分區類型

每個hdfs塊一個分區

和父rdd一緻

每個reduce任務一個

依賴類型

無依賴

和父rdd是一對一的依賴

在每一個父rdd上進行shuffle

基于父rdd來計算資料集的函數

讀取對應塊的資料

計算父rdd并進行過濾

讀取洗牌後的資料并進行連接配接

位置中繼資料(preferredlocations)

從命名節點中讀取hdfs塊的位置資訊

無(從父rdd中擷取)

分區中繼資料(partitioningscheme)

hashpartitioner

spark的實作

spark是由大概20000行scala代碼寫就的,核心部分大概是14000行。spark可以運作在mesos, nimbus或者yarn等叢集管理器之上。它使用的是未經修改的scala解釋器。當觸發rdd上的一個動作時,一個被稱為有向無環圖(dag)排程器的spark元件就會去檢查rdd的世系圖,同時會建立各階段的dag。每個階段内都隻會出現窄依賴,寬依賴所需的洗牌操作就是階段的邊界。排程器在dag的不同階段啟動任務來計算出缺失的分區,以便重構整個rdd對象。它将各階段的任務對象送出給任務排程器(task scheduler, ts)。任務對象是一個獨立的實體,它由代碼和轉換以及所需的中繼資料組成。排程器還負責重新送出那些輸出丢失了的階段。任務排程器使用一個被稱為延遲排程(zaharia等 2010)的排程算法來将任務配置設定給各個節點。如果rdd中有指定了優先區域的話,任務會被傳送給這些節點,否則會被配置設定到那些有分區在請求記憶體任務的節點上。對于寬依賴而言,中間記錄會在那些包含父分區的節點上生成。這樣會使得錯誤恢複變得簡單,hadoop mr中map輸出的物化也是類似的。

spark中的worker元件會負責接收任務對象并在一個線程池中調用它們的run方法。它将異常或者錯誤報告給tasksetmanager(tsm)。tsm是任務排程器管理的一個實體——每個任務集都會對應一個tsm,用于跟蹤任務的執行過程。ts是按先進先出的順序來輪詢tsm集的。通過插入不同的政策或者算法,這裡仍有一定的優化空間。執行器會與其它的元件進行互動,比如說塊管理器(bm),通信管理器(cm),map輸出跟蹤器(mot)。塊管理器是節點用于緩存rdd并接收洗牌資料的元件。它也可以看作是每個worker中隻寫一次的k-v存儲。塊管理器和通信管理器進行通信以便擷取到遠端的塊資料。通信管理器是一個異步網絡庫。mot這個元件會負責跟蹤每個map任務都在哪運作并把這些資訊傳回給歸約器——worker會緩存這個資訊。當映射器的輸出丢失了的話,會使用一個“分代id”來将這個緩存置為無效。spark中各元件的互動如圖2.4中所示。

颠覆大資料分析之Spark彈性分布式資料集

圖2.4  spark叢集中的元件

rdd的存儲可以通過下面這三種方式來完成:

作為java虛拟機中反序列化的java對象:由于對象就在jvm記憶體中,這樣做的性能會更佳。

作為記憶體中序列化的java對象:這麼表示記憶體的使用率會更高,但卻犧牲了通路速度。

存儲在磁盤上:這樣做性能最差,但是如果rdd太大以至于無法存放到記憶體中的話就隻能這麼做了。

一旦記憶體滿了,spark的記憶體管理會通過最近最少使用(lru)政策來回收rdd。然而,屬于同一個rdd的分區是無法剔除的——因為通常來說,一個程式可能會在一個大的rdd上進行計算,如果将同一個rdd中的分區剔除的話則會出現系統颠簸。

世系圖擁有足夠的資訊來重建rdd的丢失分區。然而,考慮到效率的因素(重建整個rdd可能會需要很大的計算量),檢查點仍是必需的——使用者可以自主要制哪個rdd作為檢查點。使用了寬依賴的rdd可以使用檢查點,因為在這種情況下,計算丢失的分區會需要顯著的通信及計算量。而對于隻擁有窄依賴的rdd而言,檢查點則不太适合。