天天看点

大数据开发:Flink的状态编程、状态机制

作者:尚硅谷教育

引言

在流处理中,数据是连续不断到来和处理的。每个任务在进行计算处理时,都可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护且用来计算输出结果的所有数据就叫作这个任务的状态。

一、什么是状态

1.1有状态的算子

在 Flink 中,算子任务可以分为无状态和有状态两种情况。

基本转换算子,如 map、filter、flatMap, 计算时不依赖其他数据,就都属于无状态的算子。

大数据开发:Flink的状态编程、状态机制

常见的聚合算子、窗口算子都属于有状态的算子。

大数据开发:Flink的状态编程、状态机制

1.2状态的管理

在传统的事务型处理架构中,这种额外的状态数据是保存在数据库中的。而对于实时流处理来说,这样做需要频繁读写外部数据库,如果数据规模非常大肯定就达不到性能要求了。所以 Flink 的解决方案是,将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。

1.3状态的分类

1. 托管状态(Managed State)和原始状态(Raw State)

托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。

而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;

2. 算子状态(Operator State)和按键分区状态(Keyed State)

我们又可以将托管状态分为两类:算子状态和按键分区状态。

Operator State Keyed State
适用算子类型 可用于所有算子: 常用于source, sink, 例如 FlinkKafkaConsumer 只能用于用于KeyedStream上的算子
状态分配 一个算子的子任务对应一个状态 一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式 实现CheckpointedFunction或ListCheckpointed(已经过时)接口 重写RichFunction, 通过里面的RuntimeContext访问
横向扩展 并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量 并发改变, State随着Key在实例间迁移
支持的数据结构 ListState, UnionListStste, BroadCastState ValueState, ListState, MapState, ReduceState, AggregatingState

Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

二、Flink的状态和数据结构

2.1键控状态

具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。

键控状态是根据输入数据流中定义的键(key)来维护和访问的。

Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

大数据开发:Flink的状态编程、状态机制

键控状态支持的数据结构

  • ValueState<T>
  • 保存单个值。每个key有一个状态值。设置使用 update(T),获取使用 T value()
  • ListState<T>
  • 保存元素列表。
  • 添加元素:add(T) addAll(List<T>)
  • 获取元素:Iterable<T> get()
  • 覆盖所有元素:update(List<T>)
  • ReducingState<T>
  • 存储单个值,表示把所有元素的聚合结果添加到状态中。与ListState类似,但是当使用add(T)的时候ReducingState会使用指定的ReduceFunction进行聚合。
  • AggregatingState<IN, OUT>
  • 存储单个值。与ReducingState类似,都是进行聚合。不同的是,AggregatingState的聚合结果和元素类型可以不一样。
  • MapState<UK, UV>
  • 存储键值对列表。
  • 添加键值对:put(UK, UV) or putAll(Map<UK, UV>)
  • 根据key获取值:get(UK)
  • 获取所有:entries(), keys() and values()
  • 检测是否为空:isEmpty()

注意:

a) 所有的类型都有clear(),清空当前key的状态

b) 这些状态对象仅用于用户与状态进行交互

c) 状态不是必须存储到内存,也可以存储在磁盘或者任意其他地方

d) 从状态获取的值与输入元素的key相关

2.2算子状态

除按键分区状态之外,另一大类受控状态就是算子状态。从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同 key 的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的map作为它的算子状态,以保证Flink应用的Exactly-Once语义。

Flink为算子状态提供三种基本数据结构:

  • 列表状态(List state)
  • 将状态表示为一组数据的列表
  • 联合列表状态(Union list state)
  • 也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
  • 一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
  • 广播状态(Broadcast state)
  • 是一种特殊的算子状态。如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

三、状态持久化和状态后端

Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

3.1检查点

有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程, 就如同“读档”一样。

3.2状态后端

在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

大数据开发:Flink的状态编程、状态机制

1. 状态后端的分类

Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。

  • 哈希表状态后端(HashMapStateBackend)
  • HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业, 对所有高可用性设置也是有效的。
  • 对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。
  • 内嵌RocksDB 状态后端(EmbeddedRocksDBStateBackend)
  • EmbeddedRocksDBStateBackend会将处理中的数据全部放入 RocksDB 数据库中,数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。EmbeddedRocksDBStateBackend始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。对于检查点,同样会写入到远程的持久化文件系统中。由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

2. 如何选择正确的状态后端

  • HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择哪种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。在工作中,RocksDB读写性能可能不如HashMap,但是由于RocksDb存储在硬盘上,所以RocksDb往往更受公司的欢迎。

总结

Flink的状态,我们可以当成类似于redis去理解,状态的数据类型也就是存储数据的数据类型。键控状态我们使用的会比较多一点,另外多并行度的情况,每个并行度下的状态值都不是共享的。状态后端的话,数据量如果比较大,建议使用RocksDB。

继续阅读