本文以spark实践经验和spark原理为依据,总结了spark性能调优的一些方法。这些总结基于spark-1.0.0版本。对于最近推出的spark-1.1.0版本,本文介绍了几个版本增强。
executor是一个独立的jvm进程,每个任务会有独立的线程来执行,executor最大可并发任务数量与其拥有的核心数量相同,执行过程中的数据缓存放在executor的全局空间中。根据以上我们可以得出:
同一个executor中执行的任务,可以共享同一个数据缓存。这也是spark称之为process local级别的数据本地性。
executor可并发执行的任务数量,与其所拥有的核心数相同。
并发任务之间可能会产生相互干扰,如有些任务占用内存较大会导致其他并发任务失败。
executor都需要注册到driver上并与其通信,过多的executor数量会增加driver负担。
在阶段划分为任务时,会得到与分区数相同的任务数量。减少分区的数量将减少任务数,同时每个任务所处理的计算量会增大。考虑到任务本身的序列化,发送,运行环境准备,结果收集都需要占用driver资源和executor资源,减少任务数能够减少此类开销。
在实践中,每个executor可以配置多个核心,从而降低executor数量,还可以得到更好的数据本地性。根据所配置的核心数量与分区数据量,可以估计出executor所需最小内存 = 并发任务数 单分区大小 + 内存缓存分区数 单分区大小。分区数的配置与具体业务逻辑相关,为了将计算资源充分利用,可以参考:分区数 并发job数 >= executor数 executor核心数。其中并发job数是rdd在调用动作(action)类型的操作时产生的job,job之间的阶段是没有依赖关系的因此可并发执行。
参数项
默认值
参数解释
spark.executor.instances
无
一个application拥有的executor数量
spark.executor.cores
1
单个executor可用核心数
spark.executor.memory
512m
单个executor最大内存
spark配置项
rdd的persist函数用于在计算中避免重复计算。如果有一个rdd会被不同的job调用,那一定要用persist进行缓存,避免这个rdd的重复计算。如果是缓存到内存中的话,计算完成后一定要记得调用unpersist释放缓存,以免造成内存无法回收。
调用了rdd的persist后,将会在rdd第一次被计算时将其计算结果写入缓存,缓存级别由storage_level决定,缓存的写入是通过blockmanager进行的,缓存信息也会同步到blockmanagermaster中。在rdd被再次计算时,将首先到blockmanagermaster中检查是否有缓存,如果缓存在其他blockmanager中则先传输到本地再使用。
任务的数据本地性与缓存位置关系紧密,在任务被创建时会确定其preferredlocs,在tasksetmanager中再根据preferredlocs确定任务的数据本地性。在计算preferredlocs时,首先会将task所在分区的缓存位置作为优先位置。若无缓存则将rdd指定的preferredlocs作为优先位置,这类rdd一般为数据源类型的rdd。若以上都没有,则将rdd的窄依赖中的第一个依赖的第一个分区所在位置作为优先位置。
spark的数据本地性分四个级别:process_local:同一executor, node_local:同一机器, rack_local:同一机架, any:其他。在实践中,一些分区的体积较大,如果产生了node或rack级别的任务,则缓存中的分区数据要在executor之间传输,这种传输过程不仅占用网络带宽,而且有可能把新executor的内存占满,导致oom。最后的结果就是传输的时间较长,还有可能导致executor崩溃。因此在分区数据量较大时并不建议降低数据本地性。如果driver的日志中出现了较多的node_local或rack_local,同时伴随计算性能下降,那么可以尝试可以将数据本地性的降级等待时间增大,甚至增大到只用process_local。
spark.locality.wait
3000(毫秒)
数据本地性降级的等待时间
spark.locality.wait.process
多长时间等不到process_local就降级
spark.locality.wait.node
多长时间等不到node_local就降级
spark.locality.wait.rack
多长时间等不到rack_local就降级
rdd中有个可选实现的方法是getpreferredlocations(split: partition): seq[string],这个方法用来返回指定分区的优先加载位置,一般指的是离分区的数据源最近的位置。例如以hdfs为数据源的话,则可设置为hdfs文件分区所在的节点。我们在实现自己的rdd时,若有可能最好实现此方法,以保证任务的数据本地性。
在taskset级别spark提供了fifo和fair两种调度模式,fifo模式根据job的先后顺序和stage的先后顺序选择taskset提交。fair模式可以配置多个pool,每个pool有自己的weight和minshare,其中weight是pool的优先级,minshare保证pool最少有几个核心可用。实践中fifo模式在多个job并发的情况下会导致有些job等待时间过长,而fair模式表现良好。
spark.scheduler.mode
fifo
在spark提供的spark-submit作业提交脚本中,如果使用yarn作为资源管理器,则可以使用yarn-client和yarn-cluster两种master模式。其中yarn-client以应用本身为driver,yarn-cluster会将driver提交到yarn集群的其中一个节点上运行。由于driver与executor之间有频繁的通信,因此driver的位置最好在集群内。也就是说如果使用yarn-client的话,要能保证应用在集群内,而yarn-cluster模式本身就保证了driver在集群内。
有些应用的使用场景需要在一个sparkcontext上持续的提交计算任务,这个sparkcontext的生命周期可能会非常长。在这种情况下使用yarn-cluster模式不能实现持续提交计算任务,而且应用与driver的交互较困难。这种情况便适用于采用 yarn-client模式。
sparkconf.setmaster()
有些场景需要一个sparkcontext持续接收计算任务,这种场景往往对计算任务的时效性要求较高(秒级别),并且可能会有并发的计算任务(如多用户提交任务)。这种场景适合采用yarn-client模式,让driver位于应用内部,应用可以不断向driver提交计算任务,并处理返回结果。这种模式的潜在风险在于driver和executor都会长时间持续运行,可能会有内存泄露的问题。
在实践中,在rdd被persist缓存到内存后,调用unpersist并不能立即释放内存,而是会等待垃圾回收器对其进行回收。在垃圾回收器的选择上,建议使用cms类型的垃圾回收器,用于避免垃圾回收过程中的顿卡现象。
在driver和executor的垃圾回收不出问题的情况下,还是可以得到稳定的计算任务性能的。但如果某些情况下计算性能还是随时间推移而下降,则可以重启sparkcontext以解决问题。因为重启sparkcontext后driver和executor都会全新创建,因此能回到最初的性能。重启的方法是在当前所有任务都完成后,在应用中调用sparkcontext.stop()方法,并移除sparkcontext引用,然后创建新的sparkcontext。
driver在启动时需要将spark的jar包上传到集群,用于启动每个executor。这个jar包的大小约130m。executor在接收任务时,会将任务所依赖的文件、jar包传输到本地,这里的jar包是应用包,一般包含了应用的各类依赖一般也得100m,jar包分发的耗时在10秒左右。在对计算任务时效性要求较高的场景,jar包分发的10秒将是无法接受的。在这里可以采用预先分发的方式解决此问题。我们首先将spark jar和应用jar上传到各个节点的某个相同位置,例如/root/sparkjar。
避免driver启动时分发jar包:
将driver机上的spark_jar环境变量设置为空,避免jar包上传动作。
在yarn-site.xml配置文件中,设置yarn.application.classpath为spark jar的位置与此项默认值。
避免task启动时分发依赖和jar包:
将spark.files和spark.jars中的路径配置为local:/root/sparkjar的模式,从而让executor从本地复制。
spark中的任务传输,任务结果收集,shuffle过程,磁盘级缓存,广播变量的传输等过程均会用到序列化方法。这里有个规律是凡是涉及到网络传输和磁盘缓存的操作,均是先序列化写入,再读出后反序列化的。spark提供了两种序列化方法即javaserializer(默认)和kryoserializer。根据spark的官方说法,kryoserializer性能相对好(10倍于javaserializer),但对部分serializable类型的类不支持,对于不支持的类需要自己写序列化的实现。
spark.serializer
org.apache.spark.serializer.javaserializer
spark1.0.0提供的shuffle过程中,如果有c个核心,m个mapper,有r个reducer,则每个mapper将产生r个文件,总计m r个文件。如果使用shuffle files consolidate的话会好一些,会产生总计min(c, m) r个文件。现在假设r的数量比较大(如1万个),则每个mapper将要把输入写入1万个文件中,过程中将会导致较高的磁盘io,在写入过程中的数据压缩、序列化过程也会占用大量内存。
sort-based shuffle使每个mapper的输出只写入到一个文件中,这个文件中的记录是排序过的,因此reducer可以根据记录的起止范围进行读取。为此sort-based shuffle在写入文件时将会创建一个索引文件,用于记录每个分区的起止范围。
spark.shuffle.manager
hash
spark.shuffle.consolidatefiles
false
是否合并多个mapper的shuffle文件
spark-1.1.0之前的广播变量工厂使用的是httpbroadcastfactory,即所有executor都从driver获取广播变量的值。spark-1.1.0加入了torrentbroadcastfactory并将其设置为默认,这种广播变量的模式类似于bt下载。driver会把变量分为数据块存放,executor取值时也会按数据块提取并合并,但取值的位置会随机的从拥有数据块的executor上提取,从而形成了bt下载的模式,可大大减轻driver的负载。
spark.broadcast.factory
org.apache.spark.broadcast.torrentbroadcastfactory
广播变量的实现方式
spark-1.1.0的任务提交处有个细微的优化,即把任务分为公共部分(rdd,依赖关系,分区处理函数)和非公共部分(stageid,分区id,优先位置)。任务的公共部分序列化后作为广播变量传输到executor,非公共部分序列化后分别传输。从而避免了公共部分在同一executor上的多次传输。参见dagscheduler. submitmissingtasks()。
感谢淘宝技术部-数据挖掘与计算团队提供的优化建议。感谢odps团队提供的优化建议。