天天看点

21.DataStream API之State & Fault Tolerance(Checkpointing)

flink 1.9

The Broadcast State Pattern

Flink中的每个函数和操作符都可以是有状态的(有关详细信息,请参阅 working with state )。有状态函数在各个元素/事件的处理过程中存储数据,使状态成为任何类型的更精细操作的关键构建块。

为了使状态容错,Flink需要对状态进行检查点checkpoint。检查点允许Flink恢复流中的状态和位置,从而为应用程序提供与无故障执行相同的语义。

关于流容错的文档详细描述documentation on streaming fault tolerance了Flink流容错机制背后的技术。

Prerequisites

Flink的检查点机制与流和状态的持久存储交互。一般来说,它要求:

  • 可以在一定时间内重复使用持久化(persistent)的数据源。此类源的示例:持久消息队列(例如Apache Kafka、RabbitMQ、Amazon Kinesis、谷歌PubSub)或文件系统(例如HDFS、S3、GFS、NFS、Ceph,……)。
  • 状态的持久存储,通常是分布式文件系统(例如,HDFS、S3、GFS、NFS、Ceph,…)

Enabling and Configuring Checkpointing

默认情况下,检查点是禁用的。要启用检查点,可以在StreamExecutionEnvironment上调用enablecheckpoint (n),其中n是检查点间隔(以毫秒为单位)。

检查点的其他参数包括:

  • exactly-once vs. at-least-once:您可以选择将模式传递给enablecheckpoint (n)方法,在两个保证级别之间进行选择。对大多数应用程序来说, 精确一次(Exactly-once)是最好的。至少一次(At-least-once)可能与某些超低延迟(始终只有几毫秒)应用程序相关。
  • checkpoint timeout:如果在此之前未完成,则中止正在执行的检查点的时间。
  • minimum time between checkpoints:为了确保流应用程序在检查点之间取得一定的进展,可以定义在检查点之间需要花费多少时间。例如,如果将此值设置为5000,则无论检查点持续时间和检查点间隔如何,下一个检查点将在前一个检查点完成后5秒内启动。注意,这意味着检查点间隔永远不会小于这个参数。

通过定义“检查点之间的时间(time between checkpoints)”比检查点间隔(checkpoint interval)更容易配置应用程序,因为“检查点之间的时间(time between checkpoints)”不容易受到检查点有时花费的时间可能超过平均时间这一事实的影响(例如,如果目标存储系统临时变慢)。

注意,这个值还意味着并发检查点的数量为1。

  • number of concurrent checkpoints:默认情况下,当一个检查点还在进行时,系统不会触发另一个检查点。这可以确保拓扑不会在检查点上花费太多时间,也不会在处理流方面取得进展。可以允许多个重叠的检查点,这对于具有特定处理延迟的管道(例如,因为函数调用需要一些时间来响应的外部服务)而感兴趣,但是仍然希望执行非常频繁的执行checkpoints(100毫秒),从而为了在失败时重新处理很少的数据量。

当定义检查点checkpoints 之间的最小时间间隔时,不能使用此选项。

  • externalized checkpoints:您可以将定期检查点配置为在外部持久化。外部检查点将其元数据写到持久存储中,并且在作业失败时不会自动清理。这样,如果你的工作失败了,你就会有一个检查点来重新开始工作。关于外部化检查点的部署说明中有更多细节deployment notes on externalized checkpoints。
  • fail/continue task on checkpoint errors: 如果在执行任务的检查点过程中发生错误,则任务将失败。这是默认行为。或者当禁用此选项时,任务将简单地将检查点交给检查点协调器并继续运行。
  • prefer checkpoint for recovery:这将确定作业是否回退到最新检查点checkpoint ,即使有更多最近的保存点savepoints 可用来潜在地减少恢复时间。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置checkpoint的时间间隔
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
//exactly-once模式:对大多数应用程序来说,该模式是最好的选择,但是会降低flink的吞吐量;
// at-least-once模式:至少一次,有数据重复的可能,主要应用于某些超低延迟的(始终只有几毫秒)业务场景。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

//下一个checkpoint将在前一个checkpoint完成后5秒内启动,
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

//env.enableCheckpointing(1000)和setMinPauseBetweenCheckpoints(5000)对比分析:
//setMinPauseBetweenCheckpoints(5000)要比enableCheckpointing(1000)更容易配
//置应用程序,因为setMinPauseBetweenCheckpoints(5000)不受检查点有时可能比平均
//时间更长这一事实的影响(例如,如果目标存储系统暂时变慢)。

// 检查点必须在一分钟内完成,否则将被丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间,只允许一个检查点进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 启用在作业取消后,将checkpoint保留到外部检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 当存在最近的保存点时,允许作业恢复回检查点
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
           

Related Config Options

可以通过 conf/flink-conf.yaml设置更多参数或默认值(configuration):

Key Default Description
state.backend (none) 用于存储和检查点状态的状态后端。
state.backend.async true 选项状态后端是否应在可能且可配置的情况下使用异步快照方法。一些状态后端可能不支持异步快照,或者只支持异步快照,则忽略此选项。
state.backend.fs.memory-threshold 1024 状态数据文件的最小值。所有小于该值的状态块都内联存储在根root检查点元数据文件中。
state.backend.fs.write-buffer-size 4096 写入文件系统的检查点流(checkpoint streams)的写缓冲区的默认大小。实际写缓冲区大小被确定为此选项和选项'state.backend.fs.memory-threshold'的值的最大值。注:MAX[state.backend.fs.memory-threshold,state.backend.fs.write-buffer-size]
state.backend.incremental false 如果可能,选择状态后端是否应该创建增量检查点。对于增量检查点,只存储前一个检查点的差异,而不存储完整的检查点状态。一些状态后端可能不支持增量检查点,则忽略此选项。
state.backend.local-recovery false

此选项为此状态后端配置本地恢复。默认情况下,本地恢复被禁用。本地恢复目前只覆盖键控状态后端(keyed state backends

)。目前,memorystateback不支持本地恢复,因此忽略该选项。

state.checkpoints.dir (none) 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。存储路径必须可从所有参与的进程/节点访问(i.e. all TaskManagers 和 JobManagers).
state.checkpoints.num-retained 1 要保留的已完成检查点checkpoints的最大数量。
state.savepoints.dir (none) 保存点的默认目录。用于将保存点写入文件系统的状态后端(MemoryStateBackend, FsStateBackend, RocksDBStateBackend)。
taskmanager.state.local.root-dirs (none) 配置参数定义根目录,用于存储本地恢复的基于文件的状态。本地恢复目前只覆盖键状态后端keyed state backends。目前,MemoryStateBackend不支持本地恢复,因此忽略该选项

Selecting a State Backend

Flink的检查点机制checkpointing mechanism会将定时器和有状态操作算子的所有状态存储到一致快照snapshots中,包括连接器、窗口和任何用户定义的状态user-defined state。检查点checkpoints存储在哪里(例如,JobManager内存、文件系统、数据库)取决于配置的状态后端State Backend。

默认情况下,状态state保存在TaskManagers的内存中,检查点checkpoints存储在JobManager的内存中。对于大状态的适当持久性,Flink支持在其他状态后端存储和检查点状态的各种方法。状态后端选择可以通过StreamExecutionEnvironment.setStateBackend(…)。

有关可用状态后端以及作业范围和集群范围配置的选项的详细信息,请参阅状态后端state backends。

State Checkpoints in Iterative Jobs

Flink目前仅为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户需要在启用检查点时设置一个特殊的标志:env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)。

Restart Strategies

Flink支持不同的重启策略,这些策略控制在发生故障时如何重启作业。有关更多信息,请参见重启策略Restart Strategies。

https://flink.sojb.cn/dev/stream/state/checkpointing.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html

https://www.jianshu.com/p/6696742ba228

继续阅读