天天看点

离线计算平台系列之一离线计算平台简介SparkContext 管理Spark程序调优手段To Be Continued

在蚂蚁金服风控体系里面,有一个重要的环节就是离线仿真,在规则,模型上线之前,在离线的环境里面进行仿真验证,来对规则和模型进行效能的评估,避免人为因素造成不准确性从而造成的资损。起初为了达到这个目的,离线计算平台就这样孕育而生了,慢慢地整个离线平台覆盖了更多风控的业务,也慢慢变成目前odps-spark最大的用户,拥有的集群数目也是最大的。离线计算平台主要以spark为基础,在其上建立起来的一套平台. 后面我们团队会给大家带来一系列,关于离线平台的架构以及我们做过相应业务以及经验,希望和大家一起来探讨。

下面由我来给大家分享下,我们整个团队建立起离线计算平台里面的sparkcontext管理以及几个spark优化手段。

在我们的离线业务场景里面,我们需要持续地接受用户提交实验任务进行分析以及运算,所以对于spark的要求第一点就是需要long-live的sparkcontext, 可以持续接受任务提交,而不是类似d2那样提交一次性的任务。所以我们以目前odps-spark client模式为基础,建立了一套自己的sparkcontext 管理系统,提供了动态新增,删除sparkcontext功能,可以持续地提交任务,调度相应任务的功能。

离线计算平台系列之一离线计算平台简介SparkContext 管理Spark程序调优手段To Be Continued

由于sparkcontext需要一个独立的jvm进程服务,我们目前是利用rmi来完成启动sparkcontext的工作。 在应用服务器上面,当它被通知自己需要启动sparkcontext的时候,就会在进行相应的spark jar准备,然后进行打包,并且启动相应的rmi进程来当做sparkcontext. 在任务执行的流程中,前端系统会向离线系统提交任务,离线系统就有一套自己的任务调度系统,根据目前管理的sparkcontext里面选择一个相对比较空闲的,进行任务提交,否则就进行等待。当任务提交到rmi server之后,就会对任务进行相应的组装以及转化成spark可以执行的任务,进行执行并且同spark cluster进行交互。最终spark任务结果会同步地返回给rmi server, 然后再通过rmi server异步的通知到应用服务器,并且进行任务的进一步处理。

经验分享:控制好每个sparkcontext的并发度,充分利用sparkcontext的能力(目前我们一个sparkcontext上面并发3个任务,后续会引入动态任务大小的评估,从而实现真正的动态任务提交以及资源分配),统一管理每个sparkcontext下面的并发线程,而且不是每个任务都去创建线程池,特别注意从rmi server返回给离线系统数据,不能带有scala相关的东西,离线系统无法解析。优雅地停掉sparkcontext, 防止资源的泄露。

对于风控来说,查看用户历史数据来进行判断风险的行为是一种常见的手段,所以动不动就30天的交易记录,90的某相累计数据。。。 做离线的小伙伴已经哭晕在厕所,在做spark离线的期间,我们也产出了自己的东西,这里我就介绍三种比较有意义的内容,后续欢迎大家来交流。

动态加载jar

由于安全性的问题,目前odps-spark并没有实现动态加载jar的功能,那如果真有这部分需求的时候,我们应该怎么做呢?例如有些规则脚本在我们实验的时候进行了更新,离线也需要同步更新,可是这里sparkcontext已经创建出来,没有办法再新增jar了。目前我们的解决方案是,在执行任务的时候,把这些jar打好包,并且创建出一个classloader来加载这些可变的jar, 然后在实验的时候,把classloader 通过broadcast机制分发到各个worker上面,然后再需要利用这些jar的闭包里面,通过这个classloader进行加载并且执行。<b>这里需要注意的问题是: 多线程的问题</b>, 因为broadcast value每台worker只有一份,如果一个台worker上面有多个cpu去访问broadcast value的时候,如果没有控制好多线程的问题,就会出现一些奇葩问题,而且这种问题不好排查,因为已经在worker上面,不好去打日志进行验证。所以在处理broadcast value的时候,尽量做到线程安全。

减少shuffle key的数目

在spark开发中,基本上都会用到join, cogroup等操作,这类操作就会产生shuffle操作, spark程序的性能很大程度就是取决于shuffle的性能上面,除了调优修改shuffle的参数(spark.shuffle.memoryfraction, spark.shuffle.manager, spark.shuffle.file.buffer等), 也可以利用其它手段来完成

map side join, 特别是在两个rdd, 一个数据量小, 一个数据量大的情况,我们可以把数据量小的那个做成broadcast, 从而把这次shuffle操作转变成一次map操作,大大地减少shuffle中的性能消耗。

利用<b>bloom filter</b>过滤掉多余的key, 这也是我们在实践发现的,当两个rdd进行cogroup的时候,其实会有很多无用的key, 例如用userid进行关联的时候,会很多无用的userid进行干扰,但是他们也参与整个shuffle的流程中,这时候我们可以把key数目相对比较少的那个rdd的key收集来(可能会有多次collect操作,因为每次collect操作有大小限制),然后把这些做成bloom filter去过滤另外一个rdd里面的key, 从而达到减少shuffle中间的数据量的效果。

数据倾斜的解决

当你在logview上面发现你的2400分区的数据,2399都跑完,另外一个分区怎么跑都跑不完,并且执行时间已经远远超越同伴啦。那么恭喜你,很有可能是数据倾斜发生了。目前介绍下我们的做法

首先是去采集哪些key会出现数据倾斜,这里可以使用groupbykey,然后进行count, 如果这样都会挂掉,那么进行sample抽样来解决,随机抽样10%的数据来进行判断。

找到这样的key之后, 在rdd a里面过滤掉热点对应的key, 形成nonhot的rdd a, 然后针对有hotkey的rdd, 里面的每一个key打上 n 以内的随机数作为后缀。

在rdd b里面首先同样先过滤掉热点的key, 形成nonhot的rdd b, 然后针对hotkey的rdd, 里面每一个key按顺序附加 0 - n 的后缀,每条hotkey的数据就会膨胀成n条数据

hotvalues = res.filter(x =&gt; hotkeyset.value.contains(x.1)).flatmap(x =&gt;

for (i &lt;- 0.until(hashpartitionsize)) yield {

}

最后nothotkey的rdd 进行join, hotkey的rdd进行join, 最后再进行union操作就可以等到最后的结果。

经验总结:主要思路就是把少数的key,打散成n份去进行join,这样就不会集中大量的数据在一个worker中,并且把它高挂。如果针对rdd里面很多的key都是hot key的情况,就无需过滤hot key, 直接给一个rdd打上随机数,另外一个rdd扩容n份进行join.

还有很多细节以及经验想跟大家分享,奈何实力有限,后续的分享会持续到来,大家会看到变态数据点优化,velocity数据结构优化,会针对新版的内存计算平台进行相应的改造以及相关的技术落地。 以上都是来自 蚂蚁金服-大安全-成都安全技术平台的小伙伴们。顺便打个小广告,我们在大量招人,希望回成都的同学,以及想做离线这块的同学(当然我们也很多实时的业务),可以联系@锋扬,@云震。