天天看点

Spark Streaming 1.6 流式状态管理分析

在流式计算中,数据是持续不断来的,有时候我们要对一些数据做跨周期(duration)的统计,这个时候就不得不维护状态了。而状态管理对spark 的 rdd模型是个挑战,因为在spark里,任何数据集都需要通过rdd来呈现,而rdd 的定义是一个不变的分布式集合。在状态管理中,比如spark streaming中的word-count 就涉及到更新原有的记录,比如在batch 1 中  a 出现1次,batch 2中出现3次,则总共出现了4次。这里就有两种实现:

获取batch 1 中的 状态rdd  和当前的batch rdd 做co-group 得到一个新的状态rdd。这种方式完美的契合了rdd的不变性,但是对性能却会有比较大的影响,因为需要对所有数据做处理,计算量和数据集大小是成线性相关的。这个我们后续会详细讨论。

第二种是一种变通的实现。因为没法变更rdd/partition等核心概念,所以spark streaming在集合元素上做了文章,定义了mapwithstaterdd,将该rdd的元素做了限定,必须是mapwithstaterddrecord 这个东西。该mapwithstaterddrecord 保持有某个分区的所有key的状态(通过statemap记录)以及计算结果(mappeddata),元素mapwithstaterddrecord 变成可变的,但是rdd 依然是不变的。

这两个方案分别对应了 updatestatebykey/mapwithstate  的实现。

前言

parquet性能

<a href="https://yq.aliyun.com/articles/60199">自动内存管理模型</a>

<a href="https://yq.aliyun.com/articles/60245">流式状态管理10倍性能提升</a>

之前就想系统的对这三块仔细阐述下。现在总算有了第二篇。

本文会从三个方面展开:

updatestatebykey的实现;

mapwithstate(1.6新引入的流式状态管理)的实现

mapwithstate额外内容

在 关于状态管理中,我们已经描述了一个大概。该方法可以在org.apache.spark.streaming.dstream.pairdstreamfunctions中找到。调用该方法后会构建出一个org.apache.spark.streaming.dstream.statedstream对象。计算的方式也较为简单,核心逻辑是下面两行代码:

首先将prevstaterdd 和 parentrdd(新batch 的数据) 做一次cogroup,形成了 (k, seq[v], seq[w]) 这样的结果集。你会发现和updatestatebykey 要求的(seq[v], option[s])签名还是有些类似的。事实上这里的seq[v] 就是parentrdd的对应k 的新的值。为了适配他两,spark 内部会对你传进来的updatefunc 做两次转换,从而使得你的函数能够接受(k, seq[v], seq[w])这样的参数。看到这,想必你也就知道为啥updatestatebykey  接受的函数签名是那样的了。

前文我们提到,这样做很漂亮,代码也少,契合rdd的概念,然而你会发现无论parentrdd里有多少key,哪怕是只有一个,也需要对原有所有的数据做cogroup 并且全部做一遍处理(也就是应用你的update函数)。显然这是很低效的。很多场景下,新的batch 里只有一小部分数据,但是我们却不得不对所有的数据都进行计算。

可以为key 设置ttl(timeout)

用户可以对返回值进行控制

前面我们提到,在新的mapwithstate api 中,核心思路是创建一个新的mapwithstaterdd,该rdd的元素是 mapwithstaterddrecord,每个mapwithstaterddrecord 记录某个partiton下所有key的state。

依然的,你在org.apache.spark.streaming.dstream.pairdstreamfunctions 可以看到mapwithstate 签名。

这一段代码有三点值得注意:

该接口在1.6 中还是 experimental 状态

接受的不是一函数,而是一个statespec 的对象。

返回了一个新的dstream

其实statespec 只是一个包裹,你在实际操作上依然是定义一个函数,然后通过statespec进行包裹一下。以 wordcount 为例:

接着statespec.function(mappingfunc) 包裹一下就可以传递给mapwithstate。我们看到该函数更加清晰,word 是k,one新值,state 是原始值(本batch之前的状态值)。这里你需要把state 更新为新值,该实现是做了一个内部状态维护的,不像updatestatebykey一样,一切都是现算的。

mapwithstatedstreamimpl 的compute逻辑都委托给了internalmapwithstatedstream,最终要得到mapwithstaterdd,基本是通过下面的逻辑来计算的:

这里有个很重要的操作是对datardd进行了partition操作,保证和prevstaterdd 按相同的分区规则进行分区。这个在后面做计算时有用。

获取到prevstaterdd,接着获取当前batch的数据的rdd,最后组装成一个新的mapwithstaterdd。mapwithstaterdd 还接受你定义的函数mappingfunction以及key的超时时间。

其中mapwithstaterdd 和别的rdd 不同之处在于rdd里的元素是mapwithstaterddrecord 对象。其实prevstaterdd  也是个mapwithstaterdd 。

整个实际计算逻辑都在mapwithstaterddrecord.updaterecordwithdata 方法里。

前面我们提到,mapwithstaterddrecord 是prevstaterdd 里的元素。有多少个分区,就有多少个mapwithstaterddrecord 。一个record 对应一个分区下所有数据的状态。在mapwithstaterddrecord.updaterecordwithdata 方法中,第一步是copy 当前record 的状态。这个copy是非常快的。我们会在mapwithsate额外内容 那个章节有更详细的分析。

接着定义了两个变量,其中mappeddata  会作为最后的计算结果返回,wrappedstate 类似hadoop里的 text,你可以不断给它赋值,然后获得一些新的功能,避免返回创建对象。它主要是给state添加了一些方法,比如update,define状态等。

接着遍历当前batch 所有的数据,并且应用用户定义的函数。这里我们看到,我们只对当前batch的数据进行函数计算,而不是针对历史全集数据进行计算,这是一个很大的性能提升点。接着根据wrappedstate的状态对newstatemap做更新,主要是删除或者数据的更新。最后将新的结果返回并且放到mappeddata 。

上面这段逻辑,你会发现一个问题,如果dataiterator 里有重复的数据比如某个k 出现多次,则mappeddata也会有多次。以wordcount 为例:

Spark Streaming 1.6 流式状态管理分析

hello 出现了三次,所以会加入到mappeddata中三次。其实我没发现这么做的意义,并且我认为会对内存占用造成一定的压力。

如果你想要最后的结果,需要调用完mapwithstate 之后需要再调用一次statesnapshots,就可以拿到第三栏的计算结果了。

经过上面的计算,我们对parentrdd里的每个分区进行计算,得到了mappeddata以及newstatemap,这两个对象一起构建出mapwithstaterddrecord,而该record 则形成一个partition,最后构成新的mapwithstaterdd。 

mapwithstaterddrecord 透过statemap 维护了某个分区下所有key的当前状态。 在前面的分析中,我们第一步便是clone old statemap。如果集合非常大,拷贝也是很费时才对,而且还耗费内存。

所以如何实现好statemap 变得非常重要:

实现过程采用的是 增量copy。也叫deltamap。 新创建的statemap 会引用旧的statemap。新增数据会放到新的statemap中,而更新,删除,查找等操作则有可能发生在老得statemap上。缺点也是有的,如果statemap 链路太长,则可能会对性能造成一定的影响。我们只要在特定条件下做合并即可。目前是超过delta_chain_length_threshold=20 时会做合并。

使用 org.apache.spark.util.collection.openhashmap,该实现比java.util.hashmap 快5倍,并且占用更少的内存空间。不过该hashmap 无法进行删除操作。

继续阅读