借用和董神的一段对话说下背景:
<b>shuffle共有三种,别人讨论的是hash shuffle,这是最原始的实现,曾经有两个版本,第一版是每个map产生r个文件,一共产生mr个文件,由于产生的中间文件太大影响扩展性,社区提出了第二个优化版本,让一个core上map共用文件,减少文件数目,这样共产生corer个文件,好多了,但中间文件数目仍随任务数线性增加,仍难以应对大作业,但hash shuffle已经优化到头了。为了解决hash shuffle性能差的问题,又引入sort shuffle,完全借鉴mapreduce实现,每个map产生一个文件,彻底解决了扩展性问题</b>
目前sort based shuffle 是作为默认shuffle类型的。shuffle 是一个很复杂的过程,任何一个环节都足够写一篇文章。所以这里,我尝试换个方式,从实用的角度出发,让读者有两方面的收获:
剖析哪些环节,哪些代码可能会让内存产生问题
控制相关内存的参数
有时候,我们宁可程序慢点,也不要oom,至少要先跑步起来,希望这篇文章能够让你达成这个目标。
同时我们会提及一些类名,这些类方便你自己想更深入了解时,可以方便的找到他们,自己去探个究竟。
spark 的shuffle 分为 write,read 两阶段。我们预先建立三个概念:
write 对应的是shufflemaptask,具体的写操作externalsorter来负责
read 阶段由shufflerdd里的hashshufflereader来完成。如果拉来的数据如果过大,需要落地,则也由externalsorter来完成的
所有write 写完后,才会执行read。 他们被分成了两个不同的stage阶段。
也就是说,shuffle write ,shuffle read 两阶段都可能需要落磁盘,并且通过disk merge 来完成最后的sort归并排序。
shuffle write 的入口链路为:
会产生内存瓶颈的其实就是 org.apache.spark.util.collection.externalsorter。我们看看这个复杂的externalsorter都有哪些地方在占用内存:
第一个地:
我们知道,数据都是先写内存,内存不够了,才写磁盘。这里的map就是那个放数据的内存了。
这个partitionedappendonlymap内部维持了一个数组,是这样的:
也就是他消耗的并不是storage的内存,所谓storage内存,指的是由blockmanager管理起来的内存。
partitionedappendonlymap 放不下,要落地,那么不能硬生生的写磁盘,所以需要个buffer,然后把buffer再一次性写入磁盘文件。这个buffer是由参数
控制的。数据获取的过程中,序列化反序列化,也是需要空间的,所以spark 对数量做了限制,通过如下参数控制:
spark.shuffle.spill.batchsize=10000
假设一个executor的可使用的core为 c个,那么对应需要的内存消耗为:
这么看来,写文件的buffer不是问题,而序列化的batchsize也不是问题,几万或者十几万个record 而已。那c * partitionedappendonlymap 到底会有多大呢?我先给个结论:
怎么得到上面的结论呢?核心店就是要判定partitionedappendonlymap 需要占用多少内存,而它到底能占用内存,则由触发写磁盘动作决定,因为一旦写磁盘,partitionedappendonlymap所占有的内存就会被释放。下面是判断是否写磁盘的逻辑代码:
每放一条记录,就会做一次内存的检查,看partitionedappendonlymap 到底占用了多少内存。如果真是这样,假设检查一次内存1ms, 1kw 就不得了的时间了。所以肯定是不行的,所以 estimatesize其实是使用采样算法来做的。
第二个,我们也不希望maybespill太耗时,所以 maybespill 方法里就搞了很多东西,减少耗时。我们看看都设置了哪些防线
首先会判定要不要执行内部逻辑:
每隔32次会进行一次检查,并且要当前partitionedappendonlymap currentmemory > mymemorythreshold 才会进一步判定是不是要spill.
其中 mymemorythreshold可通过如下配置获得初始值
接着会向 shufflememorymanager 要 2 * currentmemory - mymemorythreshold 的内存,shufflememorymanager 是被executor 所有正在运行的task(core) 共享的,能够分配出去的内存是:
上面的数字可通过下面两个配置来更改:
如果无法获取到足够的内存,就会触发真的spill操作了。
看到这里,上面的结论就显而易见了。
然而,这里我们忽略了一个很大的问题,就是
为什么说它是大问题,前面我们说了,estimatesize 是近似估计,所以有可能估的不准,也就是实际内存会远远超过预期。
具体的大家可以看看 org.apache.spark.util.collection.sizetracker
我这里给出一个结论:
如果你内存开的比较大,其实反倒风险更高,因为estimatesize 并不是每次都去真实的算缓存。它是通过采样来完成的,而采样的周期不是固定的,而是指数增长的,比如第一次采样完后,partitionedappendonlymap 要经过1.1次的update/insert操作之后才进行第二次采样,然后经过1.1*.1.1次之后进行第三次采样,以此递推,假设你内存开的大,那partitionedappendonlymap可能要经过几十万次更新之后之后才会进行一次采样,然后才能计算出新的大小,这个时候几十万次更新带来的新的内存压力,可能已经让你的gc不堪重负了。
当然,这是一种折中,因为确实不能频繁采样。
如果你不想出现这种问题,要么自己替换实现这个类,要么将
设置的更小一些。
shuffle read 内存消耗分析
shuffle read 的入口链路为:
shuffle read 会更复杂些,尤其是从各个节点拉取数据。但这块不是不是我们的重点。按流程,主要有:
获取待拉取数据的迭代器
使用appendonlymap/externalappendonlymap 做combine
如果需要对key排序,则使用externalsorter
其中1后续会单独列出文章。3我们在write阶段已经讨论过。所以这里重点是第二个步骤,combine阶段。
如果你开启了
则使用externalappendonlymap,否则使用appendonlymap。两者的区别是,前者如果内存不够,则落磁盘,会发生spill操作,后者如果内存不够,直接oom了。
这里我们会重点分析externalappendonlymap。
externalappendonlymap 作为内存缓冲数据的对象如下:
如果currentmap 对象向申请不到内存,就会触发spill动作。判定内存是否充足的逻辑和shuffle write 完全一致。
combine做完之后,externalappendonlymap 会返回一个iterator,叫做externaliterator,这个iterator背后的数据源是所有spill文件以及当前currentmap里的数据。
我们进去 externaliterator 看看,唯一的一个占用内存的对象是这个优先队列:
mergeheap 里元素数量等于所有spill文件个数加一。streambuffer 的结构:
其中iterator 只是一个对象引用,pairs 应该保存的是iterator里的第一个元素(如果hash有冲突的话,则为多个)
所以mergeheap 应该不占用什么内存。到这里我们看看应该占用多少内存。依然假设 corenum 为 c,则
所以这一段占用内存较大的依然是 sizetrackingappendonlymap ,一样的,他的值也符合如下公式
externalappendonlymap 的目的是做combine,然后如果你还设置了order,那么接着会启用 externalsorter 来完成排序。
经过上文对shuffle write的使用,相比大家也对externalsorter有一定的了解了,此时应该占用内存的地方最大不超过下面的这个值:
不过即使如此,因为他们共享一个shufflememorymanager,则理论上只有这么大:
分析到这里,我们可以做个总结:
shuffle read阶段如果内存不足,有两个阶段会落磁盘,分别是combine 和 sort 阶段。对应的都会spill小文件,并且产生读。
shuffle read 阶段如果开启了spill功能,则基本能保证内存控制在 executorheapmemeory * 0.2 * 0.8 之内。