spark 2.0 将流式计算也统一到dataframe里去了,提出了structured streaming的概念,将数据源映射为一张无线长度的表,同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据,复用了其对象的catalyst引擎。
作为spark平台的流式实现,spark streaming 是有单独一套抽象和api的,大体如下

图片来源于spakr官网
代码的形态如下:
上面都是套路,基本都得照着这么写。
spark 2.0 时代
概念上,所谓流式,无非就是无限大的表,官方给出的图一目了然:
图片来源于官网
在之前的宣传ppt里,有类似的代码,给人焕然一新的感觉。当然,下面的代码你肯定要有上下文的,就这一句肯定跑不起来的。
图片来源于http://litaotao.github.io/images/spark-2.0-7.png
第一个是标准的dataframe的使用代码。下面第二个则是流式计算的代码,看完这个demo你肯定会纳闷:
没有定时器么,我怎么设置duration?
在哪里设置awaittermination呢?
如果我要写入到其他引擎,而其他引擎没有适配咋办?
这些疑问其实归结起来就是:
<b>structured streaming 的完整套路是啥?</b>
我们来看看代码(例子来源于spark源码,我稍微做了些修改):
这个就是structured streaming 的完整套路了。
structured streaming 目前source源只支持file 和 socket 两种。输出则是四种,前面已经提到。foreach则是可以无限扩展的。我举个例子:
我把数据最后写到各个节点的临时目录里。当然,这只是个例子,不过其他类似于写入redis的,则是类似的。
如果structured streaming 仅仅是换个api,或者能够支持dataframe操作,那么我只能感到遗憾了,因为2.0之前通过某些封装也能够很好的支持dataframe的操作。那么 structured streaming 的意义到底何在?
重新抽象了流式计算
易于实现数据的exactly-once
我们知道,2.0之前的spark streaming 只能做到at-least once,框架层次很难帮你做到exactly-once,参考我以前写的文章spark streaming crash 如何保证exactly once semantics。 现在通过重新设计了流式计算框架,使得实现exactly-once 变得容易了。
可能你会注意到,在structured streaming 里,多出了outputmode,现在有complete,append,update 三种,现在的版本只实现了前面两种。
complete,每次计算完成后,你都能拿到全量的计算结果。
append,每次计算完成后,你能拿到增量的计算结果。
但是,这里有个但是,使用了聚合类函数才能用complete模式,只是简单的使用了map,filter等才能使用append模式。 不知道大家明白了这里的含义么?
complete 就是我们前面提到的mapwithstate实现。 append 模式则是标准的对数据做解析处理,不做复杂聚合统计功能。
官方给出了complete 模式的图:
append 模式则是返回transform后最新的数据。
前面我们说到,现在的设计很简单,其实就是 无限大的 source table 映射到一张无限大的 result table上,每个周期完成后,都会更新result table。我们看到,structured streaming 已经接管了端到端了,可以通过内部机制保证数据的完整性,可靠性。
offset 概念,流式计算一定有offset的概念。
对于无法回溯的数据源则采用了wal日志
state概念,对result table 的每个分区都进行状态包装,分区的的每个add,put,update,delete操作,都会写入到hdfs上,方便系统恢复。
其中第三点是只有在2.0才有的概念。不过比较遗憾的是,result table 和foreachwriter 并没有什么结合,系统只是保证result table的完整性,通过hdfsbackedstatestoreprovider将result table 保存到hdfs。
以前的api就是给你个partition的iterator,你爱怎么玩怎么玩,但是到了现在,以foreachwriter为例,
数据你只能一条一条处理了。理论上如果假设正好在process的过程中,系统挂掉了,那么数据就会丢了,但因为 structured streaming 如果是complete模式,因为是全量数据,所以其实做好覆盖就行,也就说是幂等的。
如果是append 模式,则可能只能保证at-least once ,而对于其内部,也就是result table 是可以保证exactly-once 的。对于比如数据库,本身是可以支持事物的,可以在foreachwrite close的时候commit下,有任何失败的时候则在close的时候,rollback 就行。但是对于其他的,比如hbase,redis 则较为困难。
另外在foreachwriter提供的初始化函数,
返回值是boolean,通过检测版本号,是否跳过这个分区的数据处理。返回true是为不跳过,否则为跳过。当你打开的时候,可以通过某种手段保存version,再系统恢复的时候,则可以读取该版本号,低于该版本的则返回false,当前的则继续处理。