天天看點

深入了解Spark:核心思想與源碼分析. 3.5 Hadoop相關配置及Executor環境變量

<b>3.5 hadoop相關配置及executor環境變量</b>

<b>3.5.1 hadoop相關配置資訊</b>

預設情況下,spark使用hdfs作為分布式檔案系統,是以需要擷取hadoop相關配置資訊的代碼如下。

val hadoopconfiguration =

sparkhadooputil.get.newconfiguration(conf)

擷取的配置資訊包括:

将amazon s3檔案系統的accesskeyid和secretaccesskey加載到hadoop的configuration;

将sparkconf中所有以spark.hadoop.開頭的屬性都複制到hadoop的configuration;

将sparkconf的屬性spark.buffer.size複制為hadoop的configuration的配置io.file.buffer.size。

如果指定了spark_yarn_mode屬性,則會使用yarnsparkhadooputil,否則預設為sparkhadooputil。

<b>3.5.2 executor環境變量</b>

對executor的環境變量的處理,參見代碼清單3-28。executorenvs 包含的環境變量将會在7.2.2節中介紹的注冊應用的過程中發送給master,master給worker發送排程後,worker最終使用executorenvs提供的資訊啟動executor。可以通過配置spark.executor.memory指定executor占用的記憶體大小,也可以配置系統變量spark_executor_memory或者spark_mem對其大小進行設定。

代碼清單3-28 executor環境變量的處理

private[spark] val executormemory =

conf.getoption("spark.executor.memory")

.orelse(option(system.getenv("spark_executor_memory")))

.orelse(option(system.getenv("spark_mem")).map(warnsparkmem))

.map(utils.memorystringtomb)

.getorelse(512)

// environment variables to pass to our executors.

private[spark] val executorenvs = hashmap[string, string]()

for { (envkey, propkey) &lt;- seq(("spark_testing",

"spark.testing"))

value &lt;-

option(system.getenv(envkey)).orelse(option(system.getproperty (propkey)))} {

executorenvs(envkey) = value

    }

option(system.getenv("spark_prepend_classes")).foreach { v

=&gt;

executorenvs("spark_prepend_classes") = v

// the mesos scheduler backend relies on this environment variable to

set executor memory.

executorenvs("spark_executor_memory") = executormemory +

"m"

executorenvs ++= conf.getexecutorenv

// set spark_user for user who is running sparkcontext.

val sparkuser = option {

option(system.getenv("spark_user")).getorelse(system.getproperty("user.name"))

}.getorelse {

sparkcontext.spark_unknown_user

executorenvs("spark_user") = sparkuser

繼續閱讀