<b>3.12 Spark Environment Update</b>
During the initialization of SparkContext, the environment may be affected, so it needs to be updated afterwards. The code is as follows.
postEnvironmentUpdate()
postApplicationStart()
During SparkContext initialization, if the spark.jars property is set, the jar packages specified by spark.jars will be added by the addJar method to the path specified by HttpFileServer's jarDir variable, and the files specified by spark.files will be added by the addFile method to the path specified by its fileDir variable. See Listing 3-49.
Listing 3-49 Handling dependency files
val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val files: Seq[String] =
    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

// Add each JAR given through the constructor
if (jars != null) {
    jars.foreach(addJar)
}
if (files != null) {
    files.foreach(addFile)
}
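The parsing idiom in Listing 3-49 can be tried out in isolation. The sketch below substitutes a plain Map for SparkConf (an assumption for illustration; ParseDeps and parseList are hypothetical names introduced here) to show how empty comma segments and a missing property are handled:

```scala
// A standalone sketch of the spark.jars/spark.files parsing idiom;
// a plain Map stands in for SparkConf (illustration only, not Spark's API).
object ParseDeps {
  // Mirrors conf.getOption(key).map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
  def parseList(conf: Map[String, String], key: String): Seq[String] =
    conf.get(key).map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.jars" -> "a.jar,,b.jar")
    // The empty segment from the double comma is filtered out
    println(parseList(conf, "spark.jars").mkString(" "))   // a.jar b.jar
    // A missing key yields an empty Seq rather than null
    println(parseList(conf, "spark.files").isEmpty)        // true
  }
}
```

Because getOption returns an Option, the None case collapses to an empty Seq via toSeq.flatten, which is why the subsequent null checks in Listing 3-49 are effectively defensive.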
The addFile and addJar methods of HttpFileServer are shown in Listing 3-50.
Listing 3-50 HttpFileServer provides access to dependency files

def addFile(file: File): String = {
    addFileToDir(file, fileDir)
    serverUri + "/files/" + file.getName
}

def addJar(file: File): String = {
    addFileToDir(file, jarDir)
    serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File): String = {
    if (file.isDirectory) {
        throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    Files.copy(file, new File(dir, file.getName))
    dir + "/" + file.getName
}
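The core of addFileToDir can be reproduced with the JDK alone. The sketch below uses java.nio.file.Files in place of the Guava Files.copy that HttpFileServer relies on (FileServerSketch is a hypothetical name introduced here, not Spark's class):

```scala
import java.io.File
import java.nio.file.{Files => NioFiles, StandardCopyOption}

// A minimal re-creation of addFileToDir using the JDK's java.nio.file API
// instead of Guava's Files.copy (an assumption for this sketch).
object FileServerSketch {
  def addFileToDir(file: File, dir: File): String = {
    // Directories are rejected, matching the guard in Listing 3-50
    if (file.isDirectory) {
      throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    NioFiles.copy(file.toPath, new File(dir, file.getName).toPath,
      StandardCopyOption.REPLACE_EXISTING)
    dir + "/" + file.getName
  }

  def main(args: Array[String]): Unit = {
    val dir = NioFiles.createTempDirectory("jarDir").toFile
    val src = File.createTempFile("demo", ".jar")
    // The returned string is the served path inside the target directory
    println(addFileToDir(src, dir).endsWith(src.getName)) // true
  }
}
```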
The implementation of postEnvironmentUpdate is shown in Listing 3-51. Its processing steps are as follows:
1) Call SparkEnv's environmentDetails method, which collects the environment's JVM parameters, Spark properties, system properties, classpath, and so on. See Listing 3-52.
2) Generate a SparkListenerEnvironmentUpdate event and post it to listenerBus. This event is listened to by EnvironmentListener and ultimately determines the content displayed on the EnvironmentPage.
Listing 3-51 The implementation of postEnvironmentUpdate

private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
        val schedulingMode = getSchedulingMode.toString
        val addedJarPaths = addedJars.keys.toSeq
        val addedFilePaths = addedFiles.keys.toSeq
        val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode,
            addedJarPaths, addedFilePaths)
        val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
        listenerBus.post(environmentUpdate)
    }
}
Listing 3-52 The implementation of environmentDetails

val jvmInformation = Seq(
    ("Java Version", s"$javaVersion ($javaVendor)"),
    ("Java Home", javaHome),
    ("Scala Version", versionString)
).sorted

val schedulerMode =
    if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
    } else {
        Seq[(String, String)]()
    }
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
    k != "java.class.path" && !k.startsWith("spark.")
}.sorted

// Class paths including all added jars and files
val classPathEntries = javaClassPath
    .split(File.pathSeparator)
    .filterNot(_.isEmpty)
    .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

Map[String, Seq[(String, String)]](
    "JVM Information" -> jvmInformation,
    "Spark Properties" -> sparkProperties,
    "System Properties" -> otherProperties,
    "Classpath Entries" -> classPaths)
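The filtering and classpath-splitting steps of Listing 3-52 can be illustrated with sample inputs. EnvDetailsSketch and its details method are hypothetical names, and this sketch only reproduces the "System Properties" and "Classpath Entries" sections of the result map:

```scala
import java.io.File

// A self-contained sketch of how environmentDetails classifies properties;
// the input maps here are hypothetical sample data, not real JVM state.
object EnvDetailsSketch {
  def details(sysProps: Map[String, String],
              javaClassPath: String): Map[String, Seq[(String, String)]] = {
    // System properties that are neither the classpath nor Spark settings
    val otherProperties = sysProps.toSeq.filter { case (k, _) =>
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted
    // Classpath split on the platform separator, empty entries dropped
    val classPaths = javaClassPath
      .split(File.pathSeparator)
      .filterNot(_.isEmpty)
      .map((_, "System Classpath"))
      .toSeq
    Map("System Properties" -> otherProperties, "Classpath Entries" -> classPaths)
  }

  def main(args: Array[String]): Unit = {
    val sysProps = Map(
      "java.class.path" -> "x", "spark.app.name" -> "demo", "os.name" -> "Linux")
    val d = details(sysProps, s"a.jar${File.pathSeparator}${File.pathSeparator}b.jar")
    println(d("System Properties"))                          // only os.name survives
    println(d("Classpath Entries").map(_._1).mkString(","))  // a.jar,b.jar
  }
}
```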
The postApplicationStart method is very simple; it just sends a SparkListenerApplicationStart event to listenerBus. The code is as follows.
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
    startTime, sparkUser))
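The post/listen pattern that both postEnvironmentUpdate and postApplicationStart rely on can be sketched with a toy bus. ListenerBus, SparkEvent, and ApplicationStart below are simplified stand-ins defined for this sketch, not Spark's actual classes:

```scala
import scala.collection.mutable.ArrayBuffer

// A toy listener bus illustrating the post/listen pattern; all types here
// are simplified stand-ins for Spark's event and bus classes.
object ListenerBusSketch {
  sealed trait SparkEvent
  case class ApplicationStart(appName: String, sparkUser: String) extends SparkEvent

  class ListenerBus {
    private val listeners = ArrayBuffer[SparkEvent => Unit]()
    def addListener(l: SparkEvent => Unit): Unit = listeners += l
    // post delivers the event to every registered listener in order
    def post(event: SparkEvent): Unit = listeners.foreach(_(event))
  }

  def main(args: Array[String]): Unit = {
    val bus = new ListenerBus
    val seen = ArrayBuffer[String]()
    bus.addListener {
      case ApplicationStart(name, user) => seen += s"$name by $user"
      case _                            => // other events ignored
    }
    bus.post(ApplicationStart("demo-app", "alice"))
    println(seen.head) // demo-app by alice
  }
}
```

In Spark itself the listeners (such as EnvironmentListener) are registered on listenerBus the same way, and each posted event fans out to all of them.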