天天看点

Flink State

作者:开心果果咚

State概念

在Flink中,我们通常把State翻译为状态。

对于流计算来说,数据就像水流一样不断的流入。如果每次的计算都是独立的,也就是每次计算都不依赖上下游的事件,那么我们可以把这类计算称为无状态计算;如果计算需要依赖于历史或者未来的计算,则我们把这类计算称为有状态计算。

那么State就是用来保存有状态的计算中中间的数据。

Flink State

Flink提供了对状态操作的接口,可以将临时数据保存在 State 中 和 从 State 中读取数据。在运行的时候,与算子、Function 体系融合,自动对 State 进行备份(CheckPoint),一旦出现异常,能够从保存的 State 中恢复状态。

什么场景下需要使用到State?下面举例说明我近期开发中遇到的一些例子:

  • 去重:驱动源存在重复的数据,通过flink脚本将重复的数据过滤掉。那么就需要按照paration把所有的主键数据都存储下来,当新的流数据进来后,可以判断当前主键是否存在。
  • 聚合计算:需要对流数据进行sum聚合计算时,需要把当前计算得到的结果保存下来,以便于未来数据的累加计算。
  • 窗口计算:比如需要统计每分钟的访问记录,这个时候使用滚动窗口一分钟进行一次计算,那么在窗口出发前需要的数据都先存储起来,等窗口出发后再将这些数据计算输出。
  • 历史数据:当我们需要与昨天的数据对比时,也可以选择把历史数据存入状态。当然这种场景我一般会选择从外部读取。

看到这里不知道大家有没有疑问?

中间数据是存在哪里的?

数据的存储有没有大小限制?

数据我不想要了可以从状态中删除吗?

所有有关存储的结构我们首先都会关注存储空间大小、持久化、数据删除机制。

不着急,这篇文章都会给大家介绍,我们带着问题继续往下看。

State的分类

流式计算一般都是7*24计算的,且很多场景下数据是要求实时产出的,而且对计算的过程也要求数据不能重复、不能丢失,每个数据都只参与一次计算。

State 就是实现有状态计算下的 Exactly-Once 的基础。

将数据直接存到内存中式最常见的做法,但是内存的容量是有限制的。如果需要存储的数据规模比较大,可能会存在内存不足。

下图是我的作业由于内存不足造成的运行时异常:

Flink State

因此理想的状态是要满足易用、高效以及可靠的三个关键点的。

状态管理有两种方式:

  • Managed State(托管状态):由Flink Runtime进行管理,自动存储,自动恢复,并切在内存管理上进行优化。
  • Raw State(原始状态):用户自己管理,State中存入的数据结构也有用户自己定义。比较少用。

Managed State 分为两种:

  • Keyed State(按键状态):Datastream 经过 keyBy 的操作可以变为 KeyedStream 。Keyed State 只能用在 KeyedStream 的算子中。数据按照keygroup在算子上进行分配。
Flink State
  • Operator State(算子状态):适合于所有算子。由于Operator State中没有key,内置了2种分配方式-均匀分配和将State合并后分发给每个实例。

Keyed State 也有很多种类型,如下图为几种 Keyed State 之间的关系。

Flink State

State 有3个子类分别为: ValueState、MapState、AppendingState。AppendingState 又有一个子类 MergingState。MergingState 又分为 3 个子类分别是ListState、ReducingState、AggregatingState。他们的数据结构和使用方法也存在差异性。每个类型都有各自的实现。

详细的方法咱们就不说了,与我们开发平时用的数据结构差不多。

State backend

State Backend 中文名为状态后端,是状态的管理组件。它主要解决两件事:

  • 管理本地状态,包括访问、存储与更新
  • 当 checkpoint 被激活时,决定状态同步的方式和位置

在 Flink 中提供了三种 State Backend,分别是:

  • MemoryStateBackend
  • FsStateBackend
  • RockDBStateBackend

默认管理方式是MemoryStateBackend。

MemoryStateBackend

MemoryStateBackend将状态信息存储在 Java 内存堆中。

因为是在内存中管理状态,所以 MemoryStateBackend 具备快速、低延时 的优点,但与此同时,其所能管理的状态大小受内存大小的限制,有 OOM 的风险。

FsStateBackend

基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 进行的数据仍然是存储在TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

RocksDBStateBackend

RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。

在实际使用中可以根据自己的需求选择,如果数据量较小(默认大小不超过5M),可以存放到MemoryStateBackend和FsStateBackend中,如果数据量较大,可以放到RockDB中。

注意不论使用哪种backend,都会因为数据过大造成存储不足的问题,因此我们在使用有状态的流式计算时,要添加过期时间,以免出现OOM等问题。

从 Flink 1.6 版本开始引入了State TTL特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理。

容错机制和故障恢复

检查点机制checkpoint

为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。

Checkpoint 一种由 Flink 自动执行的快照,其目的是能够从故障中恢复。

通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

Flink State

Checkpoint n 将包含每个 operator 的 state,这些 state 是对应的 operator 消费了严格在 checkpoint barrier n 之前的所有事件,并且不包含在此(checkpoint barrier n)后的任何事件后而生成的状态。

checkpoint 是需要耗费时间和资源的,耗费的量级与 State 的大小有关。若 State 较小,checkpoint 过程就比较轻量,对数据流的处理不会产生影响;若 State 较大,checkpoint 过程可能就比较长,便需要考虑周期、时间间隔等。

默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

保存点机制savepoint

保存点机制 (Savepoints)是检查点机制的一种特殊的实现,它允许通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。

它的底层实现算法,与 checkpoint 基本上可以说是相同的,两者的主要区别在于:

  • savepoint 由用户手动触发,checkpoint 由应用程序自动触发
  • checkpoint 会被更新的 checkpoint 覆盖,savepoint 不会
  • savepoint 仅支持对齐算法

今天到这里就结束了!祝福大家国庆假期快乐~