属性名
默认值
含义
<code>spark.memory.fraction</code>
0.75
<code>spark.memory.storagefraction</code>
0.5
<code>spark.memory.offheap.enabled</code>
true
如果true,spark会尝试使用堆外内存。启用 后,spark.memory.offheap.size必须为正数。
<code>spark.memory.offheap.size</code>
堆外内存分配的大小(绝对值)。这个设置不会影响堆内存的使用,所以你的执行器总内存必须适应jvm的堆内存大小。必须要设为正数。并且前提是 spark.memory.offheap.enabled=true.
<code>spark.memory.uselegacymode</code>
false
是否使用老式的内存管理模式(1.5以及之前)。老模式在堆内存管理上更死板,使用固定划分的区域做不同功能,潜在的会导致过多的数据溢出到磁盘(如果不小心调整性能)。必须启用本参数,以下选项才可用:
<code></code><code>spark.shuffle.memoryfraction</code>
<code>spark.storage.memoryfraction</code>
<code>spark.storage.unrollfraction</code>
<code>spark.shuffle.memoryfraction</code>
0.2
(废弃)必须先启用spark.memory.uselegacymode这个才有用。
混洗阶段用于聚合和协同分组的jvm堆内存比例。在任何指定的时间,所有用于混洗的内存总和不会超过这个上限,超出的部分会溢出到磁盘上。如果溢出台频繁,考虑增加spark.storage.memoryfraction的大小。
0.6
spark用于缓存数据的对内存比例。这个值不应该比jvm 老生代(old generation)对象所占用的内存大,默认是60%的堆内存,当然你可以增加这个值,同时配置你所用的老生代对象占用内存大小。
spark块展开的内存占用比例。如果没有足够的内存来完整展开新的块,那么老的块将被抛弃。
<code>spark.broadcast.blocksize</code>
4m
torrentbroadcastfactory每个分片大小。太大会减少广播时候的并发数(更慢了);如果太小,blockmanager可能会给出性能提示。
<code>spark.broadcast.factory</code>
org.apache.spark.broadcast.
torrentbroadcastfactory
广播算法的实现。
<code>spark.cleaner.ttl</code>
(infinite)
spark记住任意元数据的保留时间(秒)。周期性的清理能保证比这个更老的元数据将被遗忘(删除)。这对于长期运行的spark作业非常有用(如,一些7*24运行)。注意,rdd持久化到内存中后,过了这么长时间以后,也会被清理掉(这。。。是不是有点坑!)。
<code>spark.executor.cores</code>
yarn模式下默认1;如果是独立部署,则是worker节点上所有可用的core。
单个执行器可用的core数。仅针对yarn和独立部署模式。独立部署时,单个worker节点上会运行多个执行器(executor),只要worker上有足够的core。否则,每个应用在单个worker上只会启动一个执行器。
<code>spark.default.parallelism</code>
对于reducebykey和join这样的分布式混洗(shuffle)算子,等于父rdd中最大的分区。对于parallelize这样没有父rdd的算子,则取决于集群管理器:
local mode: number of cores on the local machine — 本地模式:机器的core数
mesos fine grained mode: 8 — mesos细粒度模式:8
others: total number of cores on all executor nodes or 2, whichever is larger — 其他:所有执行器节点上core的数量 或者 2,这两数取较大的
如果用户没有在参数里指定,这个属性是默认的rdd transformation算子分区数,如:join,reducebykey,parallelize等。
<code>spark.executor.heartbeatinterval</code>
10s
执行器心跳间隔(报告心跳给驱动器)。心跳机制使驱动器了解哪些执行器还活着,并且可以从心跳数据中获得执行器的度量数据。
<code>spark.files.fetchtimeout</code>
60s
获取文件的通讯超时,所获取的文件是通过在驱动器上调用sparkcontext.addfile()添加的。
<code>spark.files.usefetchcache</code>
<code>spark.files.overwrite</code>
sparkcontext.addfile()添加的文件已经存在,且内容不匹配的情况下,是否覆盖。
<code>spark.hadoop.cloneconf</code>
<code>spark.hadoop.validateoutputspecs</code>
如设为true,在saveashadoopfile及其变体的时候,将会验证输出(例如,检查输出目录是否存在)。对于已经验证过或确认存在输出目录的情况,可以禁用这个。我们建议不要禁用,除非你确定需要和之前的spark版本兼容。可以简单的利用hadoop 文件系统api手动删掉已存在的输出目录。这个设置会被spark streaming streamingcontext生成的job忽略,因为streaming需要在回复检查点的时候,覆盖已有的输出目录。
<code>spark.storage.memorymapthreshold</code>
2m
spark从磁盘上读取一个块后,映射到内存块的最小大小。这阻止了spark映射过小的内存块。通常,内存映射块是有开销的,应该比接近或小于操作系统的页大小。
<code>spark.externalblockstore.blockmanager</code>
org.apache.spark.storage.tachyonblockmanager
用于存储rdd的外部块管理器(文件系统)的实现。
文件系统url由spark.externalblockstore.url决定。
<code>spark.externalblockstore.basedir</code>
system.getproperty(“java.io.tmpdir”)
外部块存储存放rdd的目录。文件系统url由spark.externalblockstore.url决定。也可以是逗号分隔的目录列表(tachyon文件系统)
<code>spark.externalblockstore.url</code>
tachyon://localhost:19998 for tachyon
所使用的外部块存储文件系统url。
<code>spark.akka.framesize</code>
128
“control plane” 通讯中所允许的最大消息大小(mb)。通常,只应用于map输出数据的大小信息,这些信息会在执行器和驱动器之间传递。如果你的job包含几千个map和reduce任务,你可能需要增大这个设置。
<code>spark.akka.heartbeat.interval</code>
1000s
设这么大的值,是为了禁用akka传输失败检测器。也可以重新启用,如果你想用这个特性(但不建议)。设成较大的值可以减少网络开销,而较小的值(1秒左右)可能会对akka的失败检测更有用。如有需要,可以调整这个值和spark.akka.heartbeat.pauses的组合。一种可能需要使用失败检测的情形是:用一个敏感的失败检测,可以快速识别并逐出不稳定的执行器。然而,在真实的spark集群中,这通常不是gc暂停或网络延迟造成的。除此之外,启用这个还会导致过多的心跳数据交换,从而造成网络洪峰。
<code>spark.akka.heartbeat.pauses</code>
6000s
设这么大的值,是为了禁用akka传输失败检测器。也可以重新启用,如果你想用这个特性(但不建议)。这个是可接受的akka心跳暂停时间。这个可以用来控制对gc暂停敏感程度。如有需要,可以调整这个值和spark.akka.heartbeat.interval的组合。
<code>spark.akka.threads</code>
4
用于通讯的actor线程数。如果驱动器机器上有很多cpu core,你可以适当增大这个值。
<code>spark.akka.timeout</code>
100s
spark节点之间通讯超时。
<code>spark.blockmanager.port</code>
(random)
块管理器(block manager)监听端口。在驱动器和执行器上都有。
<code>spark.broadcast.port</code>
驱动器http广播server监听端口。这和torrent广播没有关系。
<code>spark.driver.host</code>
(local hostname)
驱动器主机名。用于和执行器以及独立部署时集群master通讯。
<code>spark.driver.port</code>
驱动器端口。用于和执行器以及独立部署时集群master通讯。
<code>spark.executor.port</code>
执行器端口。用于和驱动器通讯。
<code>spark.fileserver.port</code>
驱动器http文件server监听端口。
<code>spark.network.timeout</code>
120s
所有网络交互的默认超时。这个配置是以下属性的默认值:
<code>spark.core.connection.ack.wait.timeout</code>,
<code>spark.akka.timeout</code>,
<code>spark.storage.blockmanagerslavetimeoutms</code>,
<code>spark.shuffle.io.connectiontimeout</code>,<code>spark.rpc.asktimeout</code> or
<code>spark.rpc.lookuptimeout</code>
<code>spark.port.maxretries</code>
16
绑定一个端口的最大重试次数。如果指定了一个端口(非0),每个后续重试会在之前尝试的端口基础上加1,然后再重试绑定。本质上,这确定了一个绑定端口的范围,就是 [start port, start port + maxretries]
<code>spark.replclassserver.port</code>
驱动器http class server的监听端口。只和spark shell相关。
<code>spark.rpc.numretries</code>
3
rpc任务最大重试次数。rpc任务最多重试这么多次。
<code>spark.rpc.retry.wait</code>
3s
rpc请求操作重试前等待时间。
<code>spark.rpc.asktimeout</code>
rpc请求操作超时等待时间。
rpc远程端点查询超时。
<code>spark.cores.max</code>
(not set)
<code>spark.locality.wait</code>
为了数据本地性最长等待时间(spark会根据数据所在位置,尽量让任务也启动于相同的节点,然而可能因为该节点上资源不足等原因,无法满足这个任务分配,spark最多等待这么多时间,然后放弃数据本地性)。数据本地性有多个级别,每一级别都是等待这么多时间(同一进程、同一节点、同一机架、任意)。你也可以为每个级别定义不同的等待时间,需要设置spark.locality.wait.node等。如果你发现任务数据本地性不佳,可以增加这个值,但通常默认值是ok的。
<code>spark.locality.wait.node</code>
spark.locality.wait
单独定义同一节点数据本地性任务等待时间。你可以设为0,表示忽略节点本地性,直接跳到下一级别,即机架本地性(如果你的集群有机架信息)。
<code>spark.locality.wait.process</code>
单独定义同一进程数据本地性任务等待时间。这个参数影响试图访问特定执行器上的缓存数据的任务。
<code>spark.locality.wait.rack</code>
单独定义同一机架数据本地性等待时间。
<code>spark.scheduler.maxregisteredresourceswaitingtime</code>
30s
调度开始前,向集群管理器注册使用资源的最大等待时间。
<code>spark.scheduler.minregisteredresourcesratio</code>
0.8 for yarn mode;
0.0 for standalone mode and mesos coarse-grained mode
调度启动前,需要注册得到资源的最小比例(注册到的资源数 / 需要资源总数)(yarn模式下,资源是执行器;独立部署和mesos粗粒度模式下时资源是cpu core【spark.cores.max是期望得到的资源总数】)。可以设为0.0~1.0的一个浮点数。不管job是否得到了最小资源比例,最大等待时间都是由spark.scheduler.maxregisteredresourceswaitingtime控制的。
<code>spark.scheduler.mode</code>
fifo
<code>spark.scheduler.revive.interval</code>
1s
调度器复活worker的间隔时间。
<code>spark.speculation</code>
如果设为true,将会启动推测执行任务。这意味着,如果stage中有任务执行较慢,他们会被重新调度到别的节点上执行。
<code>spark.speculation.interval</code>
100ms
spark检查慢任务的时间间隔。
<code>spark.speculation.multiplier</code>
1.5
比任务平均执行时间慢多少倍的任务会被认为是慢任务。
<code>spark.speculation.quantile</code>
对于一个stage来说,完成多少百分比才开始检查慢任务,并启动推测执行任务。
<code>spark.task.cpus</code>
1
每个任务分配的cpu core。
<code>spark.task.maxfailures</code>
单个任务最大失败次数。应该>=1。最大重试次数 = spark.task.maxfailures – 1
<code>spark.dynamicallocation.enabled</code>
<code>spark.dynamicallocation .executoridletimeout</code>
<code>spark.dynamicallocation.cachedexecutoridletimeout</code>
infinity
<code>spark.dynamicallocation.initialexecutors</code>
spark
.dynamicallocation
.minexecutors
动态分配开启后,执行器的初始个数
<code>spark.dynamicallocation.maxexecutors</code>
动态分配开启后,执行器个数的上限
<code>spark.dynamicallocation.minexecutors</code>
动态分配开启后,执行器个数的下限
<code>spark.dynamicallocation.schedulerbacklogtimeout</code>
<code>spark.dynamicallocation.sustainedschedulerbacklogtimeout</code>
<code>schedulerbacklogtimeout</code>
<code>spark.acls.enable</code>
是否启用spark acls(访问控制列表)。如果启用,那么将会检查用户是否有权限查看或修改某个作业(job)。注意,检查的前提是需要知道用户是谁,所以如果用户是null,则不会做任何检查。你可以在spark ui上设置过滤器(filters)来做用户认证,并设置用户名。
<code>spark.admin.acls</code>
empty
逗号分隔的用户列表,在该列表中的用户/管理员将能够访问和修改所有的spark作业(job)。如果你的集群是共享的,并且有集群管理员,还有需要调试的开发人员,那么这个配置会很有用。如果想让所有人都有管理员权限,只需把该配置设置为”*”
<code>spark.authenticate</code>
设置spark是否认证集群内部连接。如果不是在yarn上运行,请参考 spark.authenticate.secret
<code>spark.authenticate.secret</code>
none
设置spark用于内部组件认证的秘钥。如果不是在yarn上运行,且启用了 spark.authenticate,那么该配置必须设置
<code>spark.authenticate.enablesaslencryption</code>
是否对spark内部组件认证使用加密通信。该配置目前只有 block transfer service 使用。
<code>spark.network.sasl.serveralwaysencrypt</code>
是否对支持sasl认证的service禁用非加密通信。该配置目前只有 external shuffle service 支持。
<code>spark.core.connection.ack.wait.timeout</code>
网络连接等待应答信号的超时时间。为了避免由于gc等导致的意外超时,你可以设置一个较大的值。
<code>spark.core.connection.auth.wait.timeout</code>
网络连接等待认证的超时时间。
<code>spark.modify.acls</code>
逗号分隔的用户列表,在改列表中的用户可以修改spark作业。默认情况下,只有启动该spark作业的用户可以修改之(比如杀死该作业)。如果想要任何用户都可以修改作业,请将该配置设置为”*”
<code>spark.ui.filters</code>
spark.<class name of filer>.params=’param1=value1,param2=value2’例如:
-dspark.ui.filters=com.test.filter1
-dspark.com.test.filter1.params=’param1=foo,param2=testing’
<code>spark.ui.view.acls</code>
逗号分隔的用户列表,在该列表中的用户可以查看spark web ui。默认,只有启动该spark作业的用户可以查看之。如果需要让所有用户都能查看,只需将该配置设为”*”
<code>spark.ssl.enabled</code>
是否启用ssl连接(在所有所支持的协议上)。所有ssl相关配置(spark.ssl.xxx,其中xxx是一个特定的配置属性),都是全局的。如果需要在某些协议上覆盖全局设置,那么需要在该协议命名空间上进行单独配置。使用 spark.ssl.yyy.xxx 来为协议yyy覆盖全局配置xxx。目前yyy的可选值有 akka(用于基于akka框架的网络连接) 和 fs(用于应广播和文件服务器)
<code>spark.ssl.enabledalgorithms</code>
<code>spark.ssl.keypassword</code>
在key-store中私匙对应的密码。
<code>spark.ssl.keystore</code>
key-store文件路径。可以是绝对路径,或者以本组件启动的工作目录为基础的相对路径。
<code>spark.ssl.keystorepassword</code>
key-store的密码。
<code>spark.ssl.protocol</code>
<code>spark.ssl.truststore</code>
trust-store文件路径。可以是绝对路径,或者以本组件启动的工作目录为基础的相对路径。
<code>spark.ssl.truststorepassword</code>
trust-store的密码
<code>spark.streaming.backpressure.enabled</code>
是否启用spark streaming 的内部反压机制(spark 1.5以上支持)。启用后,spark streaming会根据当前批次的调度延迟和处理时长来控制接收速率,这样一来,系统的接收速度会和处理速度相匹配。该特性会在内部动态地设置接收速率。该速率的上限将由 spark.streaming.receiver.maxrate 和 spark.streaming.kafka.maxrateperpartition 决定(如果它们设置了的话)。
<code>spark.streaming.blockinterval</code>
200ms
<code>spark.streaming.receiver.maxrate</code>
not set
<code>spark.streaming.receiver.writeaheadlog.enable</code>
<code>spark.streaming.unpersist</code>
是否强制spark streaming 自动从内存中清理掉所生成并持久化的rdd。同时,spark streaming收到的原始数据也将会被自动清理掉。如果设置为false,那么原始数据以及持久化的rdd将不会被自动清理,以便外部程序可以访问这些数据。当然,这将导致spark消耗更多的内存。
<code>spark.streaming.stopgracefullyonshutdown</code>
如果设为true,spark将会在jvm关闭时,优雅地关停streamingcontext,而不是立即关闭之。
<code>spark.streaming.kafka.maxrateperpartition</code>
<code>spark.streaming.kafka.maxretries</code>
驱动器连续重试的最大次数,这个配置是为了让驱动器找出每个kafka分区上的最大offset(默认值为1,意味着驱动器将最多尝试2次)。只对新的kafka direct stream api有效。
<code>spark.streaming.ui.retainedbatches</code>
1000
spark streaming ui 以及 status api 中保留的最大批次个数。
<code>spark.r.numrbackendthreads</code>
2
sparkr rbackend处理rpc调用的后台线程数
<code>spark.r.command</code>
rscript
集群模式下,驱动器和worker上执行的r脚本可执行文件
<code>spark.r.driver.command</code>
spark.r.command
client模式的驱动器执行的r脚本。集群模式下会忽略
每个集群管理器都有一些额外的配置选项。详细请参考这里:
有些spark设置需要通过环境变量来设定,这些环境变量可以在${spark_home}/conf/spark-env.sh脚本(windows下是conf/spark-env.cmd)中设置。如果是独立部署或者mesos模式,这个文件可以指定机器相关信息(如hostname)。运行本地spark应用或者submit脚本时,也会引用这个文件。
注意,conf/spark-env.sh默认是不存在的。你需要复制conf/spark-env.sh.template这个模板来创建,还有注意给这个文件附上可执行权限。
以下变量可以在spark-env.sh中设置:
环境变量
<code>java_home</code>
java安装目录(如果没有在path变量中指定)
<code>pyspark_python</code>
驱动器和worker上使用的python二进制可执行文件(默认是python)
<code>pyspark_driver_python</code>
仅在驱动上使用的python二进制可执行文件(默认同pyspark_python)
<code>sparkr_driver_r</code>
sparkr shell使用的r二进制可执行文件(默认是r)
<code>spark_local_ip</code>
本地绑定的ip
<code>spark_public_dns</code>
spark程序公布给其他机器的hostname
spark-env.sh是一个shell脚本,因此一些参数可以通过编程方式来设定 – 例如,你可以获取本机ip来设置spark_local_ip。
默认spark配置目录是”${spark_home}/conf”,你也可以通过 ${spark_conf_dir}指定其他目录。spark会从这个目录下读取配置文件(spark-defaults.conf,spark-env.sh,log4j.properties等)
如果你打算用spark从hdfs读取数据,那么有2个hadoop配置文件必须放到spark的classpath下:
hdfs-site.xml,配置hdfs客户端的默认行为
core-site.xml,默认文件系统名
这些配置文件的路径在不同发布版本中不太一样(如cdh和hdp版本),但通常都能在 ${hadoop_home}/etc/hadoop/conf目录下找到。一些工具,如cloudera manager,可以动态修改配置,而且提供了下载一份拷贝的机制。
要想让这些配置对spark可见,请在${spark_home}/spark-env.sh中设置hadoop_conf_dir变量。