<b>3.5 hadoop相關配置及executor環境變量</b>
<b>3.5.1 hadoop相關配置資訊</b>
預設情況下,spark使用hdfs作為分布式檔案系統,是以需要擷取hadoop相關配置資訊的代碼如下。
val hadoopconfiguration =
sparkhadooputil.get.newconfiguration(conf)
擷取的配置資訊包括:
将amazon s3檔案系統的accesskeyid和secretaccesskey加載到hadoop的configuration;
将sparkconf中所有以spark.hadoop.開頭的屬性都複制到hadoop的configuration;
将sparkconf的屬性spark.buffer.size複制為hadoop的configuration的配置io.file.buffer.size。
如果指定了spark_yarn_mode屬性,則會使用yarnsparkhadooputil,否則預設為sparkhadooputil。
<b>3.5.2 executor環境變量</b>
對executor的環境變量的處理,參見代碼清單3-28。executorenvs 包含的環境變量将會在7.2.2節中介紹的注冊應用的過程中發送給master,master給worker發送排程後,worker最終使用executorenvs提供的資訊啟動executor。可以通過配置spark.executor.memory指定executor占用的記憶體大小,也可以配置系統變量spark_executor_memory或者spark_mem對其大小進行設定。
代碼清單3-28 executor環境變量的處理
private[spark] val executormemory =
conf.getoption("spark.executor.memory")
.orelse(option(system.getenv("spark_executor_memory")))
.orelse(option(system.getenv("spark_mem")).map(warnsparkmem))
.map(utils.memorystringtomb)
.getorelse(512)
// environment variables to pass to our executors.
private[spark] val executorenvs = hashmap[string, string]()
for { (envkey, propkey) <- seq(("spark_testing",
"spark.testing"))
value <-
option(system.getenv(envkey)).orelse(option(system.getproperty (propkey)))} {
executorenvs(envkey) = value
}
option(system.getenv("spark_prepend_classes")).foreach { v
=>
executorenvs("spark_prepend_classes") = v
// the mesos scheduler backend relies on this environment variable to
set executor memory.
executorenvs("spark_executor_memory") = executormemory +
"m"
executorenvs ++= conf.getexecutorenv
// set spark_user for user who is running sparkcontext.
val sparkuser = option {
option(system.getenv("spark_user")).getorelse(system.getproperty("user.name"))
}.getorelse {
sparkcontext.spark_unknown_user
executorenvs("spark_user") = sparkuser