天天看点

Spark结合源码解决数据倾斜造成Too Large Frame

新公司遇到的第一个spark的坑,寻找原因的过程其实还挺有意思,最终在源码和spark ui上的统计数据的帮助下找到根源,具体如下。

先说下问题

由于严重的数据倾斜,大量数据集中在单个task中,导致shuffle过程中发生异常

完整的exeception是这样的

Spark结合源码解决数据倾斜造成Too Large Frame

但奇怪的是,经过尝试减小executor数量后任务反而成功,增大反而失败,经过多次测试,问题稳定复现。

成功的executor数量是7,失败的则是15,集群的active node是7

这结果直接改变了认知,也没爆内存,cpu也够,怎么会这样,executor数量应该越多越快啊,居然还失败了。

解决过程

这个数在几个失败里不一样,但是都超过了integer.maxvalue。在spark源码中,这条异常发生在transportframedecoder这个类中

Spark结合源码解决数据倾斜造成Too Large Frame

检查发现是frame的大小超过了max_frame_size,而max_frame_size的大小定义如下

Spark结合源码解决数据倾斜造成Too Large Frame

这个transportframedecoder继承自channelinboundhandleradapter,这是netty里的类,好了,看到这就明白了,原来错误发生在网络传输过程中,是数据量超大了。

但是对比了成功与失败的任务,都是单个task严重倾斜啊。再看下两个任务的executor分配。

失败的任务

Spark结合源码解决数据倾斜造成Too Large Frame

成功的任务

Spark结合源码解决数据倾斜造成Too Large Frame

失败的任务里,分配到的节点上都有多个executor;成功的任务里则每个节点只有一个executor。

再看下stage,失败的任务失败在stage26,这个stage依赖于stage24。看图说话

Spark结合源码解决数据倾斜造成Too Large Frame

两个任务的stage24都是成功的,看下24的executor的数据量情况

Spark结合源码解决数据倾斜造成Too Large Frame
Spark结合源码解决数据倾斜造成Too Large Frame

可以看到,两个任务在这个stage上由于数据倾斜,所有数据输入输出都在一个executor中完成。但在stage26中,区别来了

为了提升性能,在hadoop和spark中都会尽量选择数据本地性,尽量让数据local,不行再选择rack等其他方案。而24的输出会作为26的输入。所以24之后自然会选择相同节点上的executor,看下stage26的情况

Spark结合源码解决数据倾斜造成Too Large Frame
Spark结合源码解决数据倾斜造成Too Large Frame

在成功的任务里,stage26与24的executor完全是同一个,这样数据是完全本地化的,甚至是同一个进程,因而经过优化不再需要通过网络传输

而在失败的任务里,stage26在执行时发现这个node上有3个executor,为了性能的提升,将数据分配给3个executor执行计算。可见其中也成功了一半,32686这个端口的executor是24中执行的那个,因而虽然它要处理3.3g的数据,但是因为不需要网络传输,也仍然可以成功。可是对于另外两个,即使是同一个节点,但是由于是不同进程,仍然需要通过netty的server拉取数据,而这一次的拉取最大不能超过int最大值,因而失败了一个,导致整个stage失败,也就导致了整个job的失败。

总结

由此可见在数据极度倾斜的情况下,增大executor的数量未见得是好事,还是要根据具体情况而来。减小了数量解决了问题,但是这其实并不是最好的解决方案,因为这种情况下,可见数据基本等同于本地执行,完全浪费了集群的并发性,更好的解决方案还需要再继续深入理解。

继续阅读