天天看點

深入了解Spark:核心思想與源碼分析. 3.2 建立執行環境SparkEnv

<b>3.2 建立執行環境sparkenv</b>

sparkenv是spark的執行環境對象,其中包括衆多與executor執行相關的對象。由于在local模式下driver會建立executor,local-cluster部署模式或者standalone部署模式下worker另起的coarsegrainedexecutorbackend程序中也會建立executor,是以sparkenv存在于driver或者coarsegrainedexecutorbackend程序中。建立sparkenv

主要使用sparkenv的createdriverenv,sparkenv.createdriverenv方法有三個參數:conf、islocal和 listenerbus。

val islocal = (master == "local"

|| master.startswith("local["))

private[spark] val listenerbus = new

livelistenerbus

conf.set("spark.executor.id", "driver")

    private[spark]

val env = sparkenv.createdriverenv(conf, islocal, listenerbus)

sparkenv.set(env)

上面代碼中的conf是對sparkconf的複制,islocal辨別是否是單機模式,listenerbus采用監聽器模式維護各類事件的處理,在3.4.1節會詳細介紹。

sparkenv的方法createdriverenv最終調用create建立sparkenv。sparkenv的構造步驟如下:

1)建立安全管理器securitymanager;

2)建立基于akka的分布式消息系統actorsystem;

3)建立map任務輸出跟蹤器mapoutputtracker;

4)執行個體化shufflemanager;

5)建立shufflememorymanager;

6)建立塊傳輸服務blocktransferservice;

7)建立blockmanagermaster;

8)建立塊管理器blockmanager;

9)建立廣播管理器broadcastmanager;

10)建立緩存管理器cachemanager;

11)建立http檔案伺服器httpfileserver;

12)建立測量系統metricssystem;

13)建立sparkenv。

<b>3.2.1 安全管理器securitymanager</b>

securitymanager主要對權限、賬号進行設定,如果使用hadoop yarn作為叢集管理器,則需要使用證書生成 secret key登入,最後給目前系統設定預設的密碼認證執行個體,此執行個體采用匿名内部類實作,參見代碼清單3-2。

代碼清單3-2 securitymanager的實作

private val secretkey = generatesecretkey()

// 使用http連接配接設定密碼認證

if (authon) {

authenticator.setdefault(

new authenticator() {

override def getpasswordauthentication(): passwordauthentication = {

                var passauth: passwordauthentication

= null

val userinfo = getrequestingurl().getuserinfo()

if (userinfo != null) {

                val  parts = userinfo.split(":", 2)

                passauth = new

passwordauthentication(parts(0), parts(1).tochararray())

          }

                return passauth

}

    )

<b>3.2.2 基于akka的分布式消息系統actorsystem</b>

actorsystem是spark中最基礎的設施,spark既使用它發送分布式消息,又用它實作并發程式設計。消息系統可以實作并發?要解釋清楚這個問題,首先應該簡單介紹下scala語言的actor并發程式設計模型:scala認為java線程通過共享資料以及通過鎖來維護共享資料的一緻性是糟糕的做法,容易引起鎖的争用,降低并發程式的性能,甚至會引入死鎖的問題。在scala中隻需要自定義類型繼承actor,并且提供act方法,就如同java裡實作runnable接口,需要實作run方法一樣。但是不能直接調用act方法,而是通過發送消息的方式(scala發送消息是異步的)傳遞資料。如:

actor ! message

akka是actor程式設計模型的進階類庫,類似于jdk

1.5之後越來越豐富的并發工具包,簡化了程式員并發程式設計的難度。actorsystem便是akka提供的用于建立分布式消息通信系統的基礎類。akka的具體資訊見附錄b。

正是因為actor輕量級的并發程式設計、消息發送以及actorsystem支援分布式消息發送等特點,spark選擇了actorsystem。

sparkenv中建立actorsystem時用到了akkautils工具類,見代碼清單3-3。akkautils.createactorsystem方法用于啟動actorsystem,見代碼清單3-4。akkautils使用了utils的靜态方法startserviceonport,

startserviceonport最終會回調方法startservice: int =&gt; (t, int),此處的startservice實際是方法docreateactorsystem。真正啟動actorsystem是由docreate-actorsystem方法完成的,docreateactorsystem的具體實作細節請見附錄b。spark的driver中akka的預設通路位址是akka://sparkdriver,spark的executor中akka的預設通路位址是akka://

sparkexecutor。如果不指定actorsystem的端口,那麼所有節點的actorsystem端口在每次啟動時随機産生。關于startserviceonport的實作,請見附錄a。

代碼清單3-3 akkautils工具類建立和啟動actorsystem

val (actorsystem, boundport) =

option(defaultactorsystem) match {

case some(as) =&gt; (as, port)

case none =&gt;

val actorsystemname = if (isdriver) driveractorsystemname else

executoractorsystemname

akkautils.createactorsystem(actorsystemname, hostname, port, conf,

securitymanager)

代碼清單3-4 actorsystem的建立和啟動

def createactorsystem(

name: string,

host: string,

port: int,

conf: sparkconf,

securitymanager: securitymanager): (actorsystem, int) = {

val startservice: int =&gt; (actorsystem, int) = { actualport =&gt;

docreateactorsystem(name, host, actualport, conf, securitymanager)

    }

utils.startserviceonport(port, startservice, conf, name)

<b>3.2.3 map任務輸出跟蹤器mapoutputtracker</b>

mapoutputtracker用于跟蹤map階段任務的輸出狀态,此狀态便于reduce階段任務擷取位址及中間輸出結果。每個map任務或者reduce任務都會有其唯一辨別,分别為mapid和reduceid。每個reduce任務的輸入可能是多個map任務的輸出,reduce會到各個map任務的所在節點上拉取block,這一過程叫做shuffle。每批shuffle過程都有唯一的辨別shuffleid。

這裡先介紹下mapoutputtrackermaster。mapoutputtrackermaster内部使用mapstatuses:timestampedhashmap[int, array[mapstatus]]來維護跟蹤各個map任務的輸出狀态。其中key對應shuffleid,array存儲各個map任務對應的狀态資訊mapstatus。由于mapstatus維護了map輸出block的位址blockmanagerid,是以reduce任務知道從何處擷取map任務的中間輸出。mapoutputtrackermaster還使用cachedserializedstatuses:timestampedhashmap[int,

array[byte]]維護序列化後的各個map任務的輸出狀态。其中key對應shuffleid,array存儲各個序列化mapstatus生成的位元組數組。

driver和executor處理mapoutputtrackermaster的方式有所不同。

如果目前應用程式是driver,則建立mapoutputtrackermaster,然後建立mapoutputtrackermasteractor,并且注冊到actorsystem中。

如果目前應用程式是executor,則建立mapoutputtrackerworker,并從actorsystem中找到mapoutputtrackermasteractor。

無論是driver還是executor,最後都由mapoutputtracker的屬性trackeractor持有mapoutputtrackermasteractor的引用,參見代碼清單3-5。

代碼清單3-5 registerorlookup方法用于查找或者注冊actor的實作

def registerorlookup(name: string,

newactor: =&gt; actor): actorref = {

if (isdriver) {

loginfo("registering " + name)

actorsystem.actorof(props(newactor), name = name)

else {

akkautils.makedriverref(name, conf, actorsystem)

val mapoutputtracker =  if

(isdriver) {

new mapoutputtrackermaster(conf)

new mapoutputtrackerworker(conf)

mapoutputtracker.trackeractor = registerorlookup(

"mapoutputtracker",

new mapoutputtrackermasteractor(mapoutputtracker.asinstanceof[mapoutputtrackermaster],

conf))

在後面章節大家會知道map任務的狀态正是由executor向持有的mapoutputtracker-masteractor發送消息,将map任務狀态同步到mapoutputtracker的mapstatuses和cached-serializedstatuses的。executor究竟是如何找到mapoutputtrackermasteractor的?registerorlookup方法通過調用akkautils.makedriverref找到mapoutputtrackermasteractor,實際正是利用actorsystem提供的分布式消息機制實作的,具體細節參見附錄b。這裡第一次使用到了akka提供的功能,以後大家會漸漸感覺到使用akka的便捷。

<b>3.2.4 執行個體化shufflemanager</b>

shufflemanager負責管理本地及遠端的block資料的shuffle操作。shufflemanager預設為通過反射方式生成的sortshufflemanager的執行個體,可以修改屬性spark.shuffle.manager為hash來顯式控制使用hashshufflemanager。sortshufflemanager通過持有的indexshuffleblockmanager間接操作blockmanager中的diskblockmanager将map結果寫入本地,并根據shuffleid、mapid寫入索引檔案,也能通過mapoutputtrackermaster中維護的mapstatuses從本地或者其他遠端節點讀取檔案。有讀者可能會問,為什麼需要shuffle?spark作為并行計算架構,同一個作業會被劃分為多個任務在多個節點上并行執行,reduce的輸入可能存在于多個節點上,是以需要通過“洗牌”将所有reduce的輸入彙總起來,這個過程就是shuffle。這個問題以及對shufflemanager的具體使用會在第5章和第6章詳述。shufflemanager的執行個體化見代碼清單3-6。代碼清單3-6最後建立的shufflememorymanager将在3.2.5節介紹。

代碼清單3-6 shufflemanager的執行個體化及shufflememorymanager的建立

val shortshufflemgrnames = map(

"hash" -&gt;

"org.apache.spark.shuffle.hash.hashshufflemanager",

"sort" -&gt; "org.apache.spark.shuffle.sort.sortshufflemanager")

val shufflemgrname = conf.get("spark.shuffle.manager",

"sort")

val shufflemgrclass = shortshufflemgrnames.get

orelse(shufflemgrname.tolowercase,

shufflemgrname)

val shufflemanager = instantiateclass[shufflemanager](shufflemgrclass)

val shufflememorymanager = new shufflememorymanager(conf)

<b>3.2.5 shuffle線程記憶體管理器shufflememorymanager</b>

shufflememorymanager負責管理shuffle線程占有記憶體的配置設定與釋放,并通過thread-memory:mutable.hashmap[long, long]緩存每個線程的記憶體位元組數,見代碼清單3-7。

代碼清單3-7 shufflememorymanager的資料結構

private[spark] class

shufflememorymanager(maxmemory: long) extends logging {

private val threadmemory = new mutable.hashmap[long, long]()  // threadid -&gt; memory bytes

def this(conf: sparkconf) =

this(shufflememorymanager.getmaxmemory(conf))

getmaxmemory方法用于擷取shuffle所有線程占用的最大記憶體,實作如下。

def getmaxmemory(conf: sparkconf): long = {

val memoryfraction =

conf.getdouble("spark.shuffle.memoryfraction", 0.2)

val safetyfraction =

conf.getdouble("spark.shuffle.safetyfraction", 0.8)

(runtime.getruntime.maxmemory * memoryfraction * safetyfraction).tolong

從上面代碼可以看出,shuffle所有線程占用的最大記憶體的計算公式為:

java運作時最大記憶體 * spark的shuffle最大記憶體占比 *

spark的安全記憶體占比

可以配置屬性spark.shuffle.memoryfraction修改spark的shuffle最大記憶體占比,配置屬性spark.shuffle.safetyfraction修改spark的安全記憶體占比。

shufflememorymanager通常運作在executor中,driver中的shufflememorymanager

隻有在local模式下才起作用。

<b>3.2.6 塊傳輸服務blocktransferservice</b>

blocktransferservice預設為nettyblocktransferservice(可以配置屬性spark.shuffle.blocktransferservice使用nioblocktransferservice),它使用netty提供的異步事件驅動的網絡應用架構,提供web服務及用戶端,擷取遠端節點上block的集合。

val blocktransferservice =

conf.get("spark.shuffle.blocktransferservice",

"netty").tolowercase match {

case "netty" =&gt;

new nettyblocktransferservice(conf, securitymanager, numusablecores)

case "nio" =&gt;

new nioblocktransferservice(conf, securitymanager)

nettyblocktransferservice的具體實作将在第4章詳細介紹。這裡大家可能覺得奇怪,這樣的網絡應用為何也要放在存儲體系?大家不妨先帶着疑問,直到你真正了解了存儲體系。

<b>3.2.7 blockmanagermaster介紹</b>

blockmanagermaster負責對block的管理和協調,具體操作依賴于blockmanager-masteractor。driver和executor處理blockmanagermaster的方式不同:

如果目前應用程式是driver,則建立blockmanagermasteractor,并且注冊到actor-system中。

如果目前應用程式是executor,則從actorsystem中找到blockmanagermasteractor。

無論是driver還是executor,最後blockmanagermaster的屬性driveractor将持有對blockmanagermasteractor的引用。blockmanagermaster的建立代碼如下。

val blockmanagermaster = new

blockmanagermaster(registerorlookup(

"blockmanagermaster",

new blockmanagermasteractor(islocal, conf, listenerbus)), conf,

isdriver)

registerorlookup已在3.2.3節介紹過了,不再贅述。blockmanagermaster及blockmanager-masteractor的具體實作将在第4章詳細介紹。

<b>3.2.8 建立塊管理器blockmanager</b>

blockmanager負責對block的管理,隻有在blockmanager的初始化方法initialize被調用後,它才是有效的。blockmanager作為存儲系統的一部分,具體實作見第4章。blockmanager的建立代碼如下。

val blockmanager = new

blockmanager(executorid, actorsystem, blockmanagermaster,

serializer, conf, mapoutputtracker, shufflemanager,

blocktransferservice, securitymanager,numusablecores)

<b>3.2.9 建立廣播管理器broadcastmanager</b>

broadcastmanager用于将配置資訊和序列化後的rdd、job以及shuffledependency等資訊在本地存儲。如果為了容災,也會複制到其他節點上。建立broadcastmanager的代碼實作如下。

val broadcastmanager = new

broadcastmanager(isdriver, conf, securitymanager)

broadcastmanager必須在其初始化方法initialize被調用後,才能生效。initialize方法實際利用反射生成廣播工廠執行個體broadcastfactory(可以配置屬性spark.broadcast.factory指定,預設為org.apache.spark.broadcast.torrentbroadcastfactory)。broadcastmanager的廣播方法newbroadcast實際代理了工廠broadcastfactory的newbroadcast方法來生成廣播對象。unbroadcast方法實際代理了工廠broadcastfactory的unbroadcast方法生成非廣播對象。broadcastmanager的initialize、unbroadcast及newbroadcast方法見代碼清單3-8。

代碼清單3-8 broadcastmanager的實作

private def initialize() {

synchronized {

if (!initialized) {

val broadcastfactoryclass =

conf.get("spark.broadcast.factory",

"org.apache.spark.broadcast.torrentbroadcastfactory")

broadcastfactory =

class.forname(broadcastfactoryclass).newinstance.asinstanceof

[broadcastfactory]

broadcastfactory.initialize(isdriver, conf, securitymanager)

initialized = true

private val nextbroadcastid = new atomiclong(0)

def newbroadcast[t: classtag](value_ : t, islocal: boolean) = {

  broadcastfactory.newbroadcast[t](value_,

islocal, nextbroadcastid.getandincrement())

def unbroadcast(id: long, removefromdriver: boolean, blocking: boolean)

{

broadcastfactory.unbroadcast(id, removefromdriver, blocking)

<b>3.2.10 建立緩存管理器cachemanager</b>

cachemanager用于緩存rdd某個分區計算後的中間結果,緩存計算結果發生在疊代計算的時候,将在6.1節講到。而cachemanager将在4.10節較長的描述。建立cachemanager的代碼如下。

val cachemanager = new

cachemanager(blockmanager)

<b>3.2.11 http檔案伺服器httpfileserver</b>

httpfileserver的建立參見代碼清單3-9。httpfileserver主要提供對jar及其他檔案的http通路,這些jar包包括使用者上傳的jar包。端口由屬性spark.fileserver.port配置,預設為0,表示随機生成端口号。

代碼清單3-9 httpfileserver的建立

val httpfileserver =

val fileserverport = conf.getint("spark.fileserver.port", 0)

val server = new httpfileserver(conf, securitymanager, fileserverport)

server.initialize()

conf.set("spark.fileserver.uri",  server.serveruri)

server

null

httpfileserver的初始化過程見代碼清單3-10,主要包括以下步驟:

1)使用utils工具類建立檔案伺服器的根目錄及臨時目錄(臨時目錄在運作時環境關閉時會删除)。utils工具的詳細介紹見附錄a。

2)建立存放jar包及其他檔案的檔案目錄。

3)建立并啟動http服務。

代碼清單3-10 httpfileserver的初始化

def initialize() {

basedir = utils.createtempdir(utils.getlocaldir(conf),

"httpd")

filedir = new file(basedir, "files")

jardir = new file(basedir, "jars")

filedir.mkdir()

jardir.mkdir()

loginfo("http file server directory is " + basedir)

httpserver = new httpserver(conf, basedir, securitymanager,

requestedport, "http file server")

httpserver.start()

serveruri = httpserver.uri

logdebug("http file server started at: " + serveruri)

httpserver的構造和start方法的實作中,再次使用了utils的靜态方法startserviceonport,是以會回調dostart方法,見代碼清單3-11。有關jetty的api使用參見附錄c。

代碼清單3-11 httpserver的啟動

def start() {

if (server != null) {

throw new serverstateexception("server is already started")

loginfo("starting http server")

val (actualserver, actualport) =

utils.startserviceonport[server](requestedport, dostart, conf,

servername)

server = actualserver

port = actualport

dostart方法中啟動内嵌的jetty所提供的http服務,見代碼清單3-12。

代碼清單3-12 httpserver的啟動功能實作

private def dostart(startport: int):

(server, int) = {

val server = new server()

val connector = new socketconnector

connector.setmaxidletime(60 * 1000)

connector.setsolingertime(-1)

connector.setport(startport)

server.addconnector(connector)

val threadpool = new queuedthreadpool

threadpool.setdaemon(true)

server.setthreadpool(threadpool)

    val reshandler = new resourcehandler

reshandler.setresourcebase(resourcebase.getabsolutepath)

val handlerlist = new handlerlist

handlerlist.sethandlers(array(reshandler, new defaulthandler))

if (securitymanager.isauthenticationenabled()) {

logdebug("httpserver is using security")

val sh = setupsecurityhandler(securitymanager)

// make sure we go through security handler to get resources

sh.sethandler(handlerlist)

server.sethandler(sh)

logdebug("httpserver is not using security")

server.sethandler(handlerlist)

server.start()

val actualport = server.getconnectors()(0).getlocalport

(server, actualport)

3.2.12 建立測量系統metricssystem

metricssystem是spark的測量系統,建立metricssystem的代碼如下。

val metricssystem = if (isdriver) {

metricssystem.createmetricssystem("driver", conf,

conf.set("spark.executor.id", executorid)

val ms = metricssystem.createmetricssystem("executor", conf,

ms.start()

ms

上面調用的createmetricssystem方法實際建立了metricssystem,代碼如下。

def createmetricssystem(

instance: string, conf: sparkconf, securitymgr: securitymanager):

metricssystem = {

new metricssystem(instance, conf, securitymgr)

構造metricssystem的過程最重要的是調用了metricsconfig的initialize方法,見代碼清單3-13。

代碼清單3-13 metricsconfig的初始化

setdefaultproperties(properties)

var is: inputstream = null

try {

is = configfile match {

case some(f) =&gt; new fileinputstream(f)

utils.getsparkclassloader.getresourceasstream(metrics_conf)

if (is != null) {

properties.load(is)

catch {

case e: exception =&gt; logerror("error loading configure

file", e)

finally {

if (is != null) is.close()

propertycategories = subproperties(properties, instance_regex)

if (propertycategories.contains(default_prefix)) {

import scala.collection.javaconversions._

val defaultproperty = propertycategories(default_prefix)

for { (inst, prop) &lt;- propertycategories

if (inst != default_prefix)

(k, v) &lt;- defaultproperty

if (prop.getproperty(k) == null) } {

prop.setproperty(k, v)

從以上實作可以看出,metricsconfig的initialize方法主要負責加載metrics.properties檔案中的屬性配置,并對屬性進行初始化轉換。

例如,将屬性

{*.sink.servlet.path=/metrics/json,

applications.sink.servlet.path=/metrics/applications/json,

*.sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

master.sink.servlet.path=/metrics/master/json}

轉換為

map(applications -&gt;

{sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

sink.servlet.path=/metrics/applications/json}, master -&gt;

sink.servlet.path=/metrics/master/json}, * -&gt; {sink.servlet.class=org.apache.spark.metrics.sink.metricsservlet,

sink.servlet.path=/metrics/json})

<b>3.2.13 建立sparkenv</b>

當所有的基礎元件準備好後,最終使用下面的代碼建立執行環境sparkenv。

new sparkenv(executorid, actorsystem,

serializer, closureserializer, cachemanager,

mapoutputtracker, shufflemanager, broadcastmanager,

blocktransferservice,

blockmanager, securitymanager, httpfileserver, sparkfilesdir,

metricssystem, shufflememorymanager, conf)

serializer和closureserializer都是使用class.forname反射生成的org.apache.spark.serializer.javaserializer類的執行個體,其中closureserializer執行個體特别用來對scala中的閉包進行序列化。