天天看点

Storm在推荐系统中的应用

Storm简介

       apache开源社区项目Storm,是一款分布式实时计算系统。它之上的应用易于开发与部署。关于他们的介绍,请移步http://storm.apache.org/,那里有更官方且全面的介绍。 我们利用Storm擅长基于数据流并行计算的优势,弥补Hadoop在实时计算方面的缺憾。这些使用日志采集系统(比如基于Kafka或者Scribe)作为输入源计算出来的实时结果,将为推荐系统所享用。

       我们最早使用基于Hadoop的计算解决方案,由于数据采集方案的约束,最快的计算频率也只能是每小时一次。利用Kafka加Storm的解决方案后,计算结果提交频次缩短到1分钟一次。根据观察,我们目前的设定尚未达到Storm集群和数据库的极限。这个提交周期仍然存在缩短的空间。

Storm应用

我们在Storm集群上搭建应用处理在线的几种维度的在线pv统计。其活动图如图1所示:

Storm在推荐系统中的应用

图1 实时计算应用活动图

多数的业务逻辑都是这样的:

1.      不同的日志流来源拥有各自的Spout,负责数据的读取、整理和简单校验。

2.      Spout将这些日志送向处理分发的下一层Bolt节点,做进一步的校验和整理,然后分流到下一层业务各自的持久化Bolt中。

3.      持久化Bolt负责将合并计算的最终结果持久化到数据库。

他们的部署图类似图2所示:

Storm在推荐系统中的应用

图2 在线统计系统部署图

我们在Spout这一层做简单的日志格式校验,如果出错就记录并汇报。第二层的Bolt处理的结果可能稍微复杂一点:这里的节点有时候需要缓存部分特别的校验信息(来自于缓存或者数据库)来做数据校验,同时要做的更关键的事情是判断应该根据业务分发给下层的哪种Bolt。我们的方案是每个数据表使用自己的Bolt。这样的好处是不同的业务逻辑可以分开处理。在线的pv统计即便丢失掉一部分数据其实也无伤大雅(我们的可接受范围是丢失率在5%以下),因此我们开辟了内存中的空间缓存中间计算结果。事实上,Storm的可靠性相当不错,每个节点都能运行很久而没有任何异常。如果有更高的要求,那么可以求助于外部的缓存方案。

       利用Storm计算得到的在线数据,可以回馈给推荐系统:告诉系统哪些不良的推荐素材正在拉低推荐效果,或者哪些推荐素材推送次数过多。有些时候,这些数据还要作为参考去评定离线统计出来的数据是否有误,甚至可以快速的定位问题,而无需临时驱动一个不常用的离线计算任务(甚至需要额外开发)。我们目前离线数据计算和在线数据计算的差异在3%以下。

应用心得:

1. 合理利用流的分发策略

截止发文,Storm官方已经为数据分发实现了八种不同的解决方案:

²  ShuffleGrouping:数据元以随机形式分发的解决方案,保证负载均衡。

²  FieldsGrouping:以指定的Key进行分发的解决方案。

²  AllGrouping:顾名思义,给所有的bolt任务都发送数据。官方友情提示慎用此方案。

²  GlobalGrouping:数据只会涌向id最小的那个bolt任务。注意,是只有一个bolt任务会接收到数据。

²  NoneGrouping:本意是让使用者无需关心具体实现的解决方案。目前其实是ShuffleGrouping。官方声称未来可能会有不一样的实现。

²  DirectGrouping:由数据生产者直接指定要发送到的任务。只能用于被声明为DirectStream的流中,并且数据元(Tuple)必须使用emitDirect方法来发送。

²  Localor shuffleGrouping:如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的ShuffleGrouping行为一致。意思是当有本地的bolt进程时,只发送给本地任务,没有才当成shuffleGrouping使用。 

²  Partial Key grouping:0.10.0版本新增的解决方案,类似FieldsGrouping。考虑到了下游Bolt的负载均衡问题。

²  如果以上策略还不够解决问题。那么可以自定义方案,使用CustomGrouping。

我们使用的Storm版本是0.9.2。为了均衡负载,我们在Spout传递到第二层校验层的时候,使用了shuffleGrouping的方式。根据图3,我们可以看到各个节点的负载很平均。数据往最底层分发的时候,考虑到最终归并后的结果集体入库,除了均衡负载,不应该引入增加数据库提交次数的问题。因此这里的分发策略,我们使用了fieldsGrouping的方式:我们使用时间标签和数据库唯一键复合的形式作为分发的key。这样的方式,使得在大部分的提交周期里,数据库的提交操作并没有被增加,同时也达到均衡的目的。

Storm在推荐系统中的应用

图3 Storm集群的监控网页

2. 使用简单对象进行传递

在Storm的官方文档上,我们并没有发现这样的注意事项,但是我们却在实际应用中发现了以下的问题。我们一开始传递数据的时候,并不是像官方例子里使用字符串类型进行传递,而是图方便使用了序列化的对象。经过统计,我们发现数据转化大概有10%左右的错误情况。当修改为使用字符串进行传递以后,这样的问题就没有了。有心人可以继续追踪。

3. 重载fail方法

编写Spout和Bolt实现类的时候,建议重载fail方法。每次数据发送失败后,发送方会调用自己的fail方法。因此,这个方法里不仅能有效的植入一些自己的报警措施,也可以选择再次发送数据,避免数据的丢失。如果很重要的数据,重复发送失败之后可以引入离线修复的办法去完成。

4. 预警措施

在Storm的应用过程中,我们曾经遇到过Storm的雪崩,zookeeper节点硬盘损坏导致的worker假死。这些现象虽然通过Storm自动重启或者人工重启来解决。但是触发频率与发现时间却是我们需要注意的问题。如果不能及时发现问题,将导致损失掉一大部分的数据,并且不利于定位问题。我们考虑一种简单的预警方法是从数据结果入手。我们认为如果临近日期中同一时段数据出现了大幅度的波动,将是报警的时机。如果为了快速部署实现,简单的SQL语句和shell脚本就能够实现这种方案。

5. 补救措施

预警的策略决定之后,数据的补救方式也应当确定下来。当数据错误发生在大跨度的时间里,我们可能会有数日的数据是缺失的。有两种方案可以考虑。

第一个是令Kafka的offset进行回调。第二是使用离线计算的方式去补回结果。

第一种方案好处是,Storm系统无需修改。坏处是回灌数据将影响新数据的消费。

第二种方案不会有妨碍继续消费的问题,不过需要有一份作为基准的离线数据与离线计算系统来支持。由于我们已经有这样的日志系统,我们目前采用的是第二种策略。