
11月26日,ibm資深軟體工程師朱志輝老師,在dba+社群db2使用者群進行了一次主題為“初窺spark”的線上分享。小編特别整理出其中精華内容,供大家學習交流。同時,也非常感謝朱志輝老師對dba+社群給予的大力支援。
嘉賓簡介
ibm中國開發中心進階軟體工程師
具有多年的資料庫軟體開發設計經驗,擅長解決複雜的資料庫應用系統問題及性能優化,擁有db2多項認證和oracle ocp證書
合作出版了《db2設計,管理與性能優化藝術》《db2性能管理與實戰》
自從2007年加入ibm以來,一直從事db2相關工具的開發與研究,現在專注spark工具的初創研究
演講實錄
一、前言
spark作為現在大資料領域最火熱的技術,被稱為将會是下個十所最重要的開源技術,其基于記憶體的計算速度100倍速于hadoop mapreduce,基于磁盤的運算速度也比hadoop mapreduce要快10倍,它易于使用,spark提供了80個以上的高階操作使你很容易的使用java,scala,python,r等語言快速地建構分布式應用,你也可以scala,python和r shells進行互動式分析。作為一種通用資料處理引擎,你可以組合使用sql,流處理等技術來進行複雜的分析,它能夠運作在hadoop,mesos,standalone或者雲環境下,也能夠通路hdfs,cassandra,hbase和s3等多種存儲,下面我們從spark的誕生背景開始讨論它是如何實作這些優勢的。
二、spark誕生的背景
mapreduce計算模型的誕生,極大的加速了大資料時代的到來(如果不熟悉mapreduce概念可以參考“我是如何向老婆解釋mapreduce的?”這篇blog),在許多情況下,可以将mapreduce視為關系型資料庫管理系統的補充。兩個系統的差異如下表。
傳統關系型資料庫
mapreduce
資料大小
gb
pb
資料存取
多次讀寫
一次寫入,多次讀取
結構
靜态模式
動态模式
完整性
高
低
橫向擴充
非線性
線性的
mapreduce比較适合以批處理方式處理需要分析整個資料集的問題,随着mapreduce技術的發展,除了批處理類型的工作負載外,越來越多獨立系統被開發出處理不同的工作負載。
google就開發了pregel來處理圖形計算,dremel來處理互動式sql,也有自己流處理引擎。而開源的hadoop體系也自己對應的處理引擎,每一種工作負責的處理都是由不同的引擎來負責,這樣很難去部署,優化和管理衆多的系統,而且這些工作流之種也很組合在一起形成流水線(pipeline)。
為了消除這種複雜性,spark被設計為統一的大資料處理引擎,将批處理,互動式,疊代和流處理等有機的組合在spark中。
spark創始人認為,大多數的資料分析活動是探索性互動式的,spark為這種探索性互動方式設計了resilient distributed datasets(rdds),對具有簡單函數式程式設計接口的分布式資料集合的抽象。可以了解rdd就是分頁在不同機器上的list,當遇到錯誤的時候,這些list能夠被恢複。
lines=sc.textfile(“hdfs://....”)
points = lines.map(line => parsepoint(line))
points.filter(p => p.x>100).count()
如上面的scala代碼,rdd通常開始從hdfs檔案系統讀取文本檔案開始,lines這個rdd由字元串組成,每個元素都對應着文本檔案中的一行。調用map函數,可以将rdd轉換成包含點元素的ponits集合。可以過濾這個集合并計算符合要求的點的數量。
總而言之:spark是一個統一的大資料處理引擎,它有機的組合了批處理,互動式,流處理和圖計算處理能力,并以rdd抽象為基礎,提供了簡潔的函數式程式設計api來支援互動式的探索性資料分析處理手段。
三、spark的核心概念
1)resilient distributed dataset (rdd)彈性分布資料集
rdd彈性分布式資料集是分布式的隻讀的且已分區的集合對象,是對分布式記憶體的抽象使用,實作了以操作本地集合的方式來操作分布式資料集的抽象實作。這些集合是彈性的,如果資料集的一部分丢失,則可以對它們進行重建。具有自動容錯,位置感覺調試和可伸縮性,而容錯性最難實作的。大多數分布式資料集的容錯性有兩種方式:資料檢查點和記錄資料的更新,對于大規模資料分析系統,資料檢查點操作成本很高,主要原因是大規模資料在伺服器之間的傳輸會帶來各方面的問題,相比記錄資料的更新,rdd隻支援粗粒度的輪換,也就是記錄如何從其他rdd轉換而來,以便恢複丢失的分區。rdd必須是可序列化的。rdd可以cache到記憶體 中,每次對rdd資料集的操作之後的結果,都可以存放到記憶體中,下一個操作可以直接從記憶體中輸入,省去了mapreduce大量的磁盤io操作。這對于疊代運算比較常見的機器學習算法, 互動式資料挖掘來說,效率提升比較大。
2)rdd程式設計接口
作為spark的目标之一,spark提供了豐富的api來操作這些資料集,rdd包含2類api。
transformations——轉換操作,這類操作的傳回值還是一個rdd,常用的有map、filter、sort等,變形操作采用的是懶政策,如果隻是将轉換操作送出是不會送出任務來執行的。
spark支援的轉換操作可參考以下連結:transformations
map(func)
傳回一個新的分布式資料集,由每個原元素經過func函數轉換後組成
filter(func)
傳回一個新的資料集,由經過func函數後傳回值為true的原元素組成
flatmap(func)
類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(是以,func函數的傳回值是一個seq,而不是單一進制素)
sample(withreplacement, frac, seed)
根據給定的随機種子seed,随機抽樣出數量為frac的資料
union(otherdataset)
傳回一個新的資料集,由原資料集和參數聯合而成
groupbykey([numtasks])
在一個由(k,v)對組成的資料集上調用,傳回一個(k,seq[v])對的資料集。注意:預設情況下,使用8個并行任務進行分組,你可以傳入numtask可選參數,根據資料量設定不同數目的task
(groupbykey和filter結合,可以實作類似hadoop中的reduce功能)
reducebykey(func, [numtasks])
在一個(k,v)對的資料集上使用,傳回一個(k,v)對的資料集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的
join(otherdataset, [numtasks])
在類型為(k,v)和(k,w)類型的資料集上調用,傳回一個(k,(v,w))對,每個key中的所有元素都在一起的資料集
groupwith(otherdataset, [numtasks])
在類型為(k,v)和(k,w)類型的資料集上調用,傳回一個資料集,組成元素為(k, seq[v], seq[w]) tuples。這個操作在其它架構,稱為cogroup
cartesian(otherdataset)
笛卡爾積。但在資料集t和u上調用時,傳回一個(t,u)對的資料集,所有元素互動進行笛卡爾積
sortbykey([ascendingorder])
在類型為( k, v )的資料集上調用,傳回以k為鍵進行排序的(k,v)對資料集。升序或者降序由boolean型的ascendingorder參數決定
(類似于hadoop的map-reduce中間階段的sort,按key進行排序)
actions——動作操作,這類操作或者傳回結果,或者将rdd存儲起來,如count,save等等,當動作操作送出時,任務執行立即被觸發。
spark支援的動作操作可參考以下連結:actions
reduce(func)
通過函數func聚集資料集中的所有元素。func函數接受2個參數,傳回一個值。這個函數必須是關聯性的,確定可以被正确的并發執行
collect()
在driver的程式中,以數組的形式,傳回資料集的所有元素。這通常會在使用filter或者其它操作後,傳回一個足夠小的資料子集再使用,直接将整個rdd集collect傳回,很可能會讓driver程式oom
count()
傳回資料集的元素個數
take(n)
傳回一個數組,由資料集的前n個元素組成。注意,這個操作目前并非在多個節點上,并行執行,而是driver程式所在機器,單機計算所有的元素
(gateway的記憶體壓力會增大,需要謹慎使用)
first()
傳回資料集的第一個元素(類似于take(1))
saveastextfile(path)
将資料集的元素,以textfile的形式,儲存到本地檔案系統,hdfs或者任何其它hadoop支援的檔案系統。spark将會調用每個元素的tostring方法,并将它轉換為檔案中的一行文本
saveassequencefile(path)
将資料集的元素,以sequencefile的格式,儲存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支援的檔案系統。rdd的元素必須 由key-value對組成,并都實作了hadoop的writable接口,或隐式可以轉換為writable(spark包括了基本類型的轉換,例如 int,double,string等等)
foreach(func)
在資料集的每一個元素上,運作函數func。這通常用于更新一個累加器變量,或者和外部存儲系統做互動
3)rdd依賴關系
如上圖,rdd之間有兩種依賴:
窄依賴(narrow dependency)——一個父rdd最多被一個子rdd引用,如map,filter,union等等。
寬依賴(wide dependencies)——一個父rdd被多個子rdd引用,如groupbykey。
4)stage dag
如圖spark送出job之後會把job生成多個stage,多個stage之間是有依賴的,如上面stage3就依賴于stage1和stage2,stage之間的依賴關系就構成了dag(有向無環圖)。spark通常會将窄依賴的rdd轉換放在同一個stage中,而對于寬依賴,通常會發生shuffle操作,spark通常将shuffle操作定義為stage的邊界。
5)spark運作模式
在了解rdd和dag的基礎下,我們來看看,spark是如果對資源管理與作業排程來完成實際的分析任務。
spark支援如下運作模式:
local模式:單機運作環境,通常用于測試開發
僞分布模式:在單機環境下,模仿分布叢集運作模式,也用于測試開發
叢集模式:spark支援多種叢集管理器
1.standalone模式,對于資源管理與作業排程由spark叢集來完成。
在這種模式下,driver負責建立sparkcontext來為應用的運作準備運作環境,sparkcontext負責與叢集管理器通信,進行資源的申請,任務的配置設定和監控等,在worker節點上運作的executor工作程序在完成配置設定的任務後,driver同時負責将sparkcontext關閉。通常用sparkcontext代表driver。
2. spark on yarn模式
spark應用的資源與排程由yarn來管理 ,spark on yarn遵循yarn的官方規範實作,得益于spark天生支援多種scheduler和executor的良好設計,對yarn的支援也就非常容 易,spark on yarn的大緻架構圖。
3.spark也可運作在mesos叢集管理器上
四、spark的元件
1)spark sql
spark sql是spark用來處理結構化資料的子產品,它提供了叫做dataframes的程式設計抽象,也叫做分布式sql查詢引擎,這個概念在spark1.5中引入。一個dataframe就是一個由命名列組織的分布式資料集。它在概念上等同關系型資料庫中表或者r/python中的資料框。dataframes能夠由多種方式建構,例如:結構化資料檔案,hive中的表,外部資料庫或者已有rdd。
dataframe api支援scala,java,python和r語言,下面是使用語言scala操作dataframe的簡單執行個體。
2)spark streaming
spark流程處理子產品擴充了spark的核心api來支援,可伸縮,高吞吐量,可容錯的實時的流資料處理,資料可以從不同的來源注入,例如kafka,flume,twitter,zeromq,kinesis或者tcp sockets. 可以對這些實時資料運用進階函數例如map,reduce,join結合視窗機制作一些複雜的數理運算,最終的處理結果可以推送到檔案系統,資料庫,實時儀表盤顯示,也可使用spark機器學習算法或者圖計算處理引擎來處理這些實時資料。
spark的内部工作機制如下,spark stream接收到實時資料流,按照一定時間将資料流分隔成批,然後由spark引擎依次處理這些批量資料來擷取最終的結果。
spark streaming提供的高階抽象叫做“離散流”或者dstream,它代表一個持續的資料流。dstreams能夠從kafka,flumea和kinesis資料源建立或者從其它dstreams轉換而來。在内部,dstream代表序列化rdds。
流計算技術通常用來處理實時業務分析,例如實時日志收集和預警系統等等。
3)machine learning library
mllib是spark的機器學習庫,其目标使實際的機器學習運算簡單和可伸縮。它由一些通用的學習算法和輔助類組成,包括分類、回歸、聚類、協同過濾、降維等和一些底層的優化手段和api。
4)graphx
graphx是 spark中用于圖(e.g., web-graphs and social networks)和圖并行計算(e.g., pagerank and collaborative filtering)的api,可以認為是graphlab(c++)和pregel(c++)在spark(scala)上的重寫及優化,跟其他分布式 圖計算架構相比,graphx最大的貢獻是,在spark之上提供一棧式資料解決方案,可以友善且高效地完成圖計算的一整套流水作業。
graphx能過引入graph抽象來擴充spark rdd:由附有屬性的點和邊組成的有向多邊形。graphx提供了一些基本的圖計算操作集(如:subgraph,joinvertices和aggregatemessages)和一個經過優化的pregel api變體。graphx所包含的圖形算法還在持續的增加和建構以使對于圖分析任務更簡單。
圖算法通常用來處理最短路徑,社交網絡分析等等,比較有名的圖算法就是google計算網頁連結權重的pagerank。
五、結束語
spark基于rdd的抽象,結合dag,延遲計算等技術,盡可能充分利用記憶體疊代避免低效的磁盤讀寫,來提高運算速度,并以這個抽象為基礎,将批處理,機器學習,流處理和圖計算能多種工作負載有機的統一到其運算平台中,并且能夠組合這些技術來進行複雜的分析任務,例如将流處理獲得資料實時的分發的機器學習子產品進行實時預測。其提供的高階操作接口和各種便利的計算庫,使得開發能夠專注于自己的業務,使用它們快速的開發自己的應用。
通過上面的介紹,相信讀者已經了解spark的基本的了解,如果想進一步的學習spark的知識,可以通路big data university,其中包含許多免費的mooc、ibm也提供了spark服務平台來免費試用。
<b></b>
<b>本文來自雲栖社群合作夥伴"dbaplus",原文釋出時間:2015-11-28</b>