天天看点

OpenMLDB: 一文了解窗口倾斜优化技术细节

​​openmldb​​是针对ai场景优化的开源数据库项目,实现了数据与计算一致性的离线mpp场景和在线oltp场景计算引擎。mpp引擎可基于spark实现,并通过拓展spark源码实现数倍性能提升。本文主要解释​​openmldb​​如何基于spark来解决窗口数据的倾斜问题。

数据倾斜是在大数据处理场景下常见的一种现象,它由某一分区数据量过大造成。数据倾斜会导致倾斜分区与其他分区的运算时间产生巨大差距,换句话说就是倾斜数据分区的计算任务与其cpu资源严重不匹配。最终会造成多等一的情况——多个小数据量的分区计算完毕后等待倾斜的大数据量分区,只有倾斜分区计算完毕才能输出结果。这对效率来说是巨大的灾难。

在机器学习的特征计算中,涉及到很多的窗口计算。在窗口计算下,如果出现单一key数据量过大,也会导致某一分区数据过多,从而产生数据倾斜问题。而传统数据倾斜中分区优化的方案,如:数据加前缀再分区,是不适合窗口计算场景的。它会导致窗口计算场景下最终计算结果错误。因此openmldb提出了一种基于spark的窗口数据倾斜分区优化方案——在扩充窗口数据后,再根据分区键以及时间片对倾斜数据进行再分区。

OpenMLDB: 一文了解窗口倾斜优化技术细节

在上图的数据中,因为主键“gender”只有两个值,离线计算最好情况下只能将数据划分到两个partition,即并行度只有2。此时同样的分区资源,计算任务的数据量差距却很大。在后续的计算中,“male”所在的分区计算的时间必然比“female”所在分区计算的时间大。当倾斜分区数据量变大的时候,这个时间差距还会被不断拉大。且由于spark的底层执行里每个partition只有一个thread,这使得整个stage周期里只有两个thread在工作,还有很多其他的thread一直处于空闲状态,这也会导致严重的性能浪费。

对于倾斜数据的优化,解决根本问题的方法就是对倾斜数据进行再分区,把原本一个倾斜分区内庞大的数据块,分散成多个小的数据分区。以此来达到对大数据进行拆分从而提高计算效率的目的。

在常见的数据再分区策略中,有通过分区键加上不同前缀从而进行再分区的策略,也有通过多加几列作为分区键进行再分区的策略。但是这些简单的再分区方案,在窗口计算中,都会造成计算错误。

如果采用数据加前缀再分区的简单分区优化方案,原本同一个partition下的数据会被拆分到不同的partition。而窗口计算涉及到数据之间滑动取值的情况,因此如果只是简单的将分区内的数据再拆分,窗口计算将无法取到原本相邻的数据,这会导致最终计算结果的错误。

OpenMLDB: 一文了解窗口倾斜优化技术细节

整体思路

我们的方案总体思路是在上述倾斜数据再分区的基础上,进一步保证各个再分区的数据块在窗口计算时结果正确。方案里采用的方式是在每个再分区的数据块中,根据窗口需要滑动的数据条数,进行一定的窗口数据扩充。

在优化中,总体上采用的就是再分区+窗口补充的repartition策略来对数据进行分区。思路是采用空间换时间的策略,优点是计算时间短性能高,缺点是补充的窗口数据会造成一定的数据冗余,导致占用更多内存。

下面详细介绍本方案的技术细节,倾斜优化方案具体的实现主要分为五步,以下面sql为例。

OpenMLDB: 一文了解窗口倾斜优化技术细节

这一步需要对总体的数据做一个评估,统计出一些相关的指标,比如数据划分的分界线,以及partition内数据的条数等。参数介绍如下。

<col>

参数名

解释

quantile

对于数据的拆分是通过传入的“quantile”参数来确定的,并且我们采用的是n等分的机制,quantile = 4代表了四等分(不一定能保证严格四等分)。

根据“quantile”参数,我们就可以划分出来不同值的分界线“percentile_i”,根据数据相对于分界线的值可以划分出不同的数据块。

precentile

根据(“time“)列(sql中窗口里order by的值)划分数据块的分界线,percentile_i为第i条分界线,(”time“)列符合(percentile_i,percentile_i+1] 的数据为第i个数据块。

特殊情况:第一个数据块为(0,percentile_1],最后一块为(percentile_n,无穷大)

总体来说,第一步的数据评估是对数据各项指标进行统计和计算,并在统计后,对数据进行判断以及处理,但由于涉及到全量数据的遍历,会比较耗时。对此我们也有一个额外的优化,我们支持通过读取提前预处理好的distribution表来跳过第一步中统计的部分。这样就可以在凌晨或者不需要处理业务时,执行统计任务,将数据结果统计完成,来避免用户需要执行处理逻辑时,在第一步等待时间太久。

OpenMLDB: 一文了解窗口倾斜优化技术细节
OpenMLDB: 一文了解窗口倾斜优化技术细节

这一步根据distribution table中对数据的统计结果,来对数据进行划分,并对划分后的数据打上(“part_id”)和(“expanded_row"),作为不同数据块重分区后的分区标号以及是否为扩充数据的标记。

在最开始的join中,我们采用了broadcast join,来提升join时的效率。broadcast join是spark中一种可以避免shuffle的join,一般一张大表和一张小表进行join时可以使用broadcast join,它是通过将小表的数据广播到每个executor计算节点上,再通过map聚合的方式,来避免了数据的shuffle。在我们的表中,distribution table比input table小很多,因此刚好可以采用broadcast join。

在join之后,可以得到数据分界线,且当percentile_i为第i条分界线时,符合(percentile_i,percentile_i+1] 的数据就为第i个数据块,采用固定策略划分完结果之后。就可以根据划分结果,生成新的分区标号——“part_id”。表数据介绍如下。

列名

part_id

代表了再分区的id,在addcolumntable中,“part_id”+分区键相同的行,就同属于一个新的partition,如id = 1和id = 3这两行同属于一个分区。

expanded_row

代表了当前行是否是扩充的窗口数据,默认值为false。在下述步骤中,新扩充的窗口数据此列的值为true。

对窗口数据进行扩充是openmldb关于窗口倾斜优化中,比较核心的部分。由于数据较多,为了便于理解,下面只展示“male”部分数据。

具体实现时,我们对每个需要扩充的数据块进行全体窗口数据的扩充,即通过遍历,对每个需要扩充数据的重分区数据块都扩充到第一条数据。过程图解如下,深色代表当前遍历的分区,浅色代表当前分区需要补充的窗口数据。

1.过滤出需要扩充的数据

对于time为1和3,“part_id" = 1的第一个重分区数据块,由于是时间最先的数据块,上面已经没有数据可以给他们补充了,因此会跳过。

OpenMLDB: 一文了解窗口倾斜优化技术细节

对于time为5,“part_id" = 2的第二个重分区数据块,会将所有时间比当前数据块前的数据都取出来,也就是“part_id" = 1的数据块。

OpenMLDB: 一文了解窗口倾斜优化技术细节

对于time为7,“part_id" = 3的第三个重分区数据块同理,将所有时间比当前数据块前的数据都取出来,也就是取第一个和第二个数据块作为扩充的窗口数据。

OpenMLDB: 一文了解窗口倾斜优化技术细节

后续第四个重分区数据块也同上,将所有需要的数据取出,因此不再赘述。

2.更改过滤数据的id并进行union

将数据取出来之后,我们还需要将(“expanded_row")改成true,代表是扩充的窗口数据。改完(“expanded_row")之后,只需要不断的和原来的addcolumn table进行union,我们就完成了一个数据块的窗口数据扩充。以第二块数据块为例子,下图union table中,不同颜色代表不同的重分区数据块,可以看到经过filter和union,第二块数据块已经扩充好了数据。

OpenMLDB: 一文了解窗口倾斜优化技术细节

对于其他数据块窗口扩充的方式和第二块数据块方式的思路一样,在过滤以及扩充完后,再和之前的union表进行unoin即可。

下面展示最终第四块数据块扩充完窗口上数据后得到的最终union table。

OpenMLDB: 一文了解窗口倾斜优化技术细节

虽然之前我们通过不同色块来标记不同的再分区数据,但实际上,到了第四步,我们才真正的对数据进行了重分区,底层我们依赖了spark中的repartition函数进行数据重分区。在第三步后,我们可以得到最终的union table,此时只需要根据分区键(“gender”)和(“part_id")进行repartition,就可以将数据拆分到不同的executor上。

OpenMLDB: 一文了解窗口倾斜优化技术细节

在第三步中,我们知道那些"expanded_row" = false的数据列是新补充进来的窗口数据,而且在实际计算中,他们是不需要参与计算的。因此只需要对"expanded_row" = true的数据进行窗口计算,最终便可得到计算结果。

OpenMLDB: 一文了解窗口倾斜优化技术细节

值得特别说明的是,由于openmldb底层处理引擎是自主研发设计的,因此窗口计算的内部逻辑也是由openmldb实现的。下面贴出相关代码进行讲解。

对于第四步生成的repartitiondf,我们在外层调用了spark的mappartitionswithindex方法。之后对于每个分区,openmldb都构建一个computer计算单元,用来处理接下来的窗口计算。之后则是正式进行窗口计算,调用windowaggiter方法。

在windowaggiter方法里,我们对传进来的迭代器inputiter进行了flatmap操作,之后再检查是否分区内数据有没有分错,如果有分错的row,则会对window进行重新设置。接下来检查orderkey没有问题后,会对expandedflag也就是上图中的(“expanded_row")作判断,如果为true,则证明当前row是扩充的数据,因此computer计算单元只进行bufferowonly操作,缓存扩充的窗口数据进内存,为之后真实需要计算的数据使用。如果为false,此时expandedflag也为false,computer计算单元就进行真正的计算compute,在compute方法里会读取之前缓存的数据并进行计算,之后会返回处理完成的row。compute方法内部是由c实现的,有兴趣的同学可以去查看​​openmldb​​里相关源码。

benchmark性能测试使用kaggle公开数据集,也就是new york city taxi trip duration竞赛的数据集,使用测试的sql语句如下:

对比开源版本sparksql以及开源版本openmldb进行测试,测试结果如下。

计算引擎

计算耗时

sparksql(spark 3.0.0)

950.98s

openmldb,未开启倾斜优化

224.76s

openmldb,开启倾斜优化,倾斜分区数2

140.74s

openmldb,开启倾斜优化,倾斜分区数4

94.44s

可以看到,openmldb引擎即使在不开启倾斜优化的情况下,在不同的倾斜比例中,相对于spark引擎,仍然有4倍以上的性能提升,这种性能提升主要是通过openmldb底层高效的引擎实现来保证的。而openmldb在开启了窗口倾斜优化之后,通过调整不同的再分区数,相比openmldb不开启倾斜优化也还能提升大约60%~140%的性能。

openmldb通过扩充窗口数据加上数据再分区的策略,实现了窗口计算下数据倾斜的优化。策略总体上采用了空间换时间的思想,即将原本集中在一个分区中的倾斜数据,在存储空间上进行窗口数据的扩充,之后再将数据分散至多个分区并行计算,从而增加计算的并行度,来换取更短的计算时间,并在最终实现了效率的大幅提升。此外在数据测试中,我们发现越在极端的倾斜分布下,openmldb越有更好的表现。总的来说,对于窗口计算下的数据倾斜场景,openmldb实现的数据倾斜优化有着不错的效果。

本文介绍了常见的滑动窗口数据倾斜问题,并且剖析了openmldb解决数据倾斜的实现方案以及展示最终的性能优化结果。如果你对spark优化、大规模特征计算、openmldb数据库等感兴趣,我们会分享更多类似的技术文章,欢迎大家继续关注 ​​openmldb专栏​​ 。

继续阅读