天天看點

深入了解Spark:核心思想與源碼分析

深入了解Spark:核心思想與源碼分析

大資料技術叢書

深入了解spark:核心思想與源碼分析

耿嘉安 著

圖書在版編目(cip)資料

深入了解spark:核心思想與源碼分析/耿嘉安著. —北京:機械工業出版社,2015.12

(大資料技術叢書)

isbn 978-7-111-52234-8

i. 深… ii.耿… iii.資料處理軟體 iv. tp274

中國版本圖書館cip資料核字(2015)第280808号

深入了解spark:核心思想與源碼分析

出版發行:機械工業出版社(北京市西城區百萬莊大街22号 郵政編碼:100037)

責任編輯:高婧雅 責任校對:董紀麗

印  刷: 版  次:2016年1月第1版第1次印刷

開  本:186mm×240mm 1/16 印  張:30.25

書  号:isbn 978-7-111-52234-8 定  價:99.00元

凡購本書,如有缺頁、倒頁、脫頁,由本社發行部調換

客服熱線:(010)88379426 88361066 投稿熱線:(010)88379604

購書熱線:(010)68326294 88379649 68995259 讀者信箱:[email protected]

版權所有·侵權必究

封底無防僞标均為盜版

本書法律顧問:北京大成律師事務所 韓光/鄒曉東 

preface  前言

為什麼寫這本書

要回答這個問題,需要從我個人的經曆說起。說來慚愧,我第一次接觸計算機是在高三。當時跟大家一起去網吧玩cs,跟身邊的同學學怎麼“玩”。正是通過這種“玩”的過程,讓我了解到計算機并沒有那麼神秘,它也隻是台機器,用起來似乎并不比打開電視機費勁多少。聯考填志願的時候,憑着直覺“糊裡糊塗”就選擇了計算機專業。等到真正學習計算機課程的時候卻又發現,它其實很難!

早在2004年,還在學校的我跟很多同學一樣,喜歡看flash,也喜歡談論flash甚至做flash。感覺flash正如它的名字那樣“閃光”。那些年,在學校裡,知道flash的人可要比知道java的人多得多,這說明當時的flash十分火熱。此外,oracle也成為關系型資料庫裡的領軍人物,很多人甚至覺得懂oracle要比懂flash、java及其他資料庫要厲害得多!

2007年,我剛剛參加工作不久。那時struts1、spring、hibernate幾乎可以稱為那些用java作為開發語言的軟體公司的三駕馬車。很快,struts2替代了struts1的地位,讓我第一次意識到it領域的技術更新竟然如此之快!随着很多傳統軟體公司向網際網路公司轉型,hibernate也難以確定其地位,ibatis誕生了!

2010年,有關hadoop的技術圖書湧入中國,當時很多公司用它隻是為了資料統計、資料挖掘或者搜尋。一開始,人們對于hadoop的認識和使用可能相對有限。大約2011年的時候,關于雲計算的概念在網上炒得火熱,當時依然在做網際網路開發的我,對其隻是“道聽途說”。後來跟同僚借了一本有關雲計算的書,回家挑着看了一些内容,也沒什麼收獲,怅然若失!20世紀60年代,美國的軍用網絡作為網際網路的雛形,很多内容已經與雲計算中的某些說法類似。到20世紀80年代,網際網路就已經啟用了雲計算,如今為什麼又要重提這樣的概念?這個問題我可能回答不了,還是交給曆史吧。

2012年,國内又呈現出大資料熱的态勢。從國家到媒體、教育、it等幾乎所有領域,人人都在談大資料。我的親戚朋友中,無論老師、銷售人員,還是工程師們都可以針對大資料談談自己的看法。我也找來一些hadoop的書籍進行學習,希望能在其中探索到大資料的奧妙。

有幸在工作過程中接觸到阿裡的開放資料處理服務(open data processing service,odps),并且基于odps與其他小夥伴一起建構阿裡的大資料商業解決方案—禦膳房。去杭州出差的過程中,有幸認識和仲,跟他學習了阿裡的實時多元分析平台—garuda和實時計算平台—galaxy的部分知識。和仲推薦我閱讀spark的源碼,這樣會對實時計算及流式計算有更深入的了解。2015年春節期間,自己初次上網查閱spark的相關資料學習,開始研究spark源碼。還記得那時隻是出于對大資料的熱愛,想使自己在這方面的技術能力有所提升。

從閱讀hibernate源碼開始,到後來閱讀tomcat、spring的源碼,我也在從學習源碼的過程中成長,我對源碼閱讀也越來越感興趣。随着對spark源碼閱讀的深入,發現很多内容從網上找不到答案,隻能自己“硬啃”了。随着自己的積累越來越多,突然有一天發現,我所總結的這些内容好像可以寫成一本書了!從閃光(flash)到火花(spark),足足有11個年頭了。無論是flash、java,還是spring、ibatis,我一直扮演着一個追随者,我接受這些書籍的洗禮,從未給予。如今我也是spark的追随者,不同的是,我不再隻想簡單攫取,還要給予。

最後還想說一下,2016年是我從事it工作的第10個年頭,此書特别作為送給自己的10周年禮物。

本書特色

按照源碼分析的習慣設計,從腳本分析到初始化再到核心内容,最後介紹spark的擴充内容。整個過程遵循由淺入深、由深到廣的基本思路。

本書涉及的所有内容都有相應的例子,以便于讀者對源碼的深入研究。

本書盡可能用圖來展示原理,加速讀者對内容的掌握。

本書講解的很多實作及原理都值得借鑒,能幫助讀者提升架構設計、程式設計等方面的能力。

本書盡可能保留較多的源碼,以便于初學者能夠在像地鐵、公交這樣的地方,也能輕松閱讀。

讀者對象

源碼閱讀是一項苦差事,人力和時間成本都很高,尤其是對于spark陌生或者剛剛開始學習的人來說,難度可想而知。本書盡可能保留源碼,使得分析過程不至于産生跳躍感,目的是降低大多數人的學習門檻。如果你是從事it工作1~3年的新人或者是希望學習spark核心知識的人,本書非常适合你。如果你已經對spark有所了解或者已經在使用它,還想進一步提高自己,那麼本書更适合你。

如果你是一個開發新手,對java、linux等基礎知識不是很了解,那麼本書可能不太适合你。如果你已經對spark有深入的研究,本書也許可以作為你的參考資料。

總體說來,本書适合以下人群:

想要使用spark,但對spark實作原理不了解,不知道怎麼學習的人;

大資料技術愛好者,以及想深入了解spark技術内部實作細節的人;

有一定spark使用基礎,但是不了解spark技術内部實作細節的人;

對性能優化和部署方案感興趣的大型網際網路工程師和架構師;

開源代碼愛好者。喜歡研究源碼的同學可以從本書學到一些閱讀源碼的方式與方法。

本書不會教你如何開發spark應用程式,隻是用一些經典例子示範。本書簡單介紹hadoop mapreduce、hadoop yarn、mesos、tachyon、zookeeper、hdfs、amazon s3,但不會過多介紹這些架構的使用,因為市場上已經有豐富的這類書籍供讀者挑選。本書也不會過多介紹scala、java、shell的文法,讀者可以在市場上選擇适合自己的書籍閱讀。

如何閱讀本書

本書分為三大部分(不包括附錄):

準備篇(第1~2章),簡單介紹了spark的環境搭建和基本原理,幫助讀者了解一些背景知識。

核心設計篇(第3~7章),着重講解sparkcontext的初始化、存儲體系、任務送出與執行、計算引擎及部署模式的原理和源碼分析。

擴充篇(第8~11章),主要講解基于spark核心的各種擴充及應用,包括:sql處理引擎、hive處理、流式計算架構spark streaming、圖計算架構graphx、機器學習庫mllib等内容。

本書最後還添加了幾個附錄,包括:附錄a介紹的spark中最常用的工具類utils;附錄b是akka的簡介與工具類akkautils的介紹;附錄c為jetty的簡介和工具類jettyutils的介紹;附錄d為metrics庫的簡介和測量容器metricregistry的介紹;附錄e示範了hadoop1.0版本中的word count例子;附錄f介紹了工具類commandutils的常用方法;附錄g是關于netty的簡介和工具類nettyutils的介紹;附錄h列舉了筆者編譯spark源碼時遇到的問題及解決辦法。

為了降低讀者閱讀了解spark源碼的門檻,本書盡可能保留源碼實作,希望讀者能夠懷着一顆好奇的心,spark目前很火熱,其版本更新也很快,本書以spark 1.2.3版本為主,有興趣的讀者也可按照本書的方式,閱讀spark的最新源碼。

勘誤和支援

本書内容很多,限于筆者水準有限,書中内容難免有錯誤之處。在本書出版後的任何時間,如果你對本書有任何問題或者意見,都可以通過郵箱[email protected]或部落格http://www.cnblogs.com/jiaan-geng/聯系我,說出你的建議或者想法,希望與大家共同進步。

緻謝

感謝蒼天,讓我生活在這樣一個時代,能接觸網際網路和大資料;感謝父母,這麼多年來,在學習、工作及生活上的幫助與支援;感謝妻子在生活中的照顧和謙讓。

感謝楊福川和高婧雅給予本書出版的大力支援與幫助。

感謝冰夷老大和王贲老大讓我有幸加入阿裡,接觸大資料應用;感謝和仲對galaxy和garuda耐心細緻的講解以及對spark的推薦;感謝張中在百忙之中給本書寫評語;感謝周亮、澄蒼、民瞻、石申、清無、少俠、征宇、三步、謝衣、曉五、法星、曦軒、九翎、峰閱、丁卯、阿末、紫丞、海炎、涵康、雲飏、孟天、零一、六仙、大知、井凡、隆君、太奇、晨炫、既望、寶升、都靈、鬼厲、歸鐘、梓撤、昊蒼、水村、惜冰、惜陌、元乾等同仁在工作上的支援和幫助。

耿嘉安 于北京

 

contents  目錄

前言

準 備 篇

第1章 環境準備2

1.1 運作環境準備2

1.1.1 安裝jdk3

1.1.2 安裝scala3

1.1.3 安裝spark4

1.2 spark初體驗4

1.2.1 運作spark-shell4

1.2.2 執行word count5

1.2.3 剖析spark-shell7

1.3 閱讀環境準備11

1.4 spark源碼編譯與調試13

1.5 小結17

第2章 spark設計理念與基本架構18

2.1 初識spark18

2.1.1 hadoop mrv1的局限18

2.1.2 spark使用場景20

2.1.3 spark的特點20

2.2 spark基礎知識20

2.3 spark基本設計思想22

2.3.1 spark子產品設計22

2.3.2 spark模型設計24

2.4 spark基本架構25

2.5 小結26

核心設計篇

第3章 sparkcontext的初始化28

3.1 sparkcontext概述28

3.2 建立執行環境sparkenv30

3.2.1 安全管理器securitymanager31

3.2.2 基于akka的分布式消息系統actorsystem31

3.2.3 map任務輸出跟蹤器mapoutputtracker32

3.2.4 執行個體化shufflemanager34

3.2.5 shuffle線程記憶體管理器shufflememorymanager34

3.2.6 塊傳輸服務blocktransferservice35

3.2.7 blockmanagermaster介紹35

3.2.8 建立塊管理器blockmanager36

3.2.9 建立廣播管理器broadcast-manager36

3.2.10 建立緩存管理器cachemanager37

3.2.11 http檔案伺服器httpfile-server37

3.2.12 建立測量系統metricssystem39

3.2.13 建立sparkenv40

3.3 建立metadatacleaner41

3.4 sparkui詳解42

3.4.1 listenerbus詳解43

3.4.2 構造jobprogresslistener46

3.4.3 sparkui的建立與初始化47

3.4.4 spark ui的頁面布局與展示49

3.4.5 sparkui的啟動54

3.5 hadoop相關配置及executor環境變量54

3.5.1 hadoop相關配置資訊54

3.5.2 executor環境變量54

3.6 建立任務排程器taskscheduler55

3.6.1 建立taskschedulerimpl55

3.6.2 taskschedulerimpl的初始化57

3.7 建立和啟動dagscheduler57

3.8 taskscheduler的啟動60

3.8.1 建立localactor60

3.8.2 executorsource的建立與注冊62

3.8.3 executoractor的建構與注冊64

3.8.4 spark自身classloader的建立64

3.8.5 啟動executor的心跳線程66

3.9 啟動測量系統metricssystem69

3.9.1 注冊sources70

3.9.2 注冊sinks70

3.9.3 給sinks增加jetty的servlet-contexthandler71

3.10 建立和啟動executorallocation-manager72

3.11 contextcleaner的建立與啟動73

3.12 spark環境更新74

3.13 建立dagschedulersource和blockmanagersource76

3.14 将sparkcontext标記為激活77

3.15 小結78

第4章 存儲體系79

4.1 存儲體系概述79

4.1.1 塊管理器blockmanager的實作79

4.1.2 spark存儲體系架構81

4.2 shuffle服務與用戶端83

4.2.1 block的rpc服務84

4.2.2 構造傳輸上下文transpor-tcontext85

4.2.3 rpc用戶端工廠transport-clientfactory86

4.2.4 netty伺服器transportserver87

4.2.5 擷取遠端shuffle檔案88

4.2.6 上傳shuffle檔案89

4.3 blockmanagermaster對block-manager的管理90

4.3.1 blockmanagermasteractor90

4.3.2 詢問driver并擷取回複方法92

4.3.3 向blockmanagermaster注冊blockmanagerid93

4.4 磁盤塊管理器diskblockmanager94

4.4.1 diskblockmanager的構造過程94

4.4.2 擷取磁盤檔案方法getfile96

4.4.3 建立臨時block方法create-tempshuffleblock96

4.5 磁盤存儲diskstore97

4.5.1 nio讀取方法getbytes97

4.5.2 nio寫入方法putbytes98

4.5.3 數組寫入方法putarray98

4.5.4 iterator寫入方法putiterator98

4.6 記憶體存儲memorystore99

4.6.1 資料存儲方法putbytes101

4.6.2 iterator寫入方法putiterator詳解101

4.6.3 安全展開方法unrollsafely102

4.6.4 确認空閑記憶體方法ensurefreespace105

4.6.5 記憶體寫入方法putarray107

4.6.6 嘗試寫入記憶體方法trytoput108

4.6.7 擷取記憶體資料方法getbytes109

4.6.8 擷取資料方法getvalues110

4.7 tachyon存儲tachyonstore110

4.7.1 tachyon簡介111

4.7.2 tachyonstore的使用112

4.7.3 寫入tachyon記憶體的方法putintotachyonstore113

4.7.4 擷取序列化資料方法getbytes113

4.8 塊管理器blockmanager114

4.8.1 移出記憶體方法dropfrom-memory114

4.8.2 狀态報告方法reportblockstatus116

4.8.3 單對象塊寫入方法putsingle117

4.8.4 序列化位元組塊寫入方法putbytes118

4.8.5 資料寫入方法doput118

4.8.6 資料塊備份方法replicate121

4.8.7 建立diskblockobjectwriter的方法getdiskwriter125

4.8.8 擷取本地block資料方法getblockdata125

4.8.9 擷取本地shuffle資料方法dogetlocal126

4.8.10 擷取遠端block資料方法dogetremote127

4.8.11 擷取block資料方法get128

4.8.12 資料流序列化方法dataserializestream129

4.9 metadatacleaner和broadcastcleaner129

4.10 緩存管理器cachemanager130

4.11 壓縮算法133

4.12 磁盤寫入實作diskblockobjectwriter133

4.13 塊索引shuffle管理器indexshuffleblockmanager135

4.14 shuffle記憶體管理器shufflememorymanager137

4.15 小結138

第5章 任務送出與執行139

5.1 任務概述139

5.2 廣播hadoop的配置資訊142

5.3 rdd轉換及dag建構144

5.3.1 為什麼需要rdd144

5.3.2 rdd實作分析146

5.4 任務送出152

5.4.1 任務送出的準備152

5.4.2 finalstage的建立與stage的劃分157

5.4.3 建立job163

5.4.4 送出stage164

5.4.5 送出task165

5.5 執行任務176

5.5.1 狀态更新176

5.5.2 任務還原177

5.5.3 任務運作178

5.6 任務執行後續處理179

5.6.1 計量統計與執行結果序列化179

5.6.2 記憶體回收180

5.6.3 執行結果處理181

5.7 小結187

第6章 計算引擎188

6.1 疊代計算188

6.2 什麼是shuffle192

6.3 map端計算結果緩存處理194

6.3.1 map端計算結果緩存聚合195

6.3.2 map端計算結果簡單緩存200

6.3.3 容量限制201

6.4 map端計算結果持久化204

6.4.1 溢出分區檔案205

6.4.2排序與分區分組207

6.4.3 分區索引檔案209

6.5 reduce端讀取中間計算結果210

6.5.1 擷取map任務狀态213

6.5.2 劃分本地與遠端block215

6.5.3 擷取遠端block217

6.5.4 擷取本地block218

6.6 reduce端計算219

6.6.1 如何同時處理多個map任務的中間結果219

6.6.2 reduce端在緩存中對中間計算結果執行聚合和排序220

6.7 map端與reduce端組合分析221

6.7.1 在map端溢出分區檔案,在reduce端合并組合221

6.7.2 在map端簡單緩存、排序分組,在reduce端合并組合222

6.7.3 在map端緩存中聚合、排序分組,在reduce端組合222

6.8 小結223

第7章 部署模式224

7.1 local部署模式225

7.2 local-cluster部署模式225

7.2.1 localsparkcluster的啟動226

7.2.2 coarsegrainedschedulerbackend的啟動236

7.2.3 啟動appclient237

7.2.4 資源排程242

7.2.5 local-cluster模式的任務執行253

7.3 standalone部署模式255

7.3.1 啟動standalone模式255

7.3.2 啟動master分析257

7.3.3 啟動worker分析259

7.3.4 啟動driver application分析261

7.3.5 standalone模式的任務執行263

7.3.6 資源回收263

7.4 容錯機制266

7.4.1 executor異常退出266

7.4.2 worker異常退出268

7.4.3 master異常退出269

7.5 其他部署方案276

7.5.1 yarn277

7.5.2 mesos280

7.6 小結282

擴 展 篇

第8章 spark sql284

8.1 spark sql總體設計284

8.1.1 傳統關系型資料庫sql運作原理285

8.1.2 spark sql運作架構286

8.2 字典表catalog288

8.3 tree和treenode289

8.4 詞法解析器parser的設計與實作293

8.4.1 sql語句解析的入口294

8.4.2 建表語句解析器ddlparser295

8.4.3 sql語句解析器sqlparser296

8.4.4 spark代了解析器sparksqlparser299

8.5 rule和ruleexecutor300

8.6 analyzer與optimizer的設計與實作302

8.6.1 文法分析器analyzer304

8.6.2 優化器optimizer305

8.7 生成實體執行計劃306

8.8 執行實體執行計劃308

8.9 hive311

8.9.1 hive sql文法解析器311

8.9.2 hive sql中繼資料分析313

8.9.3 hive sql實體執行計劃314

8.10 應用舉例:javasparksql314

8.11 小結320

第9章 流式計算321

9.1 spark streaming總體設計321

9.2 streamingcontext初始化323

9.3 輸入流接收器規範receiver324

9.4 資料流抽象dstream325

9.4.1 dstream的離散化326

9.4.2 資料源輸入流inputdstream327

9.4.3 dstream轉換及建構dstream graph329

9.5 流式計算執行過程分析330

9.5.1 流式計算例子customreceiver331

9.5.2 spark streaming執行環境建構335

9.5.3 任務生成過程347

9.6 視窗操作355

9.7 應用舉例357

9.7.1 安裝mosquitto358

9.7.2 啟動mosquitto358

9.7.3 mqttwordcount359

9.8 小結361

第10章 圖計算362

10.1 spark graphx總體設計362

10.1.1 圖計算模型363

10.1.2 屬性圖365

10.1.3 graphx的類繼承體系367

10.2 圖操作368

10.2.1 屬性操作368

10.2.2 結構操作368

10.2.3 連接配接操作369

10.2.4 聚合操作370

10.3 pregel api371

10.3.1 dijkstra算法373

10.3.2 dijkstra的實作376

10.4 graph的建構377

10.4.1 從邊的清單加載graph377

10.4.2 在graph中建立圖的方法377

10.5 頂點集合抽象vertexrdd378

10.6 邊集合抽象edgerdd379

10.7 圖分割380

10.8 常用算法382

10.8.1 網頁排名382

10.8.2 connected components的應用386

10.8.3 三角關系統計388

10.9 應用舉例390

10.10 小結391

第11章 機器學習392

11.1機器學習概論392

11.2 spark mllib總體設計394

11.3 資料類型394

11.3.1 局部向量394

11.3.2标記點395

11.3.3局部矩陣396

11.3.4分布式矩陣396

11.4基礎統計398

11.4.1摘要統計398

11.4.2相關統計399

11.4.3分層抽樣401

11.4.4假設檢驗401

11.4.5随機數生成402

11.5分類和回歸405

11.5.1數學公式405

11.5.2線性回歸407

11.5.3分類407

11.5.4回歸410

11.6決策樹411

11.6.1基本算法411

11.6.2使用例子412

11.7随機森林413

11.7.1基本算法414

11.7.2使用例子414

11.8梯度提升決策樹415

11.8.1基本算法415

11.8.2使用例子416

11.9樸素貝葉斯416

11.9.1算法原理416

11.9.2使用例子418

11.10保序回歸418

11.10.1算法原理418

11.10.2使用例子419

11.11協同過濾419

11.12聚類420

11.12.1k-means420

11.12.2高斯混合422

11.12.3快速疊代聚類422

11.12.4latent dirichlet allocation422

11.12.5流式k-means423

11.13維數減縮424

11.13.1奇異值分解424

11.13.2主成分分析425

11.14特征提取與轉型425

11.14.1術語頻率反轉425

11.14.2單詞向量轉換426

11.14.3标準尺度427

11.14.4正規化尺度428

11.14.5卡方特征選擇器428

11.14.6hadamard積429

11.15頻繁模式挖掘429

11.16預言模型标記語言430

11.17管道431

11.17.1管道工作原理432

11.17.2管道api介紹433

11.17.3交叉驗證435

11.18小結436

附錄a utils437

附錄b akka446

附錄c jetty450

附錄d metrics453

附錄e hadoop word count456

附錄f commandutils458

附錄g netty461

附錄h 源碼編譯錯誤465

1章 環境準備

第2章 spark設計理念與基本架構

第1章

環 境 準 備

凡事豫則立,不豫則廢;言前定,則不跲;事前定,則不困。

—《禮記·中庸》

本章導讀

在深入了解一個系統的原理、實作細節之前,應當先準備好它的源碼編譯環境、運作環境。如果能在實際環境安裝和運作spark,顯然能夠提升讀者對于spark的一些感受,對系統能有個大體的印象,有經驗的技術人員甚至能夠猜出一些spark采用的程式設計模型、部署模式等。當你通過一些途徑知道了系統的原理之後,難道不會問問自己:“這是怎麼做到的?”如果隻是遊走于系統使用、原理了解的層面,是永遠不可能真正了解整個系統的。很多ide本身帶有調試的功能,每當你閱讀源碼,陷入重圍時,調試能讓我們更加了解運作期的系統。如果沒有調試功能,不敢想象閱讀源碼會怎樣困難。

本章的主要目的是幫助讀者建構源碼學習環境,主要包括以下内容:

在windows環境下搭建源碼閱讀環境;

在linux環境下搭建基本的執行環境;

spark的基本使用,如spark-shell。

1.1 運作環境準備

考慮到大部分公司的開發和生成環境都采用linux作業系統,是以筆者選用了64位的linux。在正式安裝spark之前,先要找台好機器。為什麼?因為筆者在安裝、編譯、調試的過程中發現spark非常耗費記憶體,如果機器配置太低,恐怕會跑不起來。spark的開發語言是scala,而scala需要運作在jvm之上,因而搭建spark的運作環境應該包括jdk和scala。

1.1.1 安裝jdk

使用指令getconf long_bit檢視linux機器是32位還是64位,然後下載下傳相應版本的jdk并安裝。

下載下傳位址:

http://www.oracle.com/technetwork/java/javase/downloads/index.html

配置環境:

cd ~

vim .bash_profile 

添加如下配置:

export java_home=/opt/java

export path=$path:$java_home/bin

export classpath=.:$java_home/lib/dt.jar:$java_home/lib/tools.jar

由于筆者的機器上已經安裝過openjdk,是以未使用以上方式,openjdk的安裝指令如下:

$ su -c "yum install java-1.7.0-openjdk"

安裝完畢後,使用java –version指令檢視,确認安裝正常,如圖1-1所示。

圖1-1 檢視安裝是否正常

1.1.2 安裝scala

下載下傳位址:http://www.scala-lang.org/download/

選擇最新的scala版本下載下傳,下載下傳方法如下:

wget http://downloads.typesafe.com/scala/2.11.5/scala-2.11.5.tgz

移動到選好的安裝目錄,例如:

mv scala-2.11.5.tgz ~/install/

進入安裝目錄,執行以下指令:

chmod 755 scala-2.11.5.tgz

tar -xzvf scala-2.11.5.tgz 

export scala_home=$home/install/scala-2.11.5

export path=$path:$scala_home/bin:$home/bin

安裝完畢後輸入scala,進入scala指令行說明scala安裝正确,如圖1-2所示。

圖1-2 進入scala指令行

1.1.3 安裝spark

下載下傳位址:http://spark.apache.org/downloads.html

選擇最新的spark版本下載下傳,下載下傳方法如下:

wget http://archive.apache.org/dist/spark/spark-1.2.0/spark-1.2.0-bin-hadoop1.tgz

移動到選好的安裝目錄,如:

mv spark-1.2.0-bin-hadoop1.tgz~/install/

chmod 755 spark-1.2.0-bin-hadoop1.tgz

tar -xzvf spark-1.2.0-bin-hadoop1.tgz

export spark_home=$home/install/spark-1.2.0-bin-hadoop1

1.2 spark初體驗

本節通過spark的基本使用,讓讀者對spark能有初步的認識,便于引導讀者逐漸深入學習。

1.2.1 運作spark-shell

要運作spark-shell,需要先對spark進行配置。

1)進入spark的conf檔案夾:

cd ~/install/spark-1.2.0-bin-hadoop1/conf

2)複制一份spark-env.sh.template,命名為spark-env.sh,對它進行編輯,指令如下:

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

3)添加如下配置:

export spark_master_ip=127.0.0.1

export spark_local_ip=127.0.0.1

4)啟動spark-shell:

cd ~/install/spark-1.2.0-bin-hadoop1/bin

./spark-shell

最後我們會看到spark啟動的過程,如圖1-3所示。

圖1-3 spark啟動過程

從以上啟動日志中我們可以看到sparkenv、mapoutputtracker、blockmanagermaster、diskblockmanager、memorystore、httpfileserver、sparkui等資訊。它們是做什麼的?此處望文生義即可,具體内容将在後邊的章節詳細講解。

1.2.2 執行word count

這一節,我們通過word count這個耳熟能詳的例子來感受下spark任務的執行過程。啟動spark-shell後,會打開scala指令行,然後按照以下步驟輸入腳本。

1)輸入val lines = sc.textfile("../readme.md", 2),執行結果如圖1-4所示。

圖1-4 步驟1執行結果

2)輸入val words = lines.flatmap(line => line.split(" ")),執行結果如圖1-5所示。

圖1-5 步驟2執行結果

3)輸入val ones = words.map(w => (w,1)),執行結果如圖1-6所示。

圖1-6 步驟3執行結果

4)輸入val counts = ones.reducebykey(_ + _),執行結果如圖1-7所示。

圖1-7 步驟4執行結果

5)輸入counts.foreach(println),任務執行過程如圖1-8和圖1-9所示。輸出結果如圖1-10所示。

圖1-8 步驟5執行過程部分(一)

圖1-9 步驟5執行過程部分(二)

圖1-10 步驟5輸出結果

在這些輸出日志中,我們先是看到spark中任務的送出與執行過程,然後看到單詞計數的輸出結果,最後列印一些任務結束的日志資訊。有關任務的執行分析,筆者将在第5章中展開。

1.2.3 剖析spark-shell

通過word count在spark-shell中執行的過程,我們想看看spark-shell做了什麼。spark-shell中有以下一段腳本,見代碼清單1-1。

代碼清單1-1 spark-shell中的一段腳本

function main() {

    if $cygwin; then

stty -icanonmin 1 -echo > /dev/null 2>&1

        export spark_submit_opts="$spark_submit_opts -djline.terminal=unix"

        "$fwdir"/bin/spark-submit --class org.apache.spark.repl.main "${submission_opts[@]}" spark-shell "${application_opts[@]}"

sttyicanon echo > /dev/null 2>&1

    else

        export spark_submit_opts

fi

}

我們看到腳本spark-shell裡執行了spark-submit腳本,打開spark-submit腳本,發現其中包含以下腳本。

exec "$spark_home"/bin/spark-class org.apache.spark.deploy.sparksubmit "${orig_args[@]}"

腳本spark-submit在執行spark-class腳本時,給它增加了參數sparksubmit。打開spark-class腳本,其中包含以下腳本,見代碼清單1-2。

代碼清單1-2 spark-class

if [ -n "${java_home}" ]; then

    runner="${java_home}/bin/java"

else

    if [ `command -v java` ]; then

        runner="java"

       echo "java_home is not set" >&2

       exit 1

    fi

exec "$runner" -cp "$classpath" $java_opts "$@"

讀到這裡,應該知道spark啟動了以sparksubmit為主類的jvm程序。

為便于在本地對spark程序使用遠端監控,給spark-class腳本追加以下jmx配置:

java_opts="-xx:maxpermsize=128m $our_java_opts -dcom.sun.management.jmxremote -dcom.sun.management.jmxremote.port=10207 -dcom.sun.management.jmxremote.authenticate=false -dcom.sun.management.jmxremote.ssl=false"

在本地打開jvisualvm,添加遠端主機,如圖1-11所示。

右擊已添加的遠端主機,添加jmx連接配接,如圖1-12所示。

單擊右側的“線程”頁籤,選擇main線程,然後單擊“線程dump”按鈕,如圖1-13所示。

從dump的内容中找到線程main的資訊,如代碼清單1-3所示。

圖1-13 檢視spark線程

代碼清單1-3 main線程dump資訊

"main" - thread t@1

    java.lang.thread.state: runnable

        at java.io.fileinputstream.read0(native method)

        at java.io.fileinputstream.read(fileinputstream.java:210)

        at scala.tools.jline.terminalsupport.readcharacter(terminalsupport.java:152)

        at scala.tools.jline.unixterminal.readvirtualkey(unixterminal.java:125)

        at scala.tools.jline.console.consolereader.readvirtualkey(consolereader.

        java:933)

        at scala.tools.jline.console.consolereader.readbinding(consolereader.java:1136)

        at scala.tools.jline.console.consolereader.readline(consolereader.java:1218)

        at scala.tools.jline.console.consolereader.readline(consolereader.java:1170)

        at org.apache.spark.repl.sparkjlinereader.readoneline(sparkjlinereader.

        scala:80)

        at scala.tools.nsc.interpreter.interactivereader$class.readline(interactive-

        reader.scala:43)

        at org.apache.spark.repl.sparkjlinereader.readline(sparkjlinereader.scala:25)

        at org.apache.spark.repl.sparkiloop.readoneline$1(sparkiloop.scala:619)

        at org.apache.spark.repl.sparkiloop.innerloop$1(sparkiloop.scala:636)

        at org.apache.spark.repl.sparkiloop.loop(sparkiloop.scala:641)

        at org.apache.spark.repl.sparkiloop$$anonfun$process$1.apply$mcz$sp 

        (sparki-loop.scala:968)

        at org.apache.spark.repl.sparkiloop$$anonfun$process$1.apply(sparkiloop.

        scala:916)

        at scala.tools.nsc.util.scalaclassloader$.savingcontextloader(scalaclass

        loader.scala:135)

        at org.apache.spark.repl.sparkiloop.process(sparkiloop.scala:916)

        at org.apache.spark.repl.sparkiloop.process(sparkiloop.scala:1011)

        at org.apache.spark.repl.main$.main(main.scala:31)

        at org.apache.spark.repl.main.main(main.scala)

        at sun.reflect.nativemethodaccessorimpl.invoke0(native method)

        at sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl.

        java:57)

        at sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodacces-

        sorimpl.java:43)

        at java.lang.reflect.method.invoke(method.java:606)

        at org.apache.spark.deploy.sparksubmit$.launch(sparksubmit.scala:358)

        at org.apache.spark.deploy.sparksubmit$.main(sparksubmit.scala:75)

        at org.apache.spark.deploy.sparksubmit.main(sparksubmit.scala)

從main線程的棧資訊中可看出程式的調用順序:sparksubmit.main→repl.main→sparki-loop.process。sparkiloop.process方法中會調用initializespark方法,initializespark的實作見代碼清單1-4。

代碼清單1-4 initializespark的實作

def initializespark() {

intp.bequietduring {

    command("""

        @transient val sc = {

            val _sc = org.apache.spark.repl.main.interp.createsparkcontext()

            println("spark context available as sc.")

            _sc

        }

        """)

        command("import org.apache.spark.sparkcontext._")

    }

我們看到initializespark調用了createsparkcontext方法,createsparkcontext的實作見代碼清單1-5。

代碼清單1-5 createsparkcontext的實作

def createsparkcontext(): sparkcontext = {

valexecuri = system.getenv("spark_executor_uri")

valjars = sparkiloop.getaddedjars

valconf = new sparkconf()

    .setmaster(getmaster())

    .setappname("spark shell")

    .setjars(jars)

    .set("spark.repl.class.uri", intp.classserver.uri)

if (execuri != null) {

                      conf.set("spark.executor.uri", execuri)

sparkcontext = new sparkcontext(conf)

    loginfo("created spark context..")

    sparkcontext

這裡最終使用sparkconf和sparkcontext來完成初始化,具體内容将在第3章講解。代碼分析中涉及的repl主要用于與spark實時互動。

1.3 閱讀環境準備

準備spark閱讀環境,同樣需要一台好機器。筆者調試源碼的機器的記憶體是8 gb。源碼閱讀的前提是在ide環境中打包、編譯通過。常用的ide有intellij idea、eclipse。筆者選擇用eclipse編譯spark,原因有二:一是由于使用多年對它比較熟悉,二是社群中使用eclipse編譯spark的資料太少,在這裡可以做個補充。在windows系統編譯spark源碼,除了安裝jdk外,還需要安裝以下工具。

(1)安裝scala

由于spark 1.20版本的sbt裡指定的scala版本是2.10.4,具體見spark源碼目錄下的檔案\project\plugins.sbt,其中有一行:scalaversion := "2.10.4"。是以選擇下載下傳scala-2.10.4.msi,下載下傳位址:http://www.scala-lang.org/download/。

下載下傳完畢,安裝scala-2.10.4.msi。

(2)安裝sbt

由于scala使用sbt作為建構工具,是以需要下載下傳sbt。下載下傳位址:http://www.scala-sbt.org/,下載下傳最新的安裝包sbt-0.13.8.msi并安裝。

(3)安裝git bash

由于spark源碼使用git作為版本控制工具,是以需要下載下傳git的用戶端工具,推薦使用git bash,因為它更符合linux下的操作習慣。下載下傳位址:http://msysgit.github.io/,下載下傳最新的版本并安裝。

(4)安裝eclipse scala ide插件

eclipse通過強大的插件方式支援各種ide工具的內建,要在eclipse中編譯、調試、運作scala程式,就需要安裝eclipse scala ide插件。下載下傳位址:http://scala-ide.org/download/current.html。

由于筆者本地的eclipse版本是eclipse 4.4 (luna),是以選擇安裝插件http://download.scala-ide.org/sdk/lithium/e44/scala211/stable/site,如圖1-14所示。

圖1-14 eclipse scala ide插件安裝位址

在eclipse中選擇help菜單,然後選擇install new software…選項,打開install對話框,如圖1-15所示。

圖1-15 install對話框

單擊add按鈕,打開add repository對話框,輸入插件位址,如圖1-16所示。

圖1-16 添加scala ide插件位址

全選插件的内容,完成安裝,如圖1-17所示。

圖1-17 安裝scala ide插件

1.4 spark源碼編譯與調試

1.下載下傳spark源碼

首先,通路spark官網http://spark.apache.org/,如圖1-18所示。

圖1-18 spark官網

單擊download spark按鈕,在下一個頁面找到git位址,如圖1-19所示。

圖1-19 spark官方git位址

打開git bash工具,輸入git clone git://github.com/apache/spark.git指令将源碼下載下傳到本地,如圖1-20所示。

圖1-20 下載下傳spark源碼

2.建構scala應用

使用cmd指令行進到spark根目錄,執行sbt指令。會下載下傳和解析很多jar包,要等很長時間,筆者大概花了一個多小時才執行完。

3.使用sbt生成eclipse工程檔案

等sbt提示符(>)出現後,輸入eclipse指令,開始生成eclipse工程檔案,也需要花費很長時間,筆者本地大緻花了40分鐘。完成時的狀況如圖1-21所示。

圖1-21 sbt編譯過程

現在我們檢視spark下的子檔案夾,發現其中都生成了.project和.classpath檔案。比如mllib項目下就生成了.project和.classpath檔案,如圖1-22所示。

圖1-22 sbt生成的項目檔案

4.編譯spark源碼

由于spark使用maven作為項目管理工具,是以需要将spark項目作為maven項目導入eclipse中,如圖1-23所示。

單擊next按鈕進入下一個對話框,如圖1-24所示。

圖1-23 導入maven項目

全選所有項目,單擊finish按鈕,這樣就完成了導入,如圖1-25所示。

導入完成後,需要設定每個子項目的build path。右擊每個項目,選擇“build path”→ “configure build path…”,打開java build path界面,如圖1-26所示。

圖1-26 java編譯目錄

單擊add external jars按鈕,将spark項目下的lib_managed檔案夾的子檔案夾bundles和jars内的jar包添加進來。

lib_managed/jars檔案夾下有很多打好的spark的包,比如:spark-catalyst_2.10-1.3.2-snapshot.jar。這些jar包有可能與你下載下傳的spark源碼的版本不一緻,導緻你在調試源碼時,發生jar包沖突。是以請将它們排除出去。

eclipse在對項目編譯時,筆者本地出現了很多錯誤,有關這些錯誤的解決建議參見附錄h。所有錯誤解決後運作mvn clean install,如圖1-27所示。

5.調試spark源碼

以spark源碼自帶的javawordcount為例,介紹如何調試spark源碼。右擊javaword-count.java,選擇“debug as”→“java application”即可。如果想修改配置參數,右擊javawordcount.java,選擇“debug as”→“debug configurations…”,從打開的對話框中選擇javawordcount,在右側标簽可以修改java執行參數、jre、classpath、環境變量等配置,如圖1-28所示。

讀者也可以在spark源碼中設定斷點,進行跟蹤調試。

圖1-27 編譯成功

圖1-28 源碼調試

1.5 小結

本章通過引導大家在linux作業系統下搭建基本的執行環境,并且介紹spark-shell等腳本的執行,來幫助讀者由淺入深地進行spark源碼的學習。由于目前多數開發工作都在windows系統下進行,并且eclipse有最廣大的使用者群,即便是一些開始使用intellij的使用者對eclipse也不陌生,是以在windows環境下搭建源碼閱讀環境時,選擇這些最常用的工具,能降低讀者的學習門檻,并且替大家節省時間。

第2章

spark設計理念與基本架構

若夫乘天地之正,而禦六氣之辯,以遊無窮者,彼且惡乎待哉?

—《莊子·逍遙遊》

上一章,介紹了spark環境的搭建,為友善讀者學習spark做好準備。本章首先從spark産生的背景開始,介紹spark的主要特點、基本概念、版本變遷。然後簡要說明spark的主要子產品和程式設計模型。最後從spark的設計理念和基本架構入手,使讀者能夠對spark有宏觀的認識,為之後的内容做一些準備工作。

spark是一個通用的并行計算架構,由加州伯克利大學(ucberkeley)的amp實驗室開發于2009年,并于2010年開源,2013年成長為apache旗下大資料領域最活躍的開源項目之一。spark也是基于map reduce 算法模式實作的分布式計算架構,擁有hadoop mapreduce所具有的優點,并且解決了hadoop mapreduce中的諸多缺陷。

2.1 初識spark

2.1.1 hadoop mrv1的局限

hadoop1.0版本采用的是mrv1版本的mapreduce程式設計模型。mrv1版本的實作都封裝在org.apache.hadoop.mapred包中,mrv1的map和reduce是通過接口實作的。mrv1包括三個部分:

運作時環境(jobtracker和tasktracker);

程式設計模型(mapreduce);

資料處理引擎(map任務和reduce任務)。

mrv1存在以下不足:

可擴充性差:在運作時,jobtracker既負責資源管理又負責任務排程,當叢集繁忙時,jobtracker很容易成為瓶頸,最終導緻它的可擴充性問題。

可用性差:采用了單節點的master,沒有備用master及選舉操作,這導緻一旦master出現故障,整個叢集将不可用。

資源使用率低:tasktracker 使用slot等量劃分本節點上的資源量。slot代表計算資源(cpu、記憶體等)。一個task 擷取到一個slot 後才有機會運作,hadoop 排程器負責将各個tasktracker 上的空閑slot配置設定給task使用。一些task并不能充分利用slot,而其他task也無法使用這些空閑的資源。slot 分為map slot 和reduce slot 兩種,分别供maptask和reduce task使用。有時會因為作業剛剛啟動等原因導緻maptask很多,而reduce task任務還沒有排程的情況,這時reduce slot也會被閑置。

不能支援多種mapreduce架構:無法通過可插拔方式将自身的mapreduce架構替換為其他實作,如spark、storm等。

mrv1的示意如圖2-1所示。

apache為了解決以上問題,對hadoop進行更新改造,mrv2最終誕生了。mrv2重用了mrv1中的程式設計模型和資料處理引擎,但是運作時環境被重構了。jobtracker被拆分成了通用的資源排程平台(resourcemanager,rm)和負責各個計算架構的任務排程模型(applicationmaster,am)。mrv2中mapreduce的核心不再是mapreduce架構,而是yarn。在以yarn為核心的mrv2中,mapreduce架構是可插拔的,完全可以替換為其他mapreduce實作,比如spark、storm等。mrv2的示意如圖2-2所示。

hadoop mrv2雖然解決了mrv1中的一些問題,但是由于對hdfs的頻繁操作(包括計算結果持久化、資料備份及shuffle等)導緻磁盤i/o成為系統性能的瓶頸,是以隻适用于離線資料處理,而不能提供實時資料處理能力。

2.1.2 spark使用場景

hadoop常用于解決高吞吐、批量處理的業務場景,例如離線計算結果用于浏覽量統計。如果需要實時檢視浏覽量統計資訊,hadoop顯然不符合這樣的要求。spark通過記憶體計算能力極大地提高了大資料處理速度,滿足了以上場景的需要。此外,spark還支援sql查詢、流式計算、圖計算、機器學習等。通過對java、python、scala、r等語言的支援,極大地友善了使用者的使用。

2.1.3 spark的特點

spark看到mrv1的問題,對mapreduce做了大量優化,總結如下:

快速處理能力。随着實時大資料應用越來越多,hadoop作為離線的高吞吐、低響應架構已不能滿足這類需求。hadoop mapreduce的job将中間輸出和結果存儲在hdfs中,讀寫hdfs造成磁盤i/o成為瓶頸。spark允許将中間輸出和結果存儲在記憶體中,避免了大量的磁盤i/o。同時spark自身的dag執行引擎也支援資料在記憶體中的計算。spark官網聲稱性能比hadoop快100倍,如圖2-3所示。即便是記憶體不足,需要磁盤i/o,其速度也是hadoop的10倍以上。

易于使用。spark現在支援java、scala、python和r等語言編寫應用程式,大大降低了使用者的門檻。自帶了80多個高等級操作符,允許在scala、python、r的shell中進行互動式查詢。

支援查詢。spark支援sql及hive sql對資料查詢。

支援流式計算。與mapreduce隻能處理離線資料相比,spark還支援實時的流計算。spark依賴spark streaming對資料進行實時的處理,其流式處理能力還要強于storm。

可用性高。spark自身實作了standalone部署模式,此模式下的master可以有多個,解決了單點故障問題。此模式完全可以使用其他叢集管理器替換,比如yarn、mesos、ec2等。

豐富的資料源支援。spark除了可以通路作業系統自身的檔案系統和hdfs,還可以通路cassandra、hbase、hive、tachyon以及任何hadoop的資料源。這極大地友善了已經使用hdfs、hbase的使用者順利遷移到spark。

2.2 spark基礎知識

1.版本變遷

經過4年多的發展,spark目前的版本是1.4.1。我們簡單看看它的版本發展過程。

1)spark誕生于ucberkeley的amp實驗室(2009)。

2)spark正式對外開源(2010年)。

3)spark 0.6.0版本釋出(2012-10-15),進行了大範圍的性能改進,增加了一些新特性,并對standalone部署模式進行了簡化。

4)spark 0.6.2版本釋出(2013-02-07),解決了一些bug,并增強了系統的可用性。

5)spark 0.7.0版本釋出(2013-02-27),增加了更多關鍵特性,例如,python api、spark streaming的alpha版本等。

6)spark 0.7.2版本釋出(2013-06-02),性能改進并解決了一些bug,新增api使用的例子。

7)spark接受進入apache孵化器(2013-06-21)。

8)spark 0.7.3版本釋出(2013-07-16),解決一些bug,更新spark streaming api等。

9)spark 0.8.0版本釋出(2013-09-25),一些新功能及可用性改進。

10)spark 0.8.1版本釋出(2013-12-19),支援scala 2.9、yarn 2.2、standalone部署模式下排程的高可用性、shuffle的優化等。

11)spark 0.9.0版本釋出(2014-02-02),增加了graphx,機器學習新特性,流式計算新特性,核心引擎優化(外部聚合、加強對yarn的支援)等。

12)spark 0.9.1版本釋出(2014-04-09),增強使用yarn的穩定性,改進scala和python api的奇偶性。

13)spark 1.0.0版本釋出(2014-05-30),spark sql、mllib、graphx和spark streaming都增加了新特性并進行了優化。spark核心引擎還增加了對安全yarn叢集的支援。

14)spark 1.0.1版本釋出(2014-07-11),增加了spark sql的新特性和對json資料的支援等。

15)spark 1.0.2版本釋出(2014-08-05),spark核心api及streaming、python、mllib的bug修複。

16)spark 1.1.0版本釋出(2014-09-11)。

17)spark 1.1.1版本釋出(2014-11-26),spark核心api及streaming、python、sql、graphx和mllib的bug修複。

18)spark 1.2.0版本釋出(2014-12-18)。

19)spark 1.2.1版本釋出(2015-02-09),spark核心api及streaming、python、sql、graphx和mllib的bug修複。

20)spark 1.3.0版本釋出(2015-03-13)。

21)spark 1.4.0版本釋出(2015-06-11)。

22)spark 1.4.1版本釋出(2015-07-15),dataframe api及streaming、python、sql和mllib的bug修複。

2.基本概念

要想對spark有整體性的了解,推薦讀者閱讀matei zaharia的spark論文。此處筆者先介紹spark中的一些概念:

rdd(resillient distributed dataset):彈性分布式資料集。

task:具體執行任務。task分為shufflemaptask和resulttask兩種。shufflemaptask和resulttask分别類似于hadoop中的map和reduce。

job:使用者送出的作業。一個job可能由一到多個task組成。

stage:job分成的階段。一個job可能被劃分為一到多個stage。

partition:資料分區。即一個rdd的資料可以劃分為多少個分區。

narrowdependency:窄依賴,即子rdd依賴于父rdd中固定的partition。narrow-dependency分為onetoonedependency和rangedependency兩種。

shuffledependency:shuffle依賴,也稱為寬依賴,即子rdd對父rdd中的所有partition都有依賴。

dag(directed acycle graph):有向無環圖。用于反映各rdd之間的依賴關系。

3. scala與java的比較

spark為什麼要選擇java作為開發語言?筆者不得而知。如果能對二者進行比較,也許能看出一些端倪。表2-1列出了scala與java的比較。

表2-1 scala與java的比較

比項項 scala java

語言類型  面向函數為主,兼有面向對象 面向對象(java8也增加了lambda函數程式設計)

簡潔性  非常簡潔 不簡潔

類型推斷  豐富的類型推斷,例如深度和鍊式的類型推斷、 duck type、隐式類型轉換等,但也是以增加了編譯時長 少量的類型推斷

可讀性  一般,豐富的文法糖導緻的各種奇幻用法,例如方法簽名 好

學習成本  較高 一般

語言特性  非常豐富的文法糖和更現代的語言特性,例如 option、模式比對、使用空格的方法調用 豐富

并發程式設計  使用actor的消息模型 使用阻塞、鎖、阻塞隊列等

通過以上比較似乎仍然無法判斷spark選擇java作為開發語言的原因。由于函數式程式設計更接近計算機思維,是以便于通過算法從大資料中模組化,這應該更符合spark作為大資料架構的理念吧!

2.3 spark基本設計思想

2.3.1 spark子產品設計

整個spark主要由以下子產品組成:

spark core:spark的核心功能實作,包括:sparkcontext的初始化(driver application通過sparkcontext送出)、部署模式、存儲體系、任務送出與執行、計算引擎等。

spark sql:提供sql處理能力,便于熟悉關系型資料庫操作的工程師進行互動查詢。此外,還為熟悉hadoop的使用者提供hive sql處理能力。

spark streaming:提供流式計算處理能力,目前支援kafka、flume、twitter、mqtt、zeromq、kinesis和簡單的tcp套接字等資料源。此外,還提供視窗操作。

graphx:提供圖計算處理能力,支援分布式,pregel提供的api可以解決圖計算中的常見問題。

mllib:提供機器學習相關的統計、分類、回歸等領域的多種算法實作。其一緻的api接口大大降低了使用者的學習成本。

spark sql、spark streaming、graphx、mllib的能力都是建立在核心引擎之上,如圖2-4所示。

1. spark核心功能

spark core提供spark最基礎與最核心的功能,主要包括以下功能。

sparkcontext:通常而言,driver application的執行與輸出都是通過sparkcontext來完成的,在正式送出application之前,首先需要初始化sparkcontext。sparkcontext隐藏了網絡通信、分布式部署、消息通信、存儲能力、計算能力、緩存、測量系統、檔案服務、web服務等内容,應用程式開發者隻需要使用sparkcontext提供的api完成功能開發。sparkcontext内置的dagscheduler負責建立job,将dag中的rdd劃分到不同的stage,送出stage等功能。内置的taskscheduler負責資源的申請、任務的送出及請求叢集對任務的排程等工作。

存儲體系:spark優先考慮使用各節點的記憶體作為存儲,當記憶體不足時才會考慮使用磁盤,這極大地減少了磁盤i/o,提升了任務執行的效率,使得spark适用于實時計算、流式計算等場景。此外,spark還提供了以記憶體為中心的高容錯的分布式檔案系統tachyon供使用者進行選擇。tachyon能夠為spark提供可靠的記憶體級的檔案共享服務。

計算引擎:計算引擎由sparkcontext中的dagscheduler、rdd以及具體節點上的executor負責執行的map和reduce任務組成。dagscheduler和rdd雖然位于sparkcontext内部,但是在任務正式送出與執行之前會将job中的rdd組織成有向無關圖(簡稱dag),并對stage進行劃分,決定了任務執行階段任務的數量、疊代計算、shuffle等過程。

部署模式:由于單節點不足以提供足夠的存儲及計算能力,是以作為大資料處理的spark在sparkcontext的taskscheduler元件中提供了對standalone部署模式的實作和yarn、mesos等分布式資源管理系統的支援。通過使用standalone、yarn、mesos等部署模式為task配置設定計算資源,提高任務的并發執行效率。除了可用于實際生産環境的standalone、yarn、mesos等部署模式外,spark還提供了local模式和local-cluster模式便于開發和調試。

2. spark擴充功能

為了擴大應用範圍,spark陸續增加了一些擴充功能,主要包括:

spark sql:sql具有普及率高、學習成本低等特點,為了擴大spark的應用面,增加了對sql及hive的支援。spark sql的過程可以總結為:首先使用sql語句解析器(sqlparser)将sql轉換為文法樹(tree),并且使用規則執行器(ruleexecutor)将一系列規則(rule)應用到文法樹,最終生成實體執行計劃并執行。其中,規則執行器包括文法分析器(analyzer)和優化器(optimizer)。hive的執行過程與sql類似。

spark streaming:spark streaming與apache storm類似,也用于流式計算。spark streaming支援kafka、flume、twitter、mqtt、zeromq、kinesis和簡單的tcp套接字等多種資料輸入源。輸入流接收器(receiver)負責接入資料,是接入資料流的接口規範。dstream是spark streaming中所有資料流的抽象,dstream可以被組織為dstream graph。dstream本質上由一系列連續的rdd組成。

graphx:spark提供的分布式圖計算架構。graphx主要遵循整體同步并行(bulk synchronous parallell,bsp)計算模式下的pregel模型實作。graphx提供了對圖的抽象graph,graph由頂點(vertex)、邊(edge)及繼承了edge的edgetriplet(添加了srcattr和dstattr用來儲存源頂點和目的頂點的屬性)三種結構組成。graphx目前已經封裝了最短路徑、網頁排名、連接配接元件、三角關系統計等算法的實作,使用者可以選擇使用。

mllib:spark提供的機器學習架構。機器學習是一門涉及機率論、統計學、逼近論、凸分析、算法複雜度理論等多領域的交叉學科。mllib目前已經提供了基礎統計、分類、回歸、決策樹、随機森林、樸素貝葉斯、保序回歸、協同過濾、聚類、維數縮減、特征提取與轉型、頻繁模式挖掘、預言模型标記語言、管道等多種數理統計、機率論、資料挖掘方面的數學算法。

2.3.2 spark模型設計

1. spark程式設計模型

spark 應用程式從編寫到送出、執行、輸出的整個過程如圖2-5所示,圖中描述的步驟如下。

1)使用者使用sparkcontext提供的api(常用的有textfile、sequencefile、runjob、stop等)編寫driver application程式。此外sqlcontext、hivecontext及streamingcontext對spark-context進行封裝,并提供了sql、hive及流式計算相關的api。

2)使用sparkcontext送出的使用者應用程式,首先會使用blockmanager和broadcast-manager将任務的hadoop配置進行廣播。然後由dagscheduler将任務轉換為rdd并組織成dag,dag還将被劃分為不同的stage。最後由taskscheduler借助actorsystem将任務送出給叢集管理器(cluster manager)。

3)叢集管理器(cluster manager)給任務配置設定資源,即将具體任務配置設定到worker上,worker建立executor來處理任務的運作。standalone、yarn、mesos、ec2等都可以作為spark的叢集管理器。

2. rdd計算模型

rdd可以看做是對各種資料計算模型的統一抽象,spark的計算過程主要是rdd的疊代計算過程,如圖2-6所示。rdd的疊代計算過程非常類似于管道。分區數量取決于partition數量的設定,每個分區的資料隻會在一個task中計算。所有分區可以在多個機器節點的executor上并行執行。

圖2-5 代碼執行過程

圖2-6 rdd計算模型

2.4 spark基本架構

從叢集部署的角度來看,spark叢集由以下部分組成:

cluster manager:spark的叢集管理器,主要負責資源的配置設定與管理。叢集管理器配置設定的資源屬于一級配置設定,它将各個worker上的記憶體、cpu等資源配置設定給應用程式,但是并不負責對executor的資源配置設定。目前,standalone、yarn、mesos、ec2等都可以作為spark的叢集管理器。

worker:spark的工作節點。對spark應用程式來說,由叢集管理器配置設定得到資源的worker節點主要負責以下工作:建立executor,将資源和任務進一步配置設定給executor,同步資源資訊給cluster manager。

executor:執行計算任務的一線程序。主要負責任務的執行以及與worker、driver app的資訊同步。

driver app:用戶端驅動程式,也可以了解為用戶端應用程式,用于将任務程式轉換為rdd和dag,并與cluster manager進行通信與排程。

這些組成部分之間的整體關系如圖2-7所示。

圖2-7 spark基本架構圖

2.5 小結

每項技術的誕生都會由某種社會需求所驅動,spark正是在實時計算的大量需求下誕生的。spark借助其優秀的處理能力、可用性高、豐富的資料源支援等特點,在目前大資料領域變得火熱,參與的開發者也越來越多。spark經過幾年的疊代發展,如今已經提供了豐富的功能。筆者相信,spark在未來必将産生更耀眼的火花。

第3章 sparkcontext的初始化

第4章 存儲體系

第5章 任務送出與執行

第6章 計算引擎

第7章 部署模式

第3章

sparkcontext的初始化

道生一, 一生二, 二生三, 三生萬物。

—《道德經》

sparkcontext的初始化是driver應用程式送出執行的前提,本章内容以local模式為主,并按照代碼執行順序講解,這将有助于首次接觸spark的讀者了解源碼。讀者朋友如果能邊跟蹤代碼,邊學習本章内容,也許是快速了解sparkcontext初始化過程的便捷途徑。已經熟練使用spark的開發人員可以選擇跳過本章内容。

本章将在介紹sparkcontext初始化過程的同時,向讀者介紹各個元件的作用,為閱讀後面的章節打好基礎。spark中的元件很多,就其功能而言涉及網絡通信、分布式、消息、存儲、計算、緩存、測量、清理、檔案服務、web ui的方方面面。

3.1 sparkcontext概述

spark driver用于送出使用者應用程式,實際可以看作spark的用戶端。了解spark driver的初始化,有助于讀者了解使用者應用程式在用戶端的處理過程。

spark driver的初始化始終圍繞着sparkcontext的初始化。sparkcontext可以算得上是所有spark應用程式的發動機引擎,轎車要想跑起來,發動機首先要啟動。sparkcontext初始化完畢,才能向spark叢集送出任務。在平坦的公路上,發動機隻需以較低的轉速、較低的功率就可以遊刃有餘;在山區,你可能需要一台能夠提供大功率的發動機才能滿足你的需求。這些參數都是通過駕駛員操作油門、檔位等傳送給發動機的,而sparkcontext的配置參數則由sparkconf負責,sparkconf就是你的操作面闆。

sparkconf的構造很簡單,主要是通過concurrenthashmap來維護各種spark的配置屬性。sparkconf代碼結構見代碼清單3-1。spark的配置屬性都是以“spark.”開頭的字元串。

代碼清單3-1 sparkconf代碼結構

class sparkconf(loaddefaults: boolean) extends cloneable with logging {

    import sparkconf._

    def this() = this(true)

    private val settings = new concurrenthashmap[string, string]()

    if (loaddefaults) {

        // 加載任何以spark.開頭的系統屬性

        for ((key, value) <- utils.getsystemproperties if key.startswith("spark.")) {

            set(key, value)

//其餘代碼省略

現在開始介紹sparkcontext。sparkcontext的初始化步驟如下:

1)建立spark執行環境sparkenv;

2)建立rdd清理器metadatacleaner;

3)建立并初始化spark ui;

4)hadoop相關配置及executor環境變量的設定;

5)建立任務排程taskscheduler;

6)建立和啟動dagscheduler;

7)taskscheduler的啟動;

8)初始化塊管理器blockmanager(blockmanager是存儲體系的主要元件之一,将在第4章介紹);

9)啟動測量系統metricssystem;

10)建立和啟動executor配置設定管理器executorallocationmanager;

11)contextcleaner的建立與啟動;

12)spark環境更新;

13)建立dagschedulersource和blockmanagersource;

14)将sparkcontext标記為激活。

sparkcontext的主構造器參數為sparkconf,其實作如下。

class sparkcontext(config: sparkconf) extends logging with executorallocationclient {

private val creationsite: callsite = utils.getcallsite()

    private val allowmultiplecontexts: boolean =

        config.getboolean("spark.driver.allowmultiplecontexts", false)

    sparkcontext.markpartiallyconstructed(this, allowmultiplecontexts)

上面代碼中的callsite存儲了線程棧中最靠近棧頂的使用者類及最靠近棧底的scala或者spark核心類資訊。utils.getcallsite的詳細資訊見附錄a。sparkcontext預設隻有一個執行個體(由屬性spark.driver.allowmultiplecontexts來控制,使用者需要多個sparkcontext執行個體時,可以将其設定為true),方法markpartiallyconstructed用來確定執行個體的唯一性,并将目前sparkcontext标記為正在建構中。

接下來會對sparkconf進行複制,然後對各種配置資訊進行校驗,代碼如下。

private[spark] val conf = config.clone()

conf.validatesettings()

if (!conf.contains("spark.master")) {

    throw new sparkexception("a master url must be set in your configuration")

if (!conf.contains("spark.app.name")) {

    throw new sparkexception("an application name must be set in your configuration")

從上面校驗的代碼看到必須指定屬性spark.master 和spark.app.name,否則會抛出異常,結束初始化過程。spark.master用于設定部署模式,spark.app.name用于指定應用程式名稱。

3.2 建立執行環境sparkenv

sparkenv是spark的執行環境對象,其中包括衆多與executor執行相關的對象。由于在local模式下driver會建立executor,local-cluster部署模式或者standalone部署模式下worker另起的coarsegrainedexecutorbackend程序中也會建立executor,是以sparkenv存在于driver或者coarsegrainedexecutorbackend程序中。建立sparkenv 主要使用sparkenv的createdriverenv,sparkenv.createdriverenv方法有三個參數:conf、islocal和 listenerbus。

val islocal = (master == "local" || master.startswith("local["))

private[spark] val listenerbus = new livelistenerbus

    conf.set("spark.executor.id", "driver")

    private[spark] val env = sparkenv.createdriverenv(conf, islocal, listenerbus)

    sparkenv.set(env)

上面代碼中的conf是對sparkconf的複制,islocal辨別是否是單機模式,listenerbus采用監聽器模式維護各類事件的處理,在3.4.1節會詳細介紹。

sparkenv的方法createdriverenv最終調用create建立sparkenv。sparkenv的構造步驟如下:

1)建立安全管理器securitymanager;

2)建立基于akka的分布式消息系統actorsystem;

3)建立map任務輸出跟蹤器mapoutputtracker;

4)執行個體化shufflemanager;

5)建立shufflememorymanager;

6)建立塊傳輸服務blocktransferservice;

7)建立blockmanagermaster;

8)建立塊管理器blockmanager;

9)建立廣播管理器broadcastmanager;

10)建立緩存管理器cachemanager;

11)建立http檔案伺服器httpfileserver;

12)建立測量系統metricssystem;

13)建立sparkenv。

3.2.1 安全管理器securitymanager

securitymanager主要對權限、賬号進行設定,如果使用hadoop yarn作為叢集管理器,則需要使用證書生成 secret key登入,最後給目前系統設定預設的密碼認證執行個體,此執行個體采用匿名内部類實作,參見代碼清單3-2。

代碼清單3-2 securitymanager的實作

private val secretkey = generatesecretkey()

// 使用http連接配接設定密碼認證

if (authon) {

    authenticator.setdefault(

        new authenticator() {

            override def getpasswordauthentication(): passwordauthentication = {

                var passauth: passwordauthentication = null

            val userinfo = getrequestingurl().getuserinfo()

            if (userinfo != null) {

                val  parts = userinfo.split(":", 2)

                passauth = new passwordauthentication(parts(0), parts(1).tochararray())

                }

                return passauth

            }

    )

3.2.2 基于akka的分布式消息系統actorsystem

actorsystem是spark中最基礎的設施,spark既使用它發送分布式消息,又用它實作并發程式設計。消息系統可以實作并發?要解釋清楚這個問題,首先應該簡單介紹下scala語言的actor并發程式設計模型:scala認為java線程通過共享資料以及通過鎖來維護共享資料的一緻性是糟糕的做法,容易引起鎖的争用,降低并發程式的性能,甚至會引入死鎖的問題。在scala中隻需要自定義類型繼承actor,并且提供act方法,就如同java裡實作runnable接口,需要實作run方法一樣。但是不能直接調用act方法,而是通過發送消息的方式(scala發送消息是異步的)傳遞資料。如:

actor ! message

akka是actor程式設計模型的進階類庫,類似于jdk 1.5之後越來越豐富的并發工具包,簡化了程式員并發程式設計的難度。actorsystem便是akka提供的用于建立分布式消息通信系統的基礎類。akka的具體資訊見附錄b。

正是因為actor輕量級的并發程式設計、消息發送以及actorsystem支援分布式消息發送等特點,spark選擇了actorsystem。

sparkenv中建立actorsystem時用到了akkautils工具類,見代碼清單3-3。akkautils.createactorsystem方法用于啟動actorsystem,見代碼清單3-4。akkautils使用了utils的靜态方法startserviceonport, startserviceonport最終會回調方法startservice: int => (t, int),此處的startservice實際是方法docreateactorsystem。真正啟動actorsystem是由docreate-actorsystem方法完成的,docreateactorsystem的具體實作細節請見附錄b。spark的driver中akka的預設通路位址是akka://sparkdriver,spark的executor中akka的預設通路位址是akka:// sparkexecutor。如果不指定actorsystem的端口,那麼所有節點的actorsystem端口在每次啟動時随機産生。關于startserviceonport的實作,請見附錄a。

代碼清單3-3 akkautils工具類建立和啟動actorsystem

val (actorsystem, boundport) =

    option(defaultactorsystem) match {

        case some(as) => (as, port)

        case none =>

            val actorsystemname = if (isdriver) driveractorsystemname else executoractorsystemname

            akkautils.createactorsystem(actorsystemname, hostname, port, conf, securitymanager)

代碼清單3-4 actorsystem的建立和啟動

def createactorsystem(

        name: string,

        host: string,

        port: int,

        conf: sparkconf,

        securitymanager: securitymanager): (actorsystem, int) = {

    val startservice: int => (actorsystem, int) = { actualport =>

        docreateactorsystem(name, host, actualport, conf, securitymanager)

    utils.startserviceonport(port, startservice, conf, name)

3.2.3 map任務輸出跟蹤器mapoutputtracker

mapoutputtracker用于跟蹤map階段任務的輸出狀态,此狀态便于reduce階段任務擷取位址及中間輸出結果。每個map任務或者reduce任務都會有其唯一辨別,分别為mapid和reduceid。每個reduce任務的輸入可能是多個map任務的輸出,reduce會到各個map任務的所在節點上拉取block,這一過程叫做shuffle。每批shuffle過程都有唯一的辨別shuffleid。

這裡先介紹下mapoutputtrackermaster。mapoutputtrackermaster内部使用mapstatuses:timestampedhashmap[int, array[mapstatus]]來維護跟蹤各個map任務的輸出狀态。其中key對應shuffleid,array存儲各個map任務對應的狀态資訊mapstatus。由于mapstatus維護了map輸出block的位址blockmanagerid,是以reduce任務知道從何處擷取map任務的中間輸出。mapoutputtrackermaster還使用cachedserializedstatuses:timestampedhashmap[int, array[byte]]維護序列化後的各個map任務的輸出狀态。其中key對應shuffleid,array存儲各個序列化mapstatus生成的位元組數組。

driver和executor處理mapoutputtrackermaster的方式有所不同。

如果目前應用程式是driver,則建立mapoutputtrackermaster,然後建立mapoutputtrackermasteractor,并且注冊到actorsystem中。

如果目前應用程式是executor,則建立mapoutputtrackerworker,并從actorsystem中找到mapoutputtrackermasteractor。

無論是driver還是executor,最後都由mapoutputtracker的屬性trackeractor持有mapoutputtrackermasteractor的引用,參見代碼清單3-5。

代碼清單3-5 registerorlookup方法用于查找或者注冊actor的實作

def registerorlookup(name: string, newactor: => actor): actorref = {

    if (isdriver) {

        loginfo("registering " + name)

        actorsystem.actorof(props(newactor), name = name)

    } else {

        akkautils.makedriverref(name, conf, actorsystem)

    val mapoutputtracker =  if (isdriver) {

        new mapoutputtrackermaster(conf)

        new mapoutputtrackerworker(conf)

    mapoutputtracker.trackeractor = registerorlookup(

        "mapoutputtracker",

    new mapoutputtrackermasteractor(mapoutputtracker.asinstanceof[mapoutputtrackermaster], conf))

在後面章節大家會知道map任務的狀态正是由executor向持有的mapoutputtracker-masteractor發送消息,将map任務狀态同步到mapoutputtracker的mapstatuses和cached-serializedstatuses的。executor究竟是如何找到mapoutputtrackermasteractor的?registerorlookup方法通過調用akkautils.makedriverref找到mapoutputtrackermasteractor,實際正是利用actorsystem提供的分布式消息機制實作的,具體細節參見附錄b。這裡第一次使用到了akka提供的功能,以後大家會漸漸感覺到使用akka的便捷。

3.2.4 執行個體化shufflemanager

shufflemanager負責管理本地及遠端的block資料的shuffle操作。shufflemanager預設為通過反射方式生成的sortshufflemanager的執行個體,可以修改屬性spark.shuffle.manager為hash來顯式控制使用hashshufflemanager。sortshufflemanager通過持有的indexshuffleblockmanager間接操作blockmanager中的diskblockmanager将map結果寫入本地,并根據shuffleid、mapid寫入索引檔案,也能通過mapoutputtrackermaster中維護的mapstatuses從本地或者其他遠端節點讀取檔案。有讀者可能會問,為什麼需要shuffle?spark作為并行計算架構,同一個作業會被劃分為多個任務在多個節點上并行執行,reduce的輸入可能存在于多個節點上,是以需要通過“洗牌”将所有reduce的輸入彙總起來,這個過程就是shuffle。這個問題以及對shufflemanager的具體使用會在第5章和第6章詳述。shufflemanager的執行個體化見代碼清單3-6。代碼清單3-6最後建立的shufflememorymanager将在3.2.5節介紹。

代碼清單3-6 shufflemanager的執行個體化及shufflememorymanager的建立

    val shortshufflemgrnames = map(

        "hash" -> "org.apache.spark.shuffle.hash.hashshufflemanager",

        "sort" -> "org.apache.spark.shuffle.sort.sortshufflemanager")

    val shufflemgrname = conf.get("spark.shuffle.manager", "sort")

    val shufflemgrclass = shortshufflemgrnames.get

orelse(shufflemgrname.tolowercase, shufflemgrname)

    val shufflemanager = instantiateclass[shufflemanager](shufflemgrclass)

    val shufflememorymanager = new shufflememorymanager(conf)

3.2.5 shuffle線程記憶體管理器shufflememorymanager

shufflememorymanager負責管理shuffle線程占有記憶體的配置設定與釋放,并通過thread-memory:mutable.hashmap[long, long]緩存每個線程的記憶體位元組數,見代碼清單3-7。

代碼清單3-7 shufflememorymanager的資料結構

private[spark] class shufflememorymanager(maxmemory: long) extends logging {

    private val threadmemory = new mutable.hashmap[long, long]()  // threadid -> memory bytes

    def this(conf: sparkconf) = this(shufflememorymanager.getmaxmemory(conf))

getmaxmemory方法用于擷取shuffle所有線程占用的最大記憶體,實作如下。

def getmaxmemory(conf: sparkconf): long = {

    val memoryfraction = conf.getdouble("spark.shuffle.memoryfraction", 0.2)

    val safetyfraction = conf.getdouble("spark.shuffle.safetyfraction", 0.8)

    (runtime.getruntime.maxmemory * memoryfraction * safetyfraction).tolong

從上面代碼可以看出,shuffle所有線程占用的最大記憶體的計算公式為:

java運作時最大記憶體 * spark的shuffle最大記憶體占比 * spark的安全記憶體占比

可以配置屬性spark.shuffle.memoryfraction修改spark的shuffle最大記憶體占比,配置屬性spark.shuffle.safetyfraction修改spark的安全記憶體占比。

shufflememorymanager通常運作在executor中,driver中的shufflememorymanager 隻有在local模式下才起作用。

3.2.6 塊傳輸服務blocktransferservice

blocktransferservice預設為nettyblocktransferservice(可以配置屬性spark.shuffle.blocktransferservice使用nioblocktransferservice),它使用netty提供的異步事件驅動的網絡應用架構,提供web服務及用戶端,擷取遠端節點上block的集合。

val blocktransferservice =

    conf.get("spark.shuffle.blocktransferservice", "netty").tolowercase match {

        case "netty" =>

            new nettyblocktransferservice(conf, securitymanager, numusablecores)

        case "nio" =>

            new nioblocktransferservice(conf, securitymanager)

nettyblocktransferservice的具體實作将在第4章詳細介紹。這裡大家可能覺得奇怪,這樣的網絡應用為何也要放在存儲體系?大家不妨先帶着疑問,直到你真正了解了存儲體系。

3.2.7 blockmanagermaster介紹

blockmanagermaster負責對block的管理和協調,具體操作依賴于blockmanager-masteractor。driver和executor處理blockmanagermaster的方式不同:

如果目前應用程式是driver,則建立blockmanagermasteractor,并且注冊到actor-system中。

如果目前應用程式是executor,則從actorsystem中找到blockmanagermasteractor。

無論是driver還是executor,最後blockmanagermaster的屬性driveractor将持有對blockmanagermasteractor的引用。blockmanagermaster的建立代碼如下。

val blockmanagermaster = new blockmanagermaster(registerorlookup(

    "blockmanagermaster",

    new blockmanagermasteractor(islocal, conf, listenerbus)), conf, isdriver)

registerorlookup已在3.2.3節介紹過了,不再贅述。blockmanagermaster及blockmanager-masteractor的具體實作将在第4章詳細介紹。

3.2.8 建立塊管理器blockmanager

blockmanager負責對block的管理,隻有在blockmanager的初始化方法initialize被調用後,它才是有效的。blockmanager作為存儲系統的一部分,具體實作見第4章。blockmanager的建立代碼如下。

val blockmanager = new blockmanager(executorid, actorsystem, blockmanagermaster,

    serializer, conf, mapoutputtracker, shufflemanager, blocktransferservice, securitymanager,numusablecores)

3.2.9 建立廣播管理器broadcastmanager

broadcastmanager用于将配置資訊和序列化後的rdd、job以及shuffledependency等資訊在本地存儲。如果為了容災,也會複制到其他節點上。建立broadcastmanager的代碼實作如下。

val broadcastmanager = new broadcastmanager(isdriver, conf, securitymanager)

broadcastmanager必須在其初始化方法initialize被調用後,才能生效。initialize方法實際利用反射生成廣播工廠執行個體broadcastfactory(可以配置屬性spark.broadcast.factory指定,預設為org.apache.spark.broadcast.torrentbroadcastfactory)。broadcastmanager的廣播方法newbroadcast實際代理了工廠broadcastfactory的newbroadcast方法來生成廣播對象。unbroadcast方法實際代理了工廠broadcastfactory的unbroadcast方法生成非廣播對象。broadcastmanager的initialize、unbroadcast及newbroadcast方法見代碼清單3-8。

代碼清單3-8 broadcastmanager的實作

private def initialize() {

    synchronized {

        if (!initialized) {

            val broadcastfactoryclass = conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.torrentbroadcastfactory")

            broadcastfactory =

                class.forname(broadcastfactoryclass).newinstance.asinstanceof [broadcastfactory]

            broadcastfactory.initialize(isdriver, conf, securitymanager)

            initialized = true

    private val nextbroadcastid = new atomiclong(0)

    def newbroadcast[t: classtag](value_ : t, islocal: boolean) = {

        broadcastfactory.newbroadcast[t](value_, islocal, nextbroadcastid.getandincrement())

    def unbroadcast(id: long, removefromdriver: boolean, blocking: boolean) {

        broadcastfactory.unbroadcast(id, removefromdriver, blocking)

3.2.10 建立緩存管理器cachemanager

cachemanager用于緩存rdd某個分區計算後的中間結果,緩存計算結果發生在疊代計算的時候,将在6.1節講到。而cachemanager将在4.10節較長的描述。建立cachemanager的代碼如下。

val cachemanager = new cachemanager(blockmanager)

3.2.11 http檔案伺服器httpfileserver

httpfileserver的建立參見代碼清單3-9。httpfileserver主要提供對jar及其他檔案的http通路,這些jar包包括使用者上傳的jar包。端口由屬性spark.fileserver.port配置,預設為0,表示随機生成端口号。

代碼清單3-9 httpfileserver的建立

val httpfileserver =

        val fileserverport = conf.getint("spark.fileserver.port", 0)

        val server = new httpfileserver(conf, securitymanager, fileserverport)

        server.initialize()

        conf.set("spark.fileserver.uri",  server.serveruri)

        server

        null

httpfileserver的初始化過程見代碼清單3-10,主要包括以下步驟:

1)使用utils工具類建立檔案伺服器的根目錄及臨時目錄(臨時目錄在運作時環境關閉時會删除)。utils工具的詳細介紹見附錄a。

2)建立存放jar包及其他檔案的檔案目錄。

3)建立并啟動http服務。

代碼清單3-10 httpfileserver的初始化

def initialize() {

    basedir = utils.createtempdir(utils.getlocaldir(conf), "httpd")

    filedir = new file(basedir, "files")

    jardir = new file(basedir, "jars")

    filedir.mkdir()

    jardir.mkdir()

    loginfo("http file server directory is " + basedir)

    httpserver = new httpserver(conf, basedir, securitymanager, requestedport, "http file server")

    httpserver.start()

    serveruri = httpserver.uri

    logdebug("http file server started at: " + serveruri)

httpserver的構造和start方法的實作中,再次使用了utils的靜态方法startserviceonport,是以會回調dostart方法,見代碼清單3-11。有關jetty的api使用參見附錄c。

代碼清單3-11 httpserver的啟動

def start() {

    if (server != null) {

        throw new serverstateexception("server is already started")

        loginfo("starting http server")

        val (actualserver, actualport) =

            utils.startserviceonport[server](requestedport, dostart, conf, servername)

        server = actualserver

        port = actualport

dostart方法中啟動内嵌的jetty所提供的http服務,見代碼清單3-12。

代碼清單3-12 httpserver的啟動功能實作

private def dostart(startport: int): (server, int) = {

    val server = new server()

    val connector = new socketconnector

    connector.setmaxidletime(60 * 1000)

    connector.setsolingertime(-1)

    connector.setport(startport)

    server.addconnector(connector)

    val threadpool = new queuedthreadpool

    threadpool.setdaemon(true)

    server.setthreadpool(threadpool)

    val reshandler = new resourcehandler

    reshandler.setresourcebase(resourcebase.getabsolutepath)

    val handlerlist = new handlerlist

    handlerlist.sethandlers(array(reshandler, new defaulthandler))

    if (securitymanager.isauthenticationenabled()) {

        logdebug("httpserver is using security")

        val sh = setupsecurityhandler(securitymanager)

        // make sure we go through security handler to get resources

        sh.sethandler(handlerlist)

        server.sethandler(sh)

        logdebug("httpserver is not using security")

        server.sethandler(handlerlist)

    server.start()

    val actualport = server.getconnectors()(0).getlocalport

    (server, actualport)

3.2.12 建立測量系統metricssystem

metricssystem是spark的測量系統,建立metricssystem的代碼如下。

val metricssystem = if (isdriver) {

        metricssystem.createmetricssystem("driver", conf, securitymanager)

        conf.set("spark.executor.id", executorid)

        val ms = metricssystem.createmetricssystem("executor", conf, securitymanager)

        ms.start()

        ms

上面調用的createmetricssystem方法實際建立了metricssystem,代碼如下。

def createmetricssystem(

    instance: string, conf: sparkconf, securitymgr: securitymanager): metricssystem = {

    new metricssystem(instance, conf, securitymgr)

構造metricssystem的過程最重要的是調用了metricsconfig的initialize方法,見代碼清單3-13。

代碼清單3-13 metricsconfig的初始化

    setdefaultproperties(properties)

    var is: inputstream = null

    try {

        is = configfile match {

            case some(f) => new fileinputstream(f)

            case none => utils.getsparkclassloader.getresourceasstream(metrics_conf)

        if (is != null) {

            properties.load(is)

    } catch {

        case e: exception => logerror("error loading configure file", e)

    } finally {

        if (is != null) is.close()

    propertycategories = subproperties(properties, instance_regex)

    if (propertycategories.contains(default_prefix)) {

        import scala.collection.javaconversions._

        val defaultproperty = propertycategories(default_prefix)

        for { (inst, prop) <- propertycategories

            if (inst != default_prefix)

            (k, v) <- defaultproperty

            if (prop.getproperty(k) == null) } {

        prop.setproperty(k, v)

從以上實作可以看出,metricsconfig的initialize方法主要負責加載metrics.properties檔案中的屬性配置,并對屬性進行初始化轉換。

例如,将屬性

{*.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, *.sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, master.sink.servlet.path=/metrics/master/json}

轉換為

map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/json})

3.2.13 建立sparkenv

當所有的基礎元件準備好後,最終使用下面的代碼建立執行環境sparkenv。

new sparkenv(executorid, actorsystem, serializer, closureserializer, cachemanager,

        mapoutputtracker, shufflemanager, broadcastmanager, blocktransferservice,

    blockmanager, securitymanager, httpfileserver, sparkfilesdir, 

metricssystem, shufflememorymanager, conf)

serializer和closureserializer都是使用class.forname反射生成的org.apache.spark.serializer.javaserializer類的執行個體,其中closureserializer執行個體特别用來對scala中的閉包進行序列化。

3.3 建立metadatacleaner

sparkcontext為了保持對所有持久化的rdd的跟蹤,使用類型是timestamped-weakvaluehashmap的persistentrdds緩存。metadatacleaner的功能是清除過期的持久化rdd。建立metadatacleaner的代碼如下。

private[spark] val persistentrdds = new timestampedweakvaluehashmap[int, rdd[_]]

private[spark] val metadatacleaner =

    new metadatacleaner(metadatacleanertype.spark_context, this.cleanup, conf)

我們仔細看看metadatacleaner的實作,見代碼清單3-14。

代碼清單3-14 metadatacleaner的實作

private[spark] class metadatacleaner(

        cleanertype: metadatacleanertype.metadatacleanertype,

        cleanupfunc: (long) => unit,

        conf: sparkconf)

    extends logging

{

    val name = cleanertype.tostring

    private val delayseconds = metadatacleaner.getdelayseconds(conf, cleanertype)

    private val periodseconds = math.max(10, delayseconds / 10)

    private val timer = new timer(name + " cleanup timer", true)

    private val task = new timertask {

        override def run() {

        try {

            cleanupfunc(system.currenttimemillis() - (delayseconds * 1000))

            loginfo("ran metadata cleaner for " + name)

        } catch {

            case e: exception => logerror("error running cleanup task for " + name, e)

      }

    if (delayseconds > 0) {

        timer.schedule(task, delayseconds * 1000, periodseconds * 1000)

    def cancel() {

        timer.cancel()

從metadatacleaner的實作可以看出其實質是一個用timertask實作的定時器,不斷調用cleanupfunc: (long) => unit這樣的函數參數。構造metadatacleaner時的函數參數是cleanup,用于清理persistentrdds中的過期内容,代碼如下。

private[spark] def cleanup(cleanuptime: long) {

    persistentrdds.clearoldvalues(cleanuptime)

3.4 sparkui詳解

任何系統都需要提供監控功能,用浏覽器能通路具有樣式及布局并提供豐富監控資料的頁面無疑是一種簡單、高效的方式。sparkui就是這樣的服務,它的架構如圖3-1所示。

在大型分布式系統中,采用事件監聽機制是最常見的。為什麼要使用事件監聽機制?假如sparkui采用scala的函數調用方式,那麼随着整個叢集規模的增加,對函數的調用會越來越多,最終會受到driver所在jvm的線程數量限制而影響監控資料的更新,甚至出現監控資料無法及時顯示給使用者的情況。由于函數調用多數情況下是同步調用,這就導緻線程被阻塞,在分布式環境中,還可能因為網絡問題,導緻線程被長時間占用。将函數調用更換為發送事件,事件的處理是異步的,目前線程可以繼續執行後續邏輯,線程池中的線程還可以被重用,這樣整個系統的并發度會大大增加。發送的事件會存入緩存,由定時排程器取出後,配置設定給監聽此事件的監聽器對監控資料進行更新。

圖3-1 sparkui架構

我們先簡單介紹圖3-1中的各個元件:dagscheduler是主要的産生各類sparklistener-event的源頭,它将各種sparklistenerevent發送到listenerbus的事件隊列中,listenerbus通過定時器将sparklistenerevent事件比對到具體的sparklistener,改變sparklistener中的統計監控資料,最終由sparkui的界面展示。從圖3-1中還可以看到spark裡定義了很多監聽器sparklistener的實作,包括jobprogresslistener、environmentlistener、storagelistener、executorslistener,它們的類繼承體系如圖3-2所示。

圖3-2 sparklistener的類繼承體系

3.4.1 listenerbus詳解

listenerbus的類型是livelistenerbus。livelistenerbus實作了監聽器模型,通過監聽事件觸發對各種監聽器監聽狀态資訊的修改,達到ui界面的資料重新整理效果。livelistenerbus由以下部分組成:

事件阻塞隊列:類型為linkedblockingqueue[sparklistenerevent],固定大小是10 000;

監聽器數組:類型為arraybuffer[sparklistener],存放各類監聽器sparklistener。

事件比對監聽器的線程:此thread不斷拉取linkedblockingqueue中的事件,周遊監聽器,調用監聽器的方法。任何事件都會在linkedblockingqueue中存在一段時間,然後thread處理了此事件後,會将其清除。是以使用listenerbus這個名字再合适不過了,到站就下車。listenerbus的實作見代碼清單3-15。

代碼清單3-15 livelistenerbus的事件處理實作

private val event_queue_capacity = 10000

    private val eventqueue = new linkedblockingqueue[sparklistenerevent](event_queue_capacity)

    private var queuefullerrormessagelogged = false

    private var started = false

    // a counter that represents the number of events produced and consumed in the queue

    private val eventlock = new semaphore(0)

    private val listenerthread = new thread("sparklistenerbus") {

      setdaemon(true)

      override def run(): unit = utils.loguncaughtexceptions {

        while (true) {

            eventlock.acquire()

            // atomically remove and process this event

            livelistenerbus.this.synchronized {

                val event = eventqueue.poll

                if (event == sparklistenershutdown) {

                    // get out of the while loop and shutdown the daemon thread

                    return

                option(event).foreach(posttoall)

    if (started) {

        throw new illegalstateexception("listener bus already started!")

    listenerthread.start()

    started = true

def post(event: sparklistenerevent) {

    val eventadded = eventqueue.offer(event)

    if (eventadded) {

        eventlock.release()

        logqueuefullerrormessage()

def listenerthreadisalive: boolean = synchronized { listenerthread.isalive }

def queueisempty: boolean = synchronized { eventqueue.isempty }

def stop() {

   if (!started) {

        throw new illegalstateexception("attempted to stop a listener bus that has not yet started!")

    post(sparklistenershutdown)

    listenerthread.join()

livelistenerbus中調用的posttoall方法實際定義在父類sparklistenerbus中,如代碼清單3-16所示。

代碼清單3-16 sparklistenerbus中的監聽器調用

protected val sparklisteners = new arraybuffer[sparklistener]

    with mutable.synchronizedbuffer[sparklistener]

def addlistener(listener: sparklistener) {

    sparklisteners += listener

def posttoall(event: sparklistenerevent) {

    event match {

        case stagesubmitted: sparklistenerstagesubmitted =>

            foreachlistener(_.onstagesubmitted(stagesubmitted))

        case stagecompleted: sparklistenerstagecompleted =>

            foreachlistener(_.onstagecompleted(stagecompleted))

        case jobstart: sparklistenerjobstart =>

            foreachlistener(_.onjobstart(jobstart))

        case jobend: sparklistenerjobend =>

            foreachlistener(_.onjobend(jobend))

        case taskstart: sparklistenertaskstart =>

            foreachlistener(_.ontaskstart(taskstart))

        case taskgettingresult: sparklistenertaskgettingresult =>

            foreachlistener(_.ontaskgettingresult(taskgettingresult))

        case taskend: sparklistenertaskend =>

            foreachlistener(_.ontaskend(taskend))

        case environmentupdate: sparklistenerenvironmentupdate =>

            foreachlistener(_.onenvironmentupdate(environmentupdate))

        case blockmanageradded: sparklistenerblockmanageradded =>

            foreachlistener(_.onblockmanageradded(blockmanageradded))

        case blockmanagerremoved: sparklistenerblockmanagerremoved =>

            foreachlistener(_.onblockmanagerremoved(blockmanagerremoved))

        case unpersistrdd: sparklistenerunpersistrdd =>

            foreachlistener(_.onunpersistrdd(unpersistrdd))

        case applicationstart: sparklistenerapplicationstart =>

            foreachlistener(_.onapplicationstart(applicationstart))

        case applicationend: sparklistenerapplicationend =>

            foreachlistener(_.onapplicationend(applicationend))

        case metricsupdate: sparklistenerexecutormetricsupdate =>

            foreachlistener(_.onexecutormetricsupdate(metricsupdate))

        case sparklistenershutdown =>

private def foreachlistener(f: sparklistener => unit): unit = {

    sparklisteners.foreach { listener =>

            f(listener)

            case e: exception =>

            logerror(s"listener ${utils.getformattedclassname(listener)} threw an exception", e)

       }

3.4.2 構造jobprogresslistener

我們以jobprogresslistener為例來講解sparklistener。jobprogresslistener是sparkcontext中一個重要的組成部分,通過監聽listenerbus中的事件更新任務進度。sparkstatustracker和sparkui實際上也是通過jobprogresslistener來實作任務狀态跟蹤的。建立jobprogresslistener的代碼如下。

private[spark] val jobprogresslistener = new jobprogresslistener(conf)

listenerbus.addlistener(jobprogresslistener)

val statustracker = new sparkstatustracker(this)

jobprogresslistener的作用是通過hashmap、listbuffer等資料結構存儲jobid及對應的jobuidata資訊,并按照激活、完成、失敗等job狀态統計。對于stageid、stageinfo等資訊按照激活、完成、忽略、失敗等stage狀态統計,并且存儲stageid與jobid的一對多關系。這些統計資訊最終會被jobpage和stagepage等頁面通路和渲染。jobprogresslistener的資料結構見代碼清單3-17。

代碼清單3-17 jobprogresslistener維護的資訊

class jobprogresslistener(conf: sparkconf) extends sparklistener with logging {

    import jobprogresslistener._

    type jobid = int

    type stageid = int

    type stageattemptid = int

    type poolname = string

    type executorid = string

    // jobs:

    val activejobs = new hashmap[jobid, jobuidata]

    val completedjobs = listbuffer[jobuidata]()

    val failedjobs = listbuffer[jobuidata]()

    val jobidtodata = new hashmap[jobid, jobuidata]

    // stages:

    val activestages = new hashmap[stageid, stageinfo]

    val completedstages = listbuffer[stageinfo]()

    val skippedstages = listbuffer[stageinfo]()

    val failedstages = listbuffer[stageinfo]()

    val stageidtodata = new hashmap[(stageid, stageattemptid), stageuidata]

    val stageidtoinfo = new hashmap[stageid, stageinfo]

    val stageidtoactivejobids = new hashmap[stageid, hashset[jobid]]

    val pooltoactivestages = hashmap[poolname, hashmap[stageid, stageinfo]]()

    var numcompletedstages = 0 // 總共完成的stage數量

    var numfailedstages = 0 // 總共失敗的stage數量

    // misc:

    val executoridtoblockmanagerid = hashmap[executorid, blockmanagerid]()

    def blockmanagerids = executoridtoblockmanagerid.values.toseq

    var schedulingmode: option[schedulingmode] = none

    // number of non-active jobs and stages (there is no limit for active jobs   and stages):

    val retainedstages = conf.getint("spark.ui.retainedstages", default_retained_stages)

    val retainedjobs = conf.getint("spark.ui.retainedjobs", default_retained_jobs)

jobprogresslistener 實作了onjobstart、onjobend、onstagecompleted、onstagesubmitted、ontaskstart、ontaskend等方法,這些方法正是在listenerbus的驅動下,改變jobprogress-listener中的各種job、stage相關的資料。

3.4.3 sparkui的建立與初始化

sparkui的建立,見代碼清單3-18。

代碼清單3-18 sparkui的聲明

private[spark] val ui: option[sparkui] =

    if (conf.getboolean("spark.ui.enabled", true)) {

        some(sparkui.createliveui(this, conf, listenerbus, jobprogresslistener,

            env.securitymanager,appname))

        none

ui.foreach(_.bind())

可以看到如果不需要提供sparkui服務,可以将屬性spark.ui.enabled修改為false。其中createliveui實際是調用了create方法,見代碼清單3-19。

代碼清單3-19 sparkui的建立

def createliveui(

        sc: sparkcontext,

        listenerbus: sparklistenerbus,

        jobprogresslistener: jobprogresslistener,

        securitymanager: securitymanager,

        appname: string): sparkui =  {

    create(some(sc), conf, listenerbus, securitymanager, appname,

        jobprogresslistener = some(jobprogresslistener))

  }

create方法的實作參見代碼清單3-20。

代碼清單3-20 creat方法的實作

private def create(

        sc: option[sparkcontext],

        appname: string,

        basepath: string = "",

        jobprogresslistener: option[jobprogresslistener] = none): sparkui = {

    val _jobprogresslistener: jobprogresslistener = jobprogresslistener.getorelse {

        val listener = new jobprogresslistener(conf)

        listenerbus.addlistener(listener)

        listener

    val environmentlistener = new environmentlistener

    val storagestatuslistener = new storagestatuslistener

    val executorslistener = new executorslistener(storagestatuslistener)

    val storagelistener = new storagelistener(storagestatuslistener)

    listenerbus.addlistener(environmentlistener)

    listenerbus.addlistener(storagestatuslistener)

    listenerbus.addlistener(executorslistener)

    listenerbus.addlistener(storagelistener)

    new sparkui(sc, conf, securitymanager, environmentlistener, storagestatuslistener,

        executorslistener, _jobprogresslistener, storagelistener, appname, basepath)

根據代碼清單3-20,可以知道在create方法裡除了jobprogresslistener是外部傳入的之外,又增加了一些sparklistener。例如,用于對jvm參數、spark屬性、java系統屬性、classpath等進行監控的environmentlistener;用于維護executor的存儲狀态的storagestatuslistener;用于準備将executor的資訊展示在executorstab的executorslistener;用于準備将executor相關存儲資訊展示在blockmanagerui的storagelistener等。最後建立sparkui,spark ui服務預設是可以被殺掉的,通過修改屬性spark.ui.killenabled為false可以保證不被殺死。initialize方法會組織前端頁面各個tab和page的展示及布局,參見代碼清單3-21。

代碼清單3-21 sparkui的初始化

private[spark] class sparkui private (

    val sc: option[sparkcontext],

    val conf: sparkconf,

    val securitymanager: securitymanager,

    val environmentlistener: environmentlistener,

    val storagestatuslistener: storagestatuslistener,

    val executorslistener: executorslistener,

    val jobprogresslistener: jobprogresslistener,

    val storagelistener: storagelistener,

    var appname: string,

    val basepath: string)

extends webui(securitymanager, sparkui.getuiport(conf), conf, basepath, "sparkui")

with logging {

val killenabled = sc.map(_.conf.getboolean("spark.ui.killenabled", true)).getorelse(false)

/** initialize all components of the server. */

    attachtab(new jobstab(this))

    val stagestab = new stagestab(this)

    attachtab(stagestab)

    attachtab(new storagetab(this))

    attachtab(new environmenttab(this))

    attachtab(new executorstab(this))

    attachhandler(createstatichandler(sparkui.static_resource_dir, "/static"))

    attachhandler(createredirecthandler("/", "/jobs", basepath = basepath))

    attachhandler(

        createredirecthandler("/stages/stage/kill", "/stages", stagestab.handlekillrequest))

initialize()

3.4.4 spark ui的頁面布局與展示

sparkui究竟是如何實作頁面布局及展示的?jobstab展示所有job的進度、狀态資訊,這裡我們以它為例來說明。jobstab會複用sparkui的killenabled、sparkcontext、job-progresslistener,包括alljobspage和jobpage兩個頁面,見代碼清單3-22。

代碼清單3-22 jobstab的實作

private[ui] class jobstab(parent: sparkui) extends sparkuitab(parent, "jobs") {

    val sc = parent.sc

    val killenabled = parent.killenabled

    def isfairscheduler = listener.schedulingmode.exists(_ == schedulingmode.fair)

    val listener = parent.jobprogresslistener

    attachpage(new alljobspage(this))

    attachpage(new jobpage(this))

alljobspage由render方法渲染,利用jobprogresslistener中的統計監控資料生成激活、完成、失敗等狀态的job摘要資訊,并調用jobstable方法生成表格等html元素,最終使用uiutils的headersparkpage封裝好css、js、header及頁面布局等,見代碼清單3-23。

代碼清單3-23 alljobspage的實作

def render(request: httpservletrequest): seq[node] = {

    listener.synchronized {

        val activejobs = listener.activejobs.values.toseq

        val completedjobs = listener.completedjobs.reverse.toseq

        val failedjobs = listener.failedjobs.reverse.toseq

        val now = system.currenttimemillis

        val activejobstable =

            jobstable(activejobs.sortby(_.starttime.getorelse(-1l)).reverse)

        val completedjobstable =

            jobstable(completedjobs.sortby(_.endtime.getorelse(-1l)).reverse)

        val failedjobstable =

            jobstable(failedjobs.sortby(_.endtime.getorelse(-1l)).reverse)

        val summary: nodeseq =

            <div>

                <ul class="unstyled">

                    {if (starttime.isdefined) {

                        // total duration is not meaningful unless the ui is live

                        <li>

                            <strong>total duration: </strong>

                            {uiutils.formatduration(now - starttime.get)}

                        </li>

                    }}

                    <li>

                        <strong>scheduling mode: </strong>

                        {listener.schedulingmode.map(_.tostring).getorelse("unknown")}

                    </li>

                        <a href="#active"><strong>active jobs:</strong></a>

                        {activejobs.size}

                        <a href="#completed"><strong>completed jobs:</strong></a>

                        {completedjobs.size}

                        <a href="#failed"><strong>failed jobs:</strong></a>

                        {failedjobs.size}

                </ul>

            </div>

jobstable用來生成表格資料,見代碼清單3-24。

代碼清單3-24 jobstable處理表格的實作

private def jobstable(jobs: seq[jobuidata]): seq[node] = {

    val somejobhasjobgroup = jobs.exists(_.jobgroup.isdefined)

    val columns: seq[node] = {

        <th>{if (somejobhasjobgroup) "job id (job group)" else "job id"}</th>

        <th>description</th>

        <th>submitted</th>

        <th>duration</th>

        <th class="sorttable_nosort">stages: succeeded/total</th>

        <th class="sorttable_nosort">tasks (for all stages): succeeded/total</th>

    <table class="table table-bordered table-striped table-condensed sortable">

        <thead>{columns}</thead>

        <tbody>

            {jobs.map(makerow)}

        </tbody>

    </table>

表格中每行資料又是通過makerow方法渲染的,參見代碼清單3-25。

代碼清單3-25 生成表格中的行

def makerow(job: jobuidata): seq[node] = {

    val laststageinfo = option(job.stageids)

        .filter(_.nonempty)

        .flatmap { ids => listener.stageidtoinfo.get(ids.max) }

    val laststagedata = laststageinfo.flatmap { s =>

        listener.stageidtodata.get((s.stageid, s.attemptid))

    val iscomplete = job.status == jobexecutionstatus.succeeded

    val laststagename = laststageinfo.map(_.name).getorelse("(unknown stage name)")

    val laststagedescription = laststagedata.flatmap(_.description).getorelse("")

    val duration: option[long] = {

        job.starttime.map { start =>

            val end = job.endtime.getorelse(system.currenttimemillis())

        end - start

    val formattedduration = duration.map(d => uiutils.formatduration(d)).getorelse("unknown")

    val formattedsubmissiontime = job.starttime.map(uiutils.formatdate).getorelse("unknown")

    val detailurl =

        "%s/jobs/job?id=%s".format(uiutils.prependbaseuri(parent.basepath), job.jobid)

    <tr>

        <td sorttable_customkey={job.jobid.tostring}>

            {job.jobid} {job.jobgroup.map(id => s"($id)").getorelse("")}

        </td>

        <td>

            <div><em>{laststagedescription}</em></div>

            <a href={detailurl}>{laststagename}</a>

            <td sorttable_customkey={job.starttime.getorelse(-1).tostring}>

            {formattedsubmissiontime}

        <td sorttable_customkey={duration.getorelse(-1).tostring}>{formatted-duration}</td>

        <td class="stage-progress-cell">

            {job.completedstageindices.size}/{job.stageids.size - job.numskipped-stages}

            {if (job.numfailedstages > 0) s"(${job.numfailedstages} failed)"}

            {if (job.numskippedstages > 0) s"(${job.numskippedstages} skipped)"}

        <td class="progress-cell">

            {uiutils.makeprogressbar(started = job.numactivetasks, completed = job.numcompletedtasks,

            failed = job.numfailedtasks, skipped = job.numskippedtasks,

            total = job.numtasks - job.numskippedtasks)}

    </tr>

代碼清單3-22中的attachpage方法存在于jobstab的父類webuitab中,webuitab維護有arraybuffer[webuipage]的資料結構,alljobspage和jobpage将被放入此arraybuffer中,參見代碼清單3-26。

代碼清單3-26 webuitab的實作

private[spark] abstract class webuitab(parent: webui, val prefix: string) {

    val pages = arraybuffer[webuipage]()

    val name = prefix.capitalize

    /** attach a page to this tab. this prepends the page's prefix with the tab's own prefix. */

    def attachpage(page: webuipage) {

        page.prefix = (prefix + "/" + page.prefix).stripsuffix("/")

        pages += page

    /** get a list of header tabs from the parent ui. */

    def headertabs: seq[webuitab] = parent.gettabs

    def basepath: string = parent.getbasepath

jobstab建立之後,将被attachtab方法加入sparkui的arraybuffer[webuitab]中,并且通過attachpage方法,給每一個page生成org.eclipse.jetty.servlet.servletcontexthandler,最後調用attachhandler方法将servletcontexthandler綁定到sparkui,即加入到handlers :arraybuffer[servletcontexthandler]和樣例類serverinfo的roothandler(contexthandlercollection)中。sparkui繼承自webui,attachtab方法在webui中實作,參見代碼清單3-27。

代碼清單3-27 webui的實作

private[spark] abstract class webui( securitymanager: securitymanager, port: int,

        conf: sparkconf, basepath: string = "", name: string = "") extends logging {

    protected val tabs = arraybuffer[webuitab]()

    protected val handlers = arraybuffer[servletcontexthandler]()

    protected var serverinfo: option[serverinfo] = none

    protected val localhostname = utils.localhostname()

    protected val publichostname = option(system.getenv("spark_public_dns")).getorelse(localhostname)

    private val classname = utils.getformattedclassname(this)

    def getbasepath: string = basepath

    def gettabs: seq[webuitab] = tabs.toseq

    def gethandlers: seq[servletcontexthandler] = handlers.toseq

    def getsecuritymanager: securitymanager = securitymanager

    /** attach a tab to this ui, along with all of its attached pages. */

    def attachtab(tab: webuitab) {

        tab.pages.foreach(attachpage)

        tabs += tab

    /** attach a page to this ui. */

        val pagepath = "/" + page.prefix

        attachhandler(createservlethandler(pagepath,

        (request: httpservletrequest) => page.render(request), securitymanager, basepath))

    attachhandler(createservlethandler(pagepath.stripsuffix("/") + "/json",

        (request: httpservletrequest) => page.renderjson(request), security-manager, basepath))

    /** attach a handler to this ui. */

    def attachhandler(handler: servletcontexthandler) {

        handlers += handler

        serverinfo.foreach { info =>

            info.roothandler.addhandler(handler)

            if (!handler.isstarted) {

                handler.start()

由于代碼清單3-27所在的類中使用import org.apache.spark.ui.jettyutils._導入了jettyutils的靜态方法,是以createservlethandler方法實際是jettyutils 的靜态方法createservlethandler。createservlethandler實際建立了javax.servlet.http.httpservlet的匿名内部類執行個體,此執行個體實際使用(request: httpservletrequest) => page.render(request)函數參數來處理請求,進而渲染頁面呈現給使用者。有關createservlethandler的實作及jetty的相關資訊,請參閱附錄c。

3.4.5 sparkui的啟動

sparkui建立好後,需要調用父類webui的bind方法,綁定服務和端口,bind方法中主要的代碼實作如下。

serverinfo = some(startjettyserver("0.0.0.0", port, handlers, conf, name))

jettyutils的靜态方法startjettyserver的實作請參閱附錄c。最終啟動了jetty提供的服務,預設端口是4040。

3.5 hadoop相關配置及executor環境變量

3.5.1 hadoop相關配置資訊

預設情況下,spark使用hdfs作為分布式檔案系統,是以需要擷取hadoop相關配置資訊的代碼如下。

val hadoopconfiguration = sparkhadooputil.get.newconfiguration(conf)

擷取的配置資訊包括:

将amazon s3檔案系統的accesskeyid和secretaccesskey加載到hadoop的configuration;

将sparkconf中所有以spark.hadoop.開頭的屬性都複制到hadoop的configuration;

将sparkconf的屬性spark.buffer.size複制為hadoop的configuration的配置io.file.buffer.size。

如果指定了spark_yarn_mode屬性,則會使用yarnsparkhadooputil,否則預設為sparkhadooputil。

3.5.2 executor環境變量

對executor的環境變量的處理,參見代碼清單3-28。executorenvs 包含的環境變量将會在7.2.2節中介紹的注冊應用的過程中發送給master,master給worker發送排程後,worker最終使用executorenvs提供的資訊啟動executor。可以通過配置spark.executor.memory指定executor占用的記憶體大小,也可以配置系統變量spark_executor_memory或者spark_mem對其大小進行設定。

代碼清單3-28 executor環境變量的處理

private[spark] val executormemory = conf.getoption("spark.executor.memory")

        .orelse(option(system.getenv("spark_executor_memory")))

        .orelse(option(system.getenv("spark_mem")).map(warnsparkmem))

        .map(utils.memorystringtomb)

        .getorelse(512)

    // environment variables to pass to our executors.

    private[spark] val executorenvs = hashmap[string, string]()

    for { (envkey, propkey) <- seq(("spark_testing", "spark.testing"))

        value <- option(system.getenv(envkey)).orelse(option(system.getproperty (propkey)))} {

        executorenvs(envkey) = value

    option(system.getenv("spark_prepend_classes")).foreach { v =>

        executorenvs("spark_prepend_classes") = v

    // the mesos scheduler backend relies on this environment variable to set executor memory.

  executorenvs("spark_executor_memory") = executormemory + "m"

    executorenvs ++= conf.getexecutorenv

    // set spark_user for user who is running sparkcontext.

    val sparkuser = option {

        option(system.getenv("spark_user")).getorelse(system.getproperty("user.name"))

    }.getorelse {

        sparkcontext.spark_unknown_user

    executorenvs("spark_user") = sparkuser

3.6 建立任務排程器taskscheduler

taskscheduler也是sparkcontext的重要組成部分,負責任務的送出,并且請求叢集管理器對任務排程。taskscheduler也可以看做任務排程的用戶端。建立taskscheduler的代碼如下。

private[spark] var (schedulerbackend, taskscheduler) =

    sparkcontext.createtaskscheduler(this, master)

createtaskscheduler方法會根據master的配置比對部署模式,建立taskschedulerimpl,并生成不同的schedulerbackend。本章為了使讀者更容易了解spark的初始化流程,故以local模式為例,其餘模式将在第7章詳解。master比對local模式的代碼如下。

master match {

    case "local" =>

        val scheduler = new taskschedulerimpl(sc, max_local_task_failures, islocal = true)

        val backend = new localbackend(scheduler, 1)

        scheduler.initialize(backend)

        (backend, scheduler)

3.6.1 建立taskschedulerimpl

taskschedulerimpl的構造過程如下:

1)從sparkconf中讀取配置資訊,包括每個任務配置設定的cpu數、排程模式(排程模式有fair和fifo兩種,預設為fifo,可以修改屬性spark.scheduler.mode來改變)等。

2)建立taskresultgetter,它的作用是通過線程池(executors.newfixedthreadpool建立的,預設4個線程,線程名字以task-result-getter開頭,線程工廠預設是executors.default-threadfactory)對worker上的executor發送的task的執行結果進行處理。

taskschedulerimpl的實作見代碼清單3-29。

代碼清單3-29 taskschedulerimpl的實作

var dagscheduler: dagscheduler = null

var backend: schedulerbackend = null

val mapoutputtracker = sparkenv.get.mapoutputtracker

var schedulablebuilder: schedulablebuilder = null

var rootpool: pool = null

// default scheduler is fifo

private val schedulingmodeconf = conf.get("spark.scheduler.mode", "fifo")

val schedulingmode: schedulingmode = try {

    schedulingmode.withname(schedulingmodeconf.touppercase)

} catch {

    case e: java.util.nosuchelementexception =>

        throw new sparkexception(s"unrecognized spark.scheduler.mode: $scheduling-modeconf")

// this is a var so that we can reset it for testing purposes.

private[spark] var taskresultgetter = new taskresultgetter(sc.env, this)

taskschedulerimpl的排程模式有fair和fifo兩種。任務的最終排程實際都是落實到接口schedulerbackend的具體實作上的。為友善分析,我們先來看看local模式中schedulerbackend的實作localbackend。localbackend依賴于localactor與actorsystem進行消息通信。localbackend的實作參見代碼清單3-30。

代碼清單3-30 localbackend的實作

private[spark] class localbackend(scheduler: taskschedulerimpl, val totalcores: int)

    extends schedulerbackend with executorbackend {

    private val appid = "local-" + system.currenttimemillis

    var localactor: actorref = null

    override def start() {

        localactor = sparkenv.get.actorsystem.actorof(

            props(new localactor(scheduler, this, totalcores)),

            "localbackendactor")

    override def stop() {

        localactor ! stopexecutor

    override def reviveoffers() {

        localactor ! reviveoffers

    override def defaultparallelism() =

        scheduler.conf.getint("spark.default.parallelism", totalcores)

    override def killtask(taskid: long, executorid: string, interruptthread: boolean) {

        localactor ! killtask(taskid, interruptthread)

    override def statusupdate(taskid: long, state: taskstate, serializeddata: bytebuffer) {

        localactor ! statusupdate(taskid, state, serializeddata)

    override def applicationid(): string = appid

3.6.2 taskschedulerimpl的初始化

建立完taskschedulerimpl和localbackend後,對taskschedulerimpl調用方法initialize進行初始化。以預設的fifo排程為例,taskschedulerimpl的初始化過程如下:

1)使taskschedulerimpl持有localbackend的引用。

2)建立pool,pool中緩存了排程隊列、排程算法及tasksetmanager集合等資訊。

3)建立fifoschedulablebuilder,fifoschedulablebuilder用來操作pool中的排程隊列。

initialize方法的實作見代碼清單3-31。

代碼清單3-31 taskschedulerimpl的初始化

def initialize(backend: schedulerbackend) {

    this.backend = backend

    rootpool = new pool("", schedulingmode, 0, 0)

    schedulablebuilder = {

        schedulingmode match {

            case schedulingmode.fifo =>

                new fifoschedulablebuilder(rootpool)

            case schedulingmode.fair =>

                new fairschedulablebuilder(rootpool, conf)

    schedulablebuilder.buildpools()

3.7 建立和啟動dagscheduler

dagscheduler主要用于在任務正式交給taskschedulerimpl送出之前做一些準備工作,包括:建立job,将dag中的rdd劃分到不同的stage,送出stage,等等。建立dag-scheduler的代碼如下。

@volatile private[spark] var dagscheduler: dagscheduler = _

    dagscheduler = new dagscheduler(this)

dagscheduler的資料結構主要維護jobid和stageid的關系、stage、activejob,以及緩存的rdd的partitions的位置資訊,見代碼清單3-32。

代碼清單3-32 dagscheduler維護的資料結構

private[scheduler] val nextjobid = new atomicinteger(0)

private[scheduler] def numtotaljobs: int = nextjobid.get()

private val nextstageid = new atomicinteger(0)

private[scheduler] val jobidtostageids = new hashmap[int, hashset[int]]

private[scheduler] val stageidtostage = new hashmap[int, stage]

private[scheduler] val shuffletomapstage = new hashmap[int, stage]

private[scheduler] val jobidtoactivejob = new hashmap[int, activejob]

    // stages we need to run whose parents aren't done

    private[scheduler] val waitingstages = new hashset[stage]

    // stages we are running right now

    private[scheduler] val runningstages = new hashset[stage]

    // stages that must be resubmitted due to fetch failures

    private[scheduler] val failedstages = new hashset[stage]

    private[scheduler] val activejobs = new hashset[activejob]

    // contains the locations that each rdd's partitions are cached on

    private val cachelocs = new hashmap[int, array[seq[tasklocation]]]

    private val failedepoch = new hashmap[string, long]

    private val dagscheduleractorsupervisor =

        env.actorsystem.actorof(props(new dagscheduleractorsupervisor(this)))

    private val closureserializer = sparkenv.get.closureserializer.newinstance()

在構造dagscheduler的時候會調用initializeeventprocessactor方法建立dagscheduler-eventprocessactor,見代碼清單3-33。

代碼清單3-33 dagschedulereventprocessactor的初始化

    private[scheduler] var eventprocessactor: actorref = _

private def initializeeventprocessactor() {

        // blocking the thread until supervisor is started, which ensures eventprocess-actor is

        // not null before any job is submitted

        implicit val timeout = timeout(30 seconds)

        val initeventactorreply =

            dagscheduleractorsupervisor ? props(new dagschedulereventprocessactor(this))

        eventprocessactor = await.result(initeventactorreply, timeout.duration).

            asinstanceof[actorref]

initializeeventprocessactor()

這裡的dagscheduleractorsupervisor主要作為dagschedulereventprocessactor的監管者,負責生成dagschedulereventprocessactor。從代碼清單3-34可以看出,dagscheduler-actorsupervisor對于dagschedulereventprocessactor采用了akka的一對一監管政策。dag-scheduleractorsupervisor一旦生成dagschedulereventprocessactor,并注冊到actorsystem,actorsystem就會調用dagschedulereventprocessactor的prestart,taskscheduler于是就持有了dagscheduler,見代碼清單3-35。從代碼清單3-35我們還看到dag-schedulereventprocessactor所能處理的消息類型,比如jobsubmitted、beginevent、completionevent等。dagscheduler-eventprocessactor接受這些消息後會有不同的處理動作。在本章,讀者隻需要了解到這裡即可,後面章節用到時會詳細分析。

代碼清單3-34 dagscheduleractorsupervisor的監管政策

private[scheduler] class dagscheduleractorsupervisor(dagscheduler: dagscheduler)

    extends actor with logging {

    override val supervisorstrategy =

        oneforonestrategy() {

            case x: exception =>

                logerror("eventprocesseractor failed; shutting down sparkcontext", x)

                try {

                    dagscheduler.docancelalljobs()

                } catch {

                    case t: throwable => logerror("dagscheduler failed to cancel all jobs.", t)

                dagscheduler.sc.stop()

                stop

def receive = {

        case p: props => sender ! context.actorof(p)

        case _ => logwarning("received unknown message in dagscheduleractorsupervisor")

代碼清單3-35 dagschedulereventprocessactor的實作

private[scheduler] class dagschedulereventprocessactor(dagscheduler: dags-cheduler)

    override def prestart() {

        dagscheduler.taskscheduler.setdagscheduler(dagscheduler)

    /**

    * the main event loop of the dag scheduler.

    */

    def receive = {

        case jobsubmitted(jobid, rdd, func, partitions, allowlocal, callsite, listener, properties) =>

            dagscheduler.handlejobsubmitted(jobid, rdd, func, partitions, allowlocal, callsite,

                listener, properties)

        case stagecancelled(stageid) =>

            dagscheduler.handlestagecancellation(stageid)

        case jobcancelled(jobid) =>

            dagscheduler.handlejobcancellation(jobid)

        case jobgroupcancelled(groupid) =>

            dagscheduler.handlejobgroupcancelled(groupid)

        case alljobscancelled =>

            dagscheduler.docancelalljobs()

        case executoradded(execid, host) =>

            dagscheduler.handleexecutoradded(execid, host)

        case executorlost(execid) =>

            dagscheduler.handleexecutorlost(execid, fetchfailed = false)

        case beginevent(task, taskinfo) =>

            dagscheduler.handlebeginevent(task, taskinfo)

        case gettingresultevent(taskinfo) =>

            dagscheduler.handlegettaskresult(taskinfo)

        case completion @ completionevent(task, reason, _, _, taskinfo, taskmetrics) =>

            dagscheduler.handletaskcompletion(completion)

        case tasksetfailed(taskset, reason) =>

            dagscheduler.handletasksetfailed(taskset, reason)

        case resubmitfailedstages =>

            dagscheduler.resubmitfailedstages()

override def poststop() {

    // cancel any active jobs in poststop hook

    dagscheduler.cleanupafterschedulerstop()

3.8 taskscheduler的啟動

3.6節介紹了任務排程器taskscheduler的建立,要想taskscheduler發揮作用,必須要啟動它,代碼如下。

taskscheduler.start()

taskscheduler在啟動的時候,實際調用了backend的start方法。

override def start() {

        backend.start()

以localbackend為例,啟動localbackend時向actorsystem注冊了localactor,見代碼清單3-30所示。

3.8.1 建立localactor

建立localactor的過程主要是建構本地的executor,見代碼清單3-36。

代碼清單3-36 localactor的實作

private[spark] class localactor(scheduler: taskschedulerimpl, executorbackend: localbackend,

    private val totalcores: int) extends actor with actorlogreceive with logging {

    import context.dispatcher   // to use akka's scheduler.scheduleonce()

    private var freecores = totalcores

    private val localexecutorid = sparkcontext.driver_identifier

    private val localexecutorhostname = "localhost"

    val executor = new executor(

        localexecutorid, localexecutorhostname, scheduler.conf.getall, totalcores, islocal = true)

    override def receivewithlogging = {

        case reviveoffers =>

            reviveoffers()

        case statusupdate(taskid, state, serializeddata) =>

            scheduler.statusupdate(taskid, state, serializeddata)

            if (taskstate.isfinished(state)) {

                freecores += scheduler.cpus_per_task

                reviveoffers()

        case killtask(taskid, interruptthread) =>

            executor.killtask(taskid, interruptthread)

        case stopexecutor =>

            executor.stop()

executor的建構,見代碼清單3-37,主要包括以下步驟。

1)建立并注冊executorsource。executorsource是做什麼的呢?筆者将在3.8.2節詳細介紹。

2)擷取sparkenv。如果是非local模式,worker上的coarsegrainedexecutorbackend向driver上的coarsegrainedexecutorbackend注冊executor時,則需要建立sparkenv。可以修改屬性spark.executor.port(預設為0,表示随機生成)來配置executor中的actorsystem的端口号。

3)建立并注冊executoractor。executoractor負責接受發送給executor的消息。

4)urlclassloader的建立。為什麼需要建立這個classloader?在非local模式中,driver或者worker上都會有多個executor,每個executor都設定自身的urlclassloader,用于加載任務上傳的jar包中的類,有效對任務的類加載環境進行隔離。

5)建立executor執行task的線程池。此線程池用于執行任務。

6)啟動executor的心跳線程。此線程用于向driver發送心跳。

此外,還包括akka發送消息的幀大小(10 485 760位元組)、結果總大小的位元組限制(1 073 741 824位元組)、正在運作的task的清單、設定serializer的預設classloader為建立的classloader等。

代碼清單3-37 executor的建構

    val executorsource = new executorsource(this, executorid)

private val env = {

        if (!islocal) {

            val port = conf.getint("spark.executor.port", 0)

            val _env = sparkenv.createexecutorenv(

                conf, executorid, executorhostname, port, numcores, islocal, actorsystem)

            sparkenv.set(_env)

            _env.metricssystem.registersource(executorsource)

            _env.blockmanager.initialize(conf.getappid)

            _env

        } else {

            sparkenv.get

    private val executoractor = env.actorsystem.actorof(

        props(new executoractor(executorid)), "executoractor")

    private val urlclassloader = createclassloader()

    private val replclassloader = addreplclassloaderifneeded(urlclassloader)

    env.serializer.setdefaultclassloader(urlclassloader)

    private val akkaframesize = akkautils.maxframesizebytes(conf)

    private val maxresultsize = utils.getmaxresultsize(conf)

    val threadpool = utils.newdaemoncachedthreadpool("executor task launch worker")

    private val runningtasks = new concurrenthashmap[long, taskrunner]

    startdriverheartbeater()

3.8.2 executorsource的建立與注冊

executorsource用于測量系統。通過metricregistry的register方法注冊計量,這些計量資訊包括threadpool.activetasks、threadpool.completetasks、threadpool.currentpool_size、thread-pool.maxpool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeread_ops、filesystem.hdfs.write_ops等,executorsource的實作見代碼清單3-38。metric接口的具體實作,參考附錄d。

代碼清單3-38 executorsource的實作

private[spark] class executorsource(val executor: executor, executorid: string) extends source {

    private def filestats(scheme: string) : option[filesystem.statistics] =

        filesystem.getallstatistics().filter(s => s.getscheme.equals(scheme)).headoption

    private def registerfilesystemstat[t](

            scheme: string, name: string, f: filesystem.statistics => t, defaultvalue: t) = {

        metricregistry.register(metricregistry.name("filesystem", scheme, name), new gauge[t] {

            override def getvalue: t = filestats(scheme).map(f).getorelse (defaultvalue)

        })

    override val metricregistry = new metricregistry()

    override val sourcename = "executor"

metricregistry.register(metricregistry.name("threadpool", "activetasks"), new gauge[int] {

        override def getvalue: int = executor.threadpool.getactivecount()

    })

    metricregistry.register(metricregistry.name("threadpool", "completetasks"), new gauge[long] {

        override def getvalue: long = executor.threadpool.getcompletedtaskcount()

    metricregistry.register(metricregistry.name("threadpool", "currentpool_size"), new gauge[int] {

        override def getvalue: int = executor.threadpool.getpoolsize()

    metricregistry.register(metricregistry.name("threadpool", "maxpool_size"), new gauge[int] {

        override def getvalue: int = executor.threadpool.getmaximumpoolsize()

    // gauge for file system stats of this executor

    for (scheme <- array("hdfs", "file")) {

        registerfilesystemstat(scheme, "read_bytes", _.getbytesread(), 0l)

        registerfilesystemstat(scheme, "write_bytes", _.getbyteswritten(), 0l)

        registerfilesystemstat(scheme, "read_ops", _.getreadops(), 0)

        registerfilesystemstat(scheme, "largeread_ops", _.getlargereadops(), 0)

        registerfilesystemstat(scheme, "write_ops", _.getwriteops(), 0)

建立完executorsource後,調用metricssystem的registersource方法将executorsource注冊到metricssystem。registersource方法使用metricregistry的register方法,将source注冊到metricregistry,見代碼清單3-39。關于metricregistry,具體參閱附錄d。

代碼清單3-39 metricssystem注冊source的實作

def registersource(source: source) {

    sources += source

        val regname = buildregistryname(source)

        registry.register(regname, source.metricregistry)

        case e: illegalargumentexception => loginfo("metrics already registered", e)

3.8.3 executoractor的建構與注冊

executoractor很簡單,當接收到sparkui發來的消息時,将所有線程的棧資訊發送回去,代碼實作如下。

override def receivewithlogging = {

    case triggerthreaddump =>

        sender ! utils.getthreaddump()

3.8.4 spark自身classloader的建立

擷取要建立的classloader的父加載器currentloader,然後根據currentjars生成url數組,spark.files.userclasspathfirst屬性指定加載類時是否先從使用者的classpath下加載,最後建立executorurlclassloader或者childexecutorurlclassloader,見代碼清單3-40。

代碼清單3-40 spark自身classloader的建立

private def createclassloader(): mutableurlclassloader = {

    val currentloader = utils.getcontextorsparkclassloader

    val urls = currentjars.keyset.map { uri =>

        new file(uri.split("/").last).touri.tourl

    }.toarray

    val userclasspathfirst = conf.getboolean("spark.files.userclasspathfirst", false)

    userclasspathfirst match {

        case true => new childexecutorurlclassloader(urls, currentloader)

        case false => new executorurlclassloader(urls, currentloader)

utils.getcontextorsparkclassloader的實作見附錄a。executorurlclassloader或者child-executorurlclassloader實際上都繼承了urlclassloader,見代碼清單3-41。

代碼清單3-41 childexecutorurlclassloader和executorlirlclassloader的實作

private[spark] class childexecutorurlclassloader(urls: array[url], parent: classloader)

    extends mutableurlclassloader {

    private object userclassloader extends urlclassloader(urls, null){

        override def addurl(url: url) {

            super.addurl(url)

    override def findclass(name: string): class[_] = {

        super.findclass(name)

private val parentclassloader = new parentclassloader(parent)

override def findclass(name: string): class[_] = {

        userclassloader.findclass(name)

        case e: classnotfoundexception => {

            parentclassloader.loadclass(name)

    def addurl(url: url) {

        userclassloader.addurl(url)

    def geturls() = {

        userclassloader.geturls()

private[spark] class executorurlclassloader(urls: array[url], parent: classloader)

    extends urlclassloader(urls, parent) with mutableurlclassloader {

    override def addurl(url: url) {

        super.addurl(url)

如果需要repl互動,還會調用addreplclassloaderifneeded建立replclassloader,見代碼清單3-42。

代碼清單3-42 addreplclassloaderifneeded的實作

private def addreplclassloaderifneeded(parent: classloader): classloader = {

    val classuri = conf.get("spark.repl.class.uri", null)

    if (classuri != null) {

        loginfo("using repl class uri: " + classuri)

        val userclasspathfirst: java.lang.boolean =

        conf.getboolean("spark.files.userclasspathfirst", false)

        val klass = class.forname("org.apache.spark.repl.executorclassloader")

            .asinstanceof[class[_ <: classloader]]

        val constructor = klass.getconstructor(classof[sparkconf], classof[string],

            classof[classloader], classof[boolean])

        constructor.newinstance(conf, classuri, parent, userclasspathfirst)

        case _: classnotfoundexception =>

            logerror("could not find org.apache.spark.repl.executorclassloader on classpath!")

            system.exit(1)

            null

        parent

3.8.5 啟動executor的心跳線程

executor的心跳由startdriverheartbeater啟動,見代碼清單3-43。executor心跳線程的間隔由屬性spark.executor.heartbeatinterval配置,預設是10 000毫秒。此外,逾時時間是30秒,逾時重試次數是3次,重試間隔是3000毫秒,使用actorsystem.actorselection (url)方法查找到比對的actor引用, url是akka.tcp://sparkdriver@ $driverhost:$driverport/user/heartbeat-receiver,最終建立一個運作過程中,每次會休眠10 000~20 000毫秒的線程。此線程從runningtasks擷取最新的有關task的測量資訊,将其與executorid、blockmanagerid封裝為heartbeat消息,向heartbeatreceiver發送heartbeat消息。

代碼清單3-43 啟動executor的心跳線程

def startdriverheartbeater() {

    val interval = conf.getint("spark.executor.heartbeatinterval", 10000)

    val timeout = akkautils.lookuptimeout(conf)

    val retryattempts = akkautils.numretries(conf)

    val retryintervalms = akkautils.retrywaitms(conf)

    val heartbeatreceiverref = akkautils.makedriverref("heartbeatreceiver", conf,env.actorsystem)

    val t = new thread() {

            // sleep a random interval so the heartbeats don't end up in sync

            thread.sleep(interval + (math.random * interval).asinstanceof[int])

            while (!isstopped) {

                val tasksmetrics = new arraybuffer[(long, taskmetrics)]()

                val curgctime = gctime

                for (taskrunner <- runningtasks.values()) {

                    if (!taskrunner.attemptedtask.isempty) {

                        option(taskrunner.task).flatmap(_.metrics).foreach { metrics =>

                            metrics.updateshufflereadmetrics

                            metrics.jvmgctime = curgctime - taskrunner.startgctime

                            if (islocal) {

                                val copiedmetrics = utils.deserialize[taskmetrics](utils.serialize(metrics))

                                tasksmetrics += ((taskrunner.taskid, copiedmetrics))

                        } else {

                            // it will be copied by serialization

                            tasksmetrics += ((taskrunner.taskid, metrics))

                        }

                    }

            val message = heartbeat(executorid, tasksmetrics.toarray, env.blockmanager.blockmanagerid)

            try {

                val response = akkautils.askwithreply[heartbeatresponse](message, heartbeatreceiverref,

                    retryattempts, retryintervalms, timeout)

                if (response.reregisterblockmanager) {

                    logwarning("told to re-register on heartbeat")

                    env.blockmanager.reregister()

            } catch {

                case nonfatal(t) => logwarning("issue communicating with driver in heartbeater", t)

thread.sleep(interval)

    t.setdaemon(true)

    t.setname("driver heartbeater")

    t.start()

這個心跳線程的作用是什麼呢?其作用有兩個:

更新正在處理的任務的測量資訊;

通知blockmanagermaster,此executor上的blockmanager依然活着。

下面對心跳線程的實作詳細分析下,讀者可以自行選擇是否需要閱讀。

初始化taskschedulerimpl後會建立心跳接收器heartbeatreceiver。heartbeatreceiver接收所有配置設定給目前driver application的executor的心跳,并将task、task計量資訊、心跳等交給taskschedulerimpl和dagscheduler作進一步處理。建立心跳接收器的代碼如下。

private val heartbeatreceiver = env.actorsystem.actorof(

    props(new heartbeatreceiver(taskscheduler)), "heartbeatreceiver")

heartbeatreceiver在收到心跳消息後,會調用taskscheduler的executorheartbeatreceived方法,代碼如下。

    case heartbeat(executorid, taskmetrics, blockmanagerid) =>

        val response = heartbeatresponse(

            !scheduler.executorheartbeatreceived(executorid, taskmetrics, blockmanagerid))

        sender ! response

executorheartbeatreceived的實作代碼如下。

val metricswithstageids: array[(long, int, int, taskmetrics)] = synchronized {

    taskmetrics.flatmap { case (id, metrics) =>

        taskidtotasksetid.get(id)

            .flatmap(activetasksets.get)

            .map(tasksetmgr => (id, tasksetmgr.stageid, tasksetmgr.taskset.attempt, metrics))

dagscheduler.executorheartbeatreceived(execid, metricswithstageids, blockmanagerid)

這段程式通過周遊taskmetrics,依據taskidtotasksetid和activetasksets找到taskset-manager。然後将taskid、tasksetmanager.stageid、tasksetmanager .taskset.attempt、taskmetrics封裝到類型為array[(long, int, int, taskmetrics)]的數組metricswithstageids中。最後調用了dag-scheduler的executorheartbeatreceived方法,其實作如下。

listenerbus.post(sparklistenerexecutormetricsupdate(execid, taskmetrics))

implicit val timeout = timeout(600 seconds)

await.result(

    blockmanagermaster.driveractor ? blockmanagerheartbeat(blockmanagerid),

    timeout.duration).asinstanceof[boolean]

dagscheduler将executorid、metricswithstageids封裝為sparklistenerexecutormetricsupdate事件,并post到listenerbus中,此事件用于更新stage的各種測量資料。最後給blockmanagermaster持有的blockmanagermasteractor發送blockmanagerheartbeat消息。blockmanagermasteractor在收到消息後會比對執行heartbeatreceived方法(參見4.3.1節)。heartbeatreceived最終更新blockmanagermaster對blockmanger的最後可見時間(即更新block-managerid對應的blockmanagerinfo的_lastseenms,見代碼清單3-44)。

代碼清單3-44 blockmanagermasteractor的心跳處理

private def heartbeatreceived(blockmanagerid: blockmanagerid): boolean = {

    if (!blockmanagerinfo.contains(blockmanagerid)) {

        blockmanagerid.isdriver && !islocal

        blockmanagerinfo(blockmanagerid).updatelastseenms()

        true

local模式下executor的心跳通信過程,可以用圖3-3來表示。

在非local模式中,executor發送心跳的過程是一樣的,主要的差別是executor程序與driver不在同一個程序,甚至不在同一個節點上。

接下來會初始化塊管理器blockmanager,代碼如下。

圖3-3 executor的心跳通信過程

env.blockmanager.initialize(applicationid)

具體的初始化過程,請參閱第4章。

3.9 啟動測量系統metricssystem

metricssystem使用codahale提供的第三方測量倉庫metrics,有關metrics的具體資訊可以參考附錄d。metricssystem中有三個概念:

instance:指定了誰在使用測量系統;

source:指定了從哪裡收集測量資料;

sink:指定了往哪裡輸出測量資料。

spark按照instance的不同,區分為master、worker、application、driver和executor。

spark目前提供的sink有consolesink、csvsink、jmxsink、metricsservlet、graphitesink等。

spark中使用metricsservlet作為預設的sink。

metricssystem的啟動代碼如下。

val metricssystem = env.metricssystem

    metricssystem.start()

metricssystem的啟動過程包括以下步驟:

1)注冊sources;

2)注冊sinks;

3)給sinks增加jetty的servletcontexthandler。

metricssystem啟動完畢後,會周遊與sinks有關的servletcontexthandler,并調用attach-handler将它們綁定到spark ui上。

metricssystem.getservlethandlers.foreach(handler => ui.foreach(_.attachhandler (handler)))

3.9.1 注冊sources

registersources方法用于注冊sources,告訴測量系統從哪裡收集測量資料,它的實作見代碼清單3-45。注冊sources的過程分為以下步驟:

1)從metricsconfig擷取driver的properties,預設為建立metricssystem的過程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/json}。

2)用正則比對driver的properties中以source.開頭的屬性。然後将屬性中的source反射得到的執行個體加入arraybuffer[source]。

3)将每個source的metricregistry(也是metricset的子類型)注冊到concurrent-map<string, metric> metrics。這裡的registersource方法已在3.8.2節講解過。

代碼清單3-45 metricssystem注冊sources的實作

private def registersources() {

    val instconfig = metricsconfig.getinstance(instance)

    val sourceconfigs = metricsconfig.subproperties(instconfig, metricssystem.source_regex)

    // register all the sources related to instance

    sourceconfigs.foreach { kv =>

        val classpath = kv._2.getproperty("class")

            val source = class.forname(classpath).newinstance()

            registersource(source.asinstanceof[source])

            case e: exception => logerror("source class " + classpath + " cannot be instantiated", e)

3.9.2 注冊sinks

registersinks方法用于注冊sinks,即告訴測量系統metricssystem往哪裡輸出測量資料,它的實作見代碼清單3-46。注冊sinks的步驟如下:

1)從driver的properties中用正則比對以sink.開頭的屬性,如{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/json},将其轉換為map(servlet -> {class=org.apache.spark.metrics.sink.metricsservlet, path=/metrics/json})。

2)将子屬性class對應的類metricsservlet反射得到metricsservlet執行個體。如果屬性的key是servlet,将其設定為metricsservlet;如果是sink,則加入到arraybuffer[sink]中。

代碼清單3-46 metricssystem注冊sinks的實作

private def registersinks() {

    val sinkconfigs = metricsconfig.subproperties(instconfig, metricssystem.sink_regex)

    sinkconfigs.foreach { kv =>

        if (null != classpath) {

                val sink = class.forname(classpath)

                .getconstructor(classof[properties], classof[metricregistry], classof[securitymanager])

                .newinstance(kv._2, registry, securitymgr)

            if (kv._1 == "servlet") {

                metricsservlet = some(sink.asinstanceof[metricsservlet])

            } else {

                sinks += sink.asinstanceof[sink]

                case e: exception => logerror("sink class "+ classpath + " cannot be instantialized",e)

3.9.3 給sinks增加jetty的servletcontexthandler

為了能夠在sparkui(網頁)通路到測量資料,是以需要給sinks增加jetty的servlet-contexthandler,這裡主要用到metricssystem的getservlethandlers方法實作如下。

def getservlethandlers = {

    require(running, "can only call getservlethandlers on a running metricssystem")

    metricsservlet.map(_.gethandlers).getorelse(array())

可以看到調用了metricsservlet的gethandlers,其實作如下。

def gethandlers = array[servletcontexthandler](

    createservlethandler(servletpath,

        new servletparams(request => getmetricssnapshot(request), "text/json"), securitymgr)

)

最終生成處理/metrics/json請求的servletcontexthandler,而請求的真正處理由get-metricssnapshot方法,利用fastjson解析。生成的servletcontexthandler通過sparkui的attachhandler方法,也被綁定到sparkui(creatservlethandler與attachhandler方法在3.4.4節詳細講述過)。最終我們可以使用以下這些位址來通路測量資料。

http://localhost:4040/metrics/applications/json。

http://localhost:4040/metrics/json。

http://localhost:4040/metrics/master/json。

3.10 建立和啟動executorallocationmanager

executorallocationmanager用于對已配置設定的executor進行管理,建立和啟動executor-allocationmanager的代碼如下。

private[spark] val executorallocationmanager: option[executorallocationmanager] =

    if (conf.getboolean("spark.dynamicallocation.enabled", false)) {

        some(new executorallocationmanager(this, listenerbus, conf))

executorallocationmanager.foreach(_.start())

預設情況下不會建立executorallocationmanager,可以修改屬性spark.dynamicallocation.enabled為true來建立。executorallocationmanager可以設定動态配置設定最小executor數量、動态配置設定最大executor數量、每個executor可以運作的task數量等配置資訊,并對配置資訊進行校驗。start方法将executorallocationlistener加入listenerbus中,executorallocationlistener通過監聽listenerbus裡的事件,動态添加、删除executor。并且通過thread不斷添加executor,周遊executor,将逾時的executor殺掉并移除。executorallocationlistener的實作與其他sparklistener類似,不再贅述。executorallocationmanager的關鍵代碼見代碼清單3-47。

代碼清單3-47 executorallocationmanager的關鍵代碼

private val intervalmillis: long = 100

private var clock: clock = new realclock

private val listener = new executorallocationlistener

def start(): unit = {

    listenerbus.addlistener(listener)

    startpolling()

private def startpolling(): unit = {

    val t = new thread {

        override def run(): unit = {

            while (true) {

                    schedule()

                    case e: exception => logerror("exception in dynamic executor allocation thread!", e)

                thread.sleep(intervalmillis)

    t.setname("spark-dynamic-executor-allocation")

根據3.4.1節的内容,我們知道listenerbus内置了線程listenerthread,此線程不斷從eventqueue中拉出事件對象,調用監聽器的監聽方法。要啟動此線程,需要調用listenerbus的start方法,代碼如下。

  listenerbus.start()

3.11 contextcleaner的建立與啟動

contextcleaner用于清理那些超出應用範圍的rdd、shuffledependency和broadcast對象。由于配置屬性spark.cleaner.referencetracking預設是true,是以會構造并啟動contextcleaner,代碼如下。

private[spark] val cleaner: option[contextcleaner] = {

    if (conf.getboolean("spark.cleaner.referencetracking", true)) {

        some(new contextcleaner(this))

cleaner.foreach(_.start())

contextcleaner的組成如下:

referencequeue:緩存頂級的anyref引用;

referencebuffer:緩存anyref的虛引用;

listeners:緩存清理工作的監聽器數組;

cleaningthread:用于具體清理工作的線程。

contextcleaner的工作原理和listenerbus一樣,也采用監聽器模式,由線程來處理,此線程實際隻是調用keepcleaning方法。keepcleaning的實作見代碼清單3-48。

代碼清單3-48 keep cleaning的實作

private def keepcleaning(): unit = utils.loguncaughtexceptions {

    while (!stopped) {

            val reference = option(referencequeue.remove(contextcleaner.ref_queue_poll_timeout))

                .map(_.asinstanceof[cleanuptaskweakreference])

            // synchronize here to avoid being interrupted on stop()

            synchronized {

                reference.map(_.task).foreach { task =>

                logdebug("got cleaning task " + task)

                referencebuffer -= reference.get

                task match {

                    case cleanrdd(rddid) =>

                        docleanuprdd(rddid, blocking = blockoncleanuptasks)

                    case cleanshuffle(shuffleid) =>

                        docleanupshuffle(shuffleid, blocking = blockonshufflecleanuptasks)

                    case cleanbroadcast(broadcastid) =>

                        docleanupbroadcast(broadcastid, blocking = blockoncleanuptasks)

            case ie: interruptedexception if stopped => // ignore

            case e: exception => logerror("error in cleaning thread", e)

3.12 spark環境更新

在sparkcontext的初始化過程中,可能對其環境造成影響,是以需要更新環境,代碼如下。

postenvironmentupdate()

postapplicationstart()

sparkcontext初始化過程中,如果設定了spark.jars屬性, spark.jars指定的jar包将由addjar方法加入httpfileserver的jardir變量指定的路徑下。spark.files指定的檔案将由addfile方法加入httpfileserver的filedir變量指定的路徑下。見代碼清單3-49。

代碼清單3-49 依賴檔案處理

val jars: seq[string] =

    conf.getoption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toseq.flatten

val files: seq[string] =

    conf.getoption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toseq.flatten

// add each jar given through the constructor

    if (jars != null) {

        jars.foreach(addjar)

    if (files != null) {

        files.foreach(addfile)

httpfileserver的addfile和addjar方法,見代碼清單3-50。

代碼清單3-50 httpfileserver提供對依賴檔案的通路

def addfile(file: file) : string = {

    addfiletodir(file, filedir)

    serveruri + "/files/" + file.getname

def addjar(file: file) : string = {

    addfiletodir(file, jardir)

    serveruri + "/jars/" + file.getname

def addfiletodir(file: file, dir: file) : string = {

    if (file.isdirectory) {

        throw new illegalargumentexception(s"$file cannot be a directory.")

    files.copy(file, new file(dir, file.getname))

    dir + "/" + file.getname

postenvironmentupdate的實作見代碼清單3-51,其處理步驟如下:

1)通過調用sparkenv的方法environmentdetails最終影響環境的jvm參數、spark 屬性、系統屬性、classpath等,參見代碼清單3-52。

2)生成事件sparklistenerenvironmentupdate,并post到listenerbus,此事件被environ-mentlistener監聽,最終影響environmentpage頁面中的輸出内容。

代碼清單3-51 postenvironmentupdate的實作

private def postenvironmentupdate() {

    if (taskscheduler != null) {

        val schedulingmode = getschedulingmode.tostring

        val addedjarpaths = addedjars.keys.toseq

        val addedfilepaths = addedfiles.keys.toseq

        val environmentdetails =

            sparkenv.environmentdetails(conf, schedulingmode, addedjarpaths, addedfilepaths)

        val environmentupdate = sparklistenerenvironmentupdate(environmentdetails)

        listenerbus.post(environmentupdate)

代碼清單3-52 environmentdetails的實作

val jvminformation = seq(

    ("java version", s"$javaversion ($javavendor)"),

    ("java home", javahome),

    ("scala version", versionstring)

).sorted

val schedulermode =

    if (!conf.contains("spark.scheduler.mode")) {

        seq(("spark.scheduler.mode", schedulingmode))

        seq[(string, string)]()

val sparkproperties = (conf.getall ++ schedulermode).sorted

// system properties that are not java classpaths

val systemproperties = utils.getsystemproperties.toseq

val otherproperties = systemproperties.filter { case (k, _) =>

    k != "java.class.path" && !k.startswith("spark.")

}.sorted

// class paths including all added jars and files

val classpathentries = javaclasspath

    .split(file.pathseparator)

    .filternot(_.isempty)

    .map((_, "system classpath"))

val addedjarsandfiles = (addedjars ++ addedfiles).map((_, "added by user"))

val classpaths = (addedjarsandfiles ++ classpathentries).sorted

map[string, seq[(string, string)]](

    "jvm information" -> jvminformation,

    "spark properties" -> sparkproperties,

    "system properties" -> otherproperties,

    "classpath entries" -> classpaths)

postapplicationstart方法很簡單,隻是向listenerbus發送了sparklistenerapplicationstart事件,代碼如下。

listenerbus.post(sparklistenerapplicationstart(appname, some(applicationid), starttime, sparkuser))

3.13 建立dagschedulersource和blockmanagersource

在建立dagschedulersource、blockmanagersource之前首先調用taskscheduler的post-starthook方法,其目的是為了等待backend就緒,見代碼清單3-53。poststarthook的實作見代碼清單3-54。

建立dagschedulersource和blockmanagersource的過程類似于executorsource,隻不過dagschedulersource測量的資訊是stage. failedstages、stage. runningstages、stage. waiting-stages、stage. alljobs、stage. activejobs,blockmanagersource測量的資訊是memory. maxmem_mb、memory. remainingmem_mb、memory. memused_mb、memory. diskspace-used_mb。

代碼清單3-53 建立dagschedulersource和blockmanagersource

    taskscheduler.poststarthook()

    private val dagschedulersource = new dagschedulersource(this.dagscheduler)

    private val blockmanagersource = new blockmanagersource(sparkenv.get.blockmanager)

private def initdrivermetrics() {

    sparkenv.get.metricssystem.registersource(dagschedulersource)

    sparkenv.get.metricssystem.registersource(blockmanagersource)

initdrivermetrics()

代碼清單3-54 poststarthook的實作

override def poststarthook() {

        waitbackendready()

private def waitbackendready(): unit = {

    if (backend.isready) {

        return

    while (!backend.isready) {

        synchronized {

            this.wait(100)

3.14 将sparkcontext标記為激活

sparkcontext初始化的最後将目前sparkcontext的狀态從contextbeingconstructed(正在建構中)改為activecontext(已激活),代碼如下。

sparkcontext.setactivecontext(this, allowmultiplecontexts)

setactivecontext方法的實作如下。

private[spark] def setactivecontext(

        allowmultiplecontexts: boolean): unit = {

    spark_context_constructor_lock.synchronized {

        assertnoothercontextisrunning(sc, allowmultiplecontexts)

        contextbeingconstructed = none

        activecontext = some(sc)

3.15 小結

回顧本章, scala與akka的基于actor的并發程式設計模型給人的印象深刻。listenerbus對于監聽器模式的經典應用看來并不複雜,希望讀者朋友能應用到自己的産品開發中去。此外,使用netty所提供的異步網絡架構建構的block傳輸服務,基于jetty建構的内嵌web服務(http檔案伺服器和sparkui),基于codahale提供的第三方測量倉庫建立的測量系統,executor中的心跳實作等内容,都值得借鑒。