天天看点

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

作者:大数据与人工智能分享

Google在2013年发了一篇非常重要的paper,来教大家Google是如何在stream processing (事件流处理)方面做到exactly once semantic的,叫MillWheel。这个实现并不是最早做到exactly once的(可能trident会稍微早一点),但是这里面通过low watermark和per key storage这两个概念来做绝对是创新。原文在这里:https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41378.pdf

为了做到exactly once semantic,我们得先介绍几个概念

  • Persistent storage:在储存事件流处理完的结果之时,得有一个系统可以永久储存这些处理完的结果,因为很多处理完的结果是需要被再次调用的
  • Low watermark:事件流是从世界各地发往Google的服务器的,所以事件流在各个数据中心之间的传输是有延时的。 所以MillWheel需要你提供一个时间区间,所有的数据应该都在这个时间区间里面到达你的事件流处理器
  • Duplicate prevention:每一行来到的重复的数据都会被删除

每一行的MillWheel数据可以被理解成三个值:key,value和timestamp。在这里lower watermark是根据每个发送过来的时间戳计算的

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

数据处理作为一个整体可以根据用户提供的DAG(无回路图)来决定这些事件流的内容被如何处理,这样用户可以叠加各种各样的计算方式。比如我在一个不知名高逼格问答网站上面点击了一个广告,这个点击可以根据广告id,广告主id,用户id来做聚合,但是处理逻辑是分开的,但是都可以在MillWheel的框架下面执行。

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

一个特别傻的用MillWheel做的某问答网站收钱系统

具体MillWheel框架给每一行数据提供的保证是:每一行处理的数据都会根据每一个key做一个checkpoint,而且每一行数据只提供一次。我们接下来看一看具体MillWheel是怎么做到这个保证的,以及我们可以如何利用这个保证。

既然我们要根据每一个key来做checkpointing,那么每一行数据都得有一个可以把key从数据里面读取出来的逻辑。Google内部有很多deserialization protocol,比如protobuf,会被用来做key的读取。这里key要看具体你需要处理的业务逻辑是什么,假设是一个广告系统的话广告主的ID或者点击广告的用户的ID都是一个合理的key。

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

在提供一个key之后,MillWheel还会提供一个per key persistent storage,让你来更好处理你的业务逻辑。比如我需要给广告主提供的是每个广告每五分钟多少次点击,但是用户我不想每一个用户都存那么多东西,那可能每一个用户的点击我只要存hyperloglog就可以了,只要看他最近有没有很多点击来判断他是不是机器人,这个点击是否有效。

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

当然,并不是每一个广告的点击都会被送到系统里面:在获取key的时候我们也会获取这一行数据的时间戳,这个时间戳会被用来计算low watermark。low watermark的定义是现在已经到达MillWheel但是还没有被处理的数据的时间戳,但是这个时间戳不会超过一个用户定义的上线、不能比当下的时间晚太多。所以只要超过用户定义的时间的时间范围的数据,就是迟到的数据,迟到的数据的状态是不会被存到内存里面的。这个设计的厉害之处在于,如果数据处理一直很快,且所有的消息都没有迟到,那么low watermark会很接近现实的时间。如果数据出现迟到,再迟也不会超越用户设定的上限。

因为这里有一个low watermark的概念,那么我们就得确保有一个服务可以计算low watermark。MillWheel的设计是每一个事件流处理器会回报自己的最老还没处理的数据的时间戳,然后Injector会从每一个处理器收集最迟的时间戳。要收集最迟的时间戳因为每一个处理器的watermark应该都是一样且应该是最保守的。

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

每次处理每一行的数据的时候,只要过了low watermark,MillWheel需要做下面这些事情:

  • 检查这个数据是不是重复了
  • 处理用户提供的逻辑
  • 所有的state存到数据库里
  • 告诉发送数据的服务器已经处理完毕
  • 服务器发送后面的数据发送给处理器

这里发送端是会给每一行unique ID的,然后接收端根据这个unique ID去做dedup。有的时候为了优化速度,可以一次性从服务器拿很多行数据一起处理。

这里还有几个比较复杂的状况我们需要考虑。比如输出数据也是需要checkpoint的,不然的话有可能在同一个时间区间输出两个截然不同的数据,因为之前聚合结束的state没有被存下来。通过checkpoint输出,整个数据处理直接变成了一个idempotent的服务。当然本身有些数据处理就是idempotent的,那么这个时候可以省略dedup,或者先broadcast给下游这个输出再checkpoint。

这里还有一个需要注意的地方是每一个key必须只有一个writer,而且每个key的state在储存的时候必须是atomic的,不然是没有办法保证每一个key的state是consistent的。

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

MillWheel里面大概所有的流程

论文后面主要讨论的是在deploy了之后效果如何以及一些edge case,我这里摘几个比较有意思的

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

low watermark的计算是有延时的。整体来说再快的数据处理可能还是有两秒左右的延迟

MillWheel: Google是如何在事件流处理上做到exactly one semantic的

延迟不会因为增加了机器的数量就减少,因为出现的慢的服务器可能性更大

最后我再讲两句为什呢这个问题很重要。在一个互联网广告公司里面,各种各样的事件是要根据数据流来收钱的。Google要拿着广告的点击去跟其他公司收钱,YouTube要拿着它的video view去跟其他公司收钱。在这些情况下,收对钱是一件非常重要的事情:你要是钱收少了,公司遭受损失;你要是钱收多了,你的数据会跟第三方做审核的公司数据出现出入,会出现非常严重的商誉问题。所以这个数据流处理在这方面不能多不能少,最好每一行只处理一次只收一次钱。

这时候你可能想退一步,说为了处理收钱这个问题,我能不能直接每天或者每小时做一次dedup,然后再回复给用户说你的广告拿到了多少点击,我要收你多少钱。这里还有一个问题是在很多情况下收钱这件事情是实时汇报给广告商的,因为广告商最好是是有能力随时看到自己的广告到底效果如何,然后可以选择增加或者减少预算。甚至在某些特定的情况下(比如Superbowl或者黑色星期五),广告商其实是本着“我今天就是要花这么多钱,哪个平台上面撒出去我是不管的”,那这个时候实时的reporting就变得尤其的重要。

继续阅读