天天看點

《深入了解Spark:核心思想與源碼分析》——3.2節建立執行環境SparkEnv

本節書摘來自華章社群《深入了解spark:核心思想與源碼分析》一書中的第3章,第3.2節建立執行環境sparkenv,作者耿嘉安,更多章節内容可以通路雲栖社群“華章社群”公衆号檢視

3.2 建立執行環境sparkenv

sparkenv是spark的執行環境對象,其中包括衆多與executor執行相關的對象。由于在local模式下driver會建立executor,local-cluster部署模式或者standalone部署模式下worker另起的coarsegrainedexecutorbackend程序中也會建立executor,是以sparkenv存在于driver或者coarsegrainedexecutorbackend程序中。建立sparkenv 主要使用sparkenv的createdriverenv,sparkenv.createdriverenv方法有三個參數:conf、islocal和 listenerbus。

上面代碼中的conf是對sparkconf的複制,islocal辨別是否是單機模式,listenerbus采用監聽器模式維護各類事件的處理,在3.4.1節會詳細介紹。

sparkenv的方法createdriverenv最終調用create建立sparkenv。sparkenv的構造步驟如下:

1)建立安全管理器securitymanager;

2)建立基于akka的分布式消息系統actorsystem;

3)建立map任務輸出跟蹤器mapoutputtracker;

4)執行個體化shufflemanager;

5)建立shufflememorymanager;

6)建立塊傳輸服務blocktransferservice;

7)建立blockmanagermaster;

8)建立塊管理器blockmanager;

9)建立廣播管理器broadcastmanager;

10)建立緩存管理器cachemanager;

11)建立http檔案伺服器httpfileserver;

12)建立測量系統metricssystem;

13)建立sparkenv。

3.2.1 安全管理器securitymanager

securitymanager主要對權限、賬号進行設定,如果使用hadoop yarn作為叢集管理器,則需要使用證書生成 secret key登入,最後給目前系統設定預設的密碼認證執行個體,此執行個體采用匿名内部類實作,參見代碼清單3-2。

代碼清單3-2 securitymanager的實作

private val secretkey = generatesecretkey()

3.2.2 基于akka的分布式消息系統actorsystem

actorsystem是spark中最基礎的設施,spark既使用它發送分布式消息,又用它實作并發程式設計。消息系統可以實作并發?要解釋清楚這個問題,首先應該簡單介紹下scala語言的actor并發程式設計模型:scala認為java線程通過共享資料以及通過鎖來維護共享資料的一緻性是糟糕的做法,容易引起鎖的争用,降低并發程式的性能,甚至會引入死鎖的問題。在scala中隻需要自定義類型繼承actor,并且提供act方法,就如同java裡實作runnable接口,需要實作run方法一樣。但是不能直接調用act方法,而是通過發送消息的方式(scala發送消息是異步的)傳遞資料。如:

actor ! message

akka是actor程式設計模型的進階類庫,類似于jdk 1.5之後越來越豐富的并發工具包,簡化了程式員并發程式設計的難度。actorsystem便是akka提供的用于建立分布式消息通信系統的基礎類。akka的具體資訊見附錄b。

正是因為actor輕量級的并發程式設計、消息發送以及actorsystem支援分布式消息發送等特點,spark選擇了actorsystem。

sparkenv中建立actorsystem時用到了akkautils工具類,見代碼清單3-3。akkautils.createactorsystem方法用于啟動actorsystem,見代碼清單3-4。akkautils使用了utils的靜态方法startserviceonport, startserviceonport最終會回調方法startservice: int => (t, int),此處的startservice實際是方法docreateactorsystem。真正啟動actorsystem是由docreate-actorsystem方法完成的,docreateactorsystem的具體實作細節請見附錄b。spark的driver中akka的預設通路位址是akka://sparkdriver,spark的executor中akka的預設通路位址是akka:// sparkexecutor。如果不指定actorsystem的端口,那麼所有節點的actorsystem端口在每次啟動時随機産生。關于startserviceonport的實作,請見附錄a。

代碼清單3-3 akkautils工具類建立和啟動actorsystem

3.2.3 map任務輸出跟蹤器mapoutputtracker

mapoutputtracker用于跟蹤map階段任務的輸出狀态,此狀态便于reduce階段任務擷取位址及中間輸出結果。每個map任務或者reduce任務都會有其唯一辨別,分别為mapid和reduceid。每個reduce任務的輸入可能是多個map任務的輸出,reduce會到各個map任務的所在節點上拉取block,這一過程叫做shuffle。每批shuffle過程都有唯一的辨別shuffleid。

這裡先介紹下mapoutputtrackermaster。mapoutputtrackermaster内部使用mapstatuses:timestampedhashmap[int, array[mapstatus]]來維護跟蹤各個map任務的輸出狀态。其中key對應shuffleid,array存儲各個map任務對應的狀态資訊mapstatus。由于mapstatus維護了map輸出block的位址blockmanagerid,是以reduce任務知道從何處擷取map任務的中間輸出。mapoutputtrackermaster還使用cachedserializedstatuses:timestampedhashmap[int, array[byte]]維護序列化後的各個map任務的輸出狀态。其中key對應shuffleid,array存儲各個序列化mapstatus生成的位元組數組。

driver和executor處理mapoutputtrackermaster的方式有所不同。

如果目前應用程式是driver,則建立mapoutputtrackermaster,然後建立mapoutputtrackermasteractor,并且注冊到actorsystem中。

如果目前應用程式是executor,則建立mapoutputtrackerworker,并從actorsystem中找到mapoutputtrackermasteractor。

無論是driver還是executor,最後都由mapoutputtracker的屬性trackeractor持有mapoutputtrackermasteractor的引用,參見代碼清單3-5。

代碼清單3-5 registerorlookup方法用于查找或者注冊actor的實作

在後面章節大家會知道map任務的狀态正是由executor向持有的mapoutputtracker-masteractor發送消息,将map任務狀态同步到mapoutputtracker的mapstatuses和cached-serializedstatuses的。executor究竟是如何找到mapoutputtrackermasteractor的?registerorlookup方法通過調用akkautils.makedriverref找到mapoutputtrackermasteractor,實際正是利用actorsystem提供的分布式消息機制實作的,具體細節參見附錄b。這裡第一次使用到了akka提供的功能,以後大家會漸漸感覺到使用akka的便捷。

3.2.4 執行個體化shufflemanager

shufflemanager負責管理本地及遠端的block資料的shuffle操作。shufflemanager預設為通過反射方式生成的sortshufflemanager的執行個體,可以修改屬性spark.shuffle.manager為hash來顯式控制使用hashshufflemanager。sortshufflemanager通過持有的indexshuffleblockmanager間接操作blockmanager中的diskblockmanager将map結果寫入本地,并根據shuffleid、mapid寫入索引檔案,也能通過mapoutputtrackermaster中維護的mapstatuses從本地或者其他遠端節點讀取檔案。有讀者可能會問,為什麼需要shuffle?spark作為并行計算架構,同一個作業會被劃分為多個任務在多個節點上并行執行,reduce的輸入可能存在于多個節點上,是以需要通過“洗牌”将所有reduce的輸入彙總起來,這個過程就是shuffle。這個問題以及對shufflemanager的具體使用會在第5章和第6章詳述。shufflemanager的執行個體化見代碼清單3-6。代碼清單3-6最後建立的shufflememorymanager将在3.2.5節介紹。

代碼清單3-6 shufflemanager的執行個體化及shufflememorymanager的建立

3.2.5 shuffle線程記憶體管理器shufflememorymanager

shufflememorymanager負責管理shuffle線程占有記憶體的配置設定與釋放,并通過thread-memory:mutable.hashmap[long, long]緩存每個線程的記憶體位元組數,見代碼清單3-7。

代碼清單3-7 shufflememorymanager的資料結構

從上面代碼可以看出,shuffle所有線程占用的最大記憶體的計算公式為:

java運作時最大記憶體 spark的shuffle最大記憶體占比 spark的安全記憶體占比

可以配置屬性spark.shuffle.memoryfraction修改spark的shuffle最大記憶體占比,配置屬性spark.shuffle.safetyfraction修改spark的安全記憶體占比。

shufflememorymanager通常運作在executor中,driver中的shufflememorymanager 隻有在local模式下才起作用。

3.2.6 塊傳輸服務blocktransferservice

blocktransferservice預設為nettyblocktransferservice(可以配置屬性spark.shuffle.blocktransferservice使用nioblocktransferservice),它使用netty提供的異步事件驅動的網絡應用架構,提供web服務及用戶端,擷取遠端節點上block的集合。

nettyblocktransferservice的具體實作将在第4章詳細介紹。這裡大家可能覺得奇怪,這樣的網絡應用為何也要放在存儲體系?大家不妨先帶着疑問,直到你真正了解了存儲體系。

3.2.7 blockmanagermaster介紹

val blockmanager = new blockmanager(executorid, actorsystem, blockmanagermaster,

val broadcastmanager = new broadcastmanager(isdriver, conf, securitymanager)

private def initialize() {

}

val cachemanager = new cachemanager(blockmanager)

val httpfileserver =

httpfileserver的初始化過程見代碼清單3-10,主要包括以下步驟:

1)使用utils工具類建立檔案伺服器的根目錄及臨時目錄(臨時目錄在運作時環境關閉時會删除)。utils工具的詳細介紹見附錄a。

2)建立存放jar包及其他檔案的檔案目錄。

3)建立并啟動http服務。

代碼清單3-10 httpfileserver的初始化

def initialize() {

httpserver的構造和start方法的實作中,再次使用了utils的靜态方法startserviceonport,是以會回調dostart方法,見代碼清單3-11。有關jetty的api使用參見附錄c。

代碼清單3-11 httpserver的啟動

def start() {

dostart方法中啟動内嵌的jetty所提供的http服務,見代碼清單3-12。

代碼清單3-12 httpserver的啟動功能實作

private def dostart(startport: int): (server, int) = {

val metricssystem = if (isdriver) {

上面調用的createmetricssystem方法實際建立了metricssystem,代碼如下。

def createmetricssystem(

構造metricssystem的過程最重要的是調用了metricsconfig的initialize方法,見代碼清單3-13。

代碼清單3-13 metricsconfig的初始化

從以上實作可以看出,metricsconfig的initialize方法主要負責加載metrics.properties檔案中的屬性配置,并對屬性進行初始化轉換。

例如,将屬性

{.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, .sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, master.sink.servlet.path=/metrics/master/json}

轉換為

map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet, sink.servlet.path=/metrics/json})

new sparkenv(executorid, actorsystem, serializer, closureserializer, cachemanager,

metricssystem, shufflememorymanager, conf)