深入理解Spark:核心思想与源码分析 (Understanding Spark: Core Ideas and Source Code Analysis). 3.12 Spark Environment Update

3.12 Spark Environment Update

SparkContext's initialization may change its environment, so the environment must be updated at the end of initialization. The code is as follows.

postEnvironmentUpdate()
postApplicationStart()

During SparkContext initialization, if the spark.jars property is set, the JAR files it specifies are added by the addJar method under the path given by HttpFileServer's jarDir variable, and the files specified by spark.files are added by the addFile method under the path given by HttpFileServer's fileDir variable. See Listing 3-49; a short sketch of setting these two properties follows the listing.

Listing 3-49 Handling dependency files

val jars: Seq[String] =
  conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val files: Seq[String] =
  conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}
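For reference, the two properties read in Listing 3-49 are typically supplied through SparkConf before the SparkContext is constructed. A minimal sketch (the paths and the application name are invented for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical values: two local JARs and one data file, comma-separated
// exactly as the split(",") in Listing 3-49 expects.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("demo")
  .set("spark.jars", "/tmp/dep-a.jar,/tmp/dep-b.jar")
  .set("spark.files", "/tmp/lookup.txt")
val sc = new SparkContext(conf)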

The addFile and addJar methods of HttpFileServer are shown in Listing 3-50.

Listing 3-50 HttpFileServer provides access to dependency files

def addFile(file: File): String = {
  addFileToDir(file, fileDir)
  serverUri + "/files/" + file.getName
}

def addJar(file: File): String = {
  addFileToDir(file, jarDir)
  serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File): String = {
  if (file.isDirectory) {
    throw new IllegalArgumentException(s"$file cannot be a directory.")
  }
  Files.copy(file, new File(dir, file.getName))
  dir + "/" + file.getName
}

The implementation of postEnvironmentUpdate is shown in Listing 3-51. Its processing steps are as follows:

1) Call SparkEnv's environmentDetails method to aggregate the environment's JVM parameters, Spark properties, system properties, classpath entries, and so on; see Listing 3-52.

2) Generate a SparkListenerEnvironmentUpdate event and post it to listenerBus. The event is handled by EnvironmentListener and ultimately determines what is displayed on the web UI's EnvironmentPage (a minimal listener sketch follows Listing 3-51).

Listing 3-51 Implementation of postEnvironmentUpdate

private def postEnvironmentUpdate() {
  if (taskScheduler != null) {
    val schedulingMode = getSchedulingMode.toString
    val addedJarPaths = addedJars.keys.toSeq
    val addedFilePaths = addedFiles.keys.toSeq
    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode,
      addedJarPaths, addedFilePaths)
    val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
    listenerBus.post(environmentUpdate)
  }
}
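Any listener registered on listenerBus can observe this event, just as the built-in EnvironmentListener does. A minimal sketch of such a listener (the class name and output format are invented; the overridden callback is the standard SparkListener hook):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

class EnvLoggingListener extends SparkListener {
  override def onEnvironmentUpdate(update: SparkListenerEnvironmentUpdate): Unit = {
    // environmentDetails carries the four groups built in Listing 3-52
    val sparkProps = update.environmentDetails.getOrElse("Spark Properties", Seq.empty)
    sparkProps.foreach { case (k, v) => println(s"$k = $v") }
  }
}
// Registration: sc.addSparkListener(new EnvLoggingListener)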

Listing 3-52 Implementation of environmentDetails

val jvmInformation = Seq(
  ("Java Version", s"$javaVersion ($javaVendor)"),
  ("Java Home", javaHome),
  ("Scala Version", versionString)
).sorted
val schedulerMode =
  if (!conf.contains("spark.scheduler.mode")) {
    Seq(("spark.scheduler.mode", schedulingMode))
  } else {
    Seq[(String, String)]()
  }
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
  k != "java.class.path" && !k.startsWith("spark.")
}.sorted

// Class paths including all added jars and files
val classPathEntries = javaClassPath
  .split(File.pathSeparator)
  .filterNot(_.isEmpty)
  .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

Map[String, Seq[(String, String)]](
  "JVM Information" -> jvmInformation,
  "Spark Properties" -> sparkProperties,
  "System Properties" -> otherProperties,
  "Classpath Entries" -> classPaths)

The postApplicationStart method is very simple; it merely sends a SparkListenerApplicationStart event to listenerBus, as shown below.

listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
  startTime, sparkUser))
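As with the environment update, this event can be observed by a custom listener. A minimal sketch (the class name and message are invented; the fields read here match the SparkListenerApplicationStart arguments posted above):

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

class StartupListener extends SparkListener {
  override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
    println(s"${appStart.appName} started at ${appStart.time} by ${appStart.sparkUser}")
  }
}
// Registration: sc.addSparkListener(new StartupListener)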