天天看点

Flink Checkpoint 问题排查

作者:小虾好望角

在 Flink 中,状态可靠性保证由 Checkpoint 支持,当作业出现 failover 的情况下,Flink 会从最近成功的 Checkpoint 恢复。在生产实践中,Checkpoint 的问题需要引起我们的重视。

Checkpoint 流程简介

Checkpoint 是由 JM 的 Checkpoint Coordinator 发起的:

Flink Checkpoint 问题排查

1.1 Coordinator 向所有 Source 节点触发 Checkpoint

Flink Checkpoint 问题排查

1.2 在数据流中插入 Checkpoint Barrier

source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint:

Flink Checkpoint 问题排查

1.3 对算子 State 状态进行同步快照与异步上传

当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给 Checkpoint Coordinator。这里分为同步和异步(如果开启的话)两个阶段:

Flink Checkpoint 问题排查

同步阶段:task执行状态快照,并写入外部存储系统(根据状态后端的选择不同有所区别)执行快照的过程:

  • 对 state 做深拷贝
  • 将写操作封装在异步的 FutureTask 中,FutureTask 的作用包括:1)打开输入流;2)写入状态的元数据信息;3)写入状态;4)关闭输入流

异步阶段:

  • 执行同步阶段创建的 FutureTask
  • 向 Checkpoint Coordinator 发送 ACK 响应

1.4 多输入算子要进行 Barrier 对齐操作

下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照。这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角):

Flink Checkpoint 问题排查

同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator:

Flink Checkpoint 问题排查

1.5 所有算子状态都已上传后确认 Checkpoint 完成

最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件:

Flink Checkpoint 问题排查

Checkpoint 监控面板

Checkpoint 监控面板详细介绍参考官网链接 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/monitoring/checkpoint_monitoring/,这个页面上有解释 End to End Duration、Processed (persisted) in-flight data、Sync Duration、Async Duration、Alignment Duration、Start Delay,如果对这些概念不清楚的同学一定要好好看看。

只有理解了 Checkpoint 的流程,才能看懂 Flink Web UI 提供的 Checkpoint 信息:

Flink Checkpoint 问题排查

日志分析示例

我们在 Checkpoint 的 History Tab 可以看到是否有 Checkpoint 失败:

Flink Checkpoint 问题排查

从上图中我们发现 Checkpoint 10423 失败了,借助 Checkpoint 监控面板,我们可以做初步分析和判断,至于具体的原因可能还需要通过日志来分析:

1、首先我们可以在 jobmanager.log 中查找关键字“checkpoint 10423”,假设我们找到如下内容:

Flink Checkpoint 问题排查
  • 0b60f08bf8984085b59f8d9bc74ce2e1 是 execution id
  • 85d268e6fbc19411185f7e4868a44178 是 job id

2、然后,我们可以在 jobmanager.log 中查找 execution id,找到被调度到哪个 taskmanager 上,类似如下所示:

Flink Checkpoint 问题排查

3、从上面的日志我们知道该 execution 被调度到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot 上,接下来我们就可以到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体原因

常见的 Checkpoint 慢的情况

Checkpoint 慢可以算是一个高频问题,比如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟(我们希望 1 分钟左右就能够做完),而且我们预期 state size 不是非常大。

4.1 Checkpointed Data Size 太大

状态数据的大小(在 Checkpoint 监控面板上看)也会影响 Checkpoint 的时间,并且在 Checkpoint 时 IO 压力也会较大。对于像 RocksDB 这种支持增量 Checkpoint 的 StateBackend,如果两次 Checkpoint 之间状态变化不大,那么增量 Checkpoint 能够极大减少状态上传时间。

但当前的增量 Checkpoint 仍存在一些问题:

  • 一是不通用,不是所有的 StateBackend 都能够支持增量 Checkpoint
  • 二是存在由于状态合并的影响,增量状态数据仍会非常大

[FLIP-158: Generalized incremental checkpoints](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints) 提出了一种通用的增量快照方案,其核心思想是基于 state changelog,changelog 能够细粒度地记录状态数据的变化,如下图所示:

Flink Checkpoint 问题排查

4.2 作业存在反压或者数据倾斜

我们知道 task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间,我们可以通过如下的页面进行检查:

Flink Checkpoint 问题排查

上图中我们选择了一个 task,查看所有 subtask 的反压情况,发现都是 high,表示反压情况严重,这种情况下会导致下游接收 barrier 比较晚。

分析反压的大致思路是:

  • 如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了
  • 如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游

反压情况可以根据以下表格进行对号入座:

Flink Checkpoint 问题排查

对于 Flink 1.9 及以上版本,除了上述的表格,我们还可以根据 floatingBuffersUsage/exclusiveBuffersUsage(其中inPoolUsage 等于 floatingBuffersUsage 与 exclusiveBuffersUsage 的总和)以及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游 Subtask 的数据传输:

Flink Checkpoint 问题排查

通常来说,floatingBuffersUsage 为高则表明反压正在传导至上游,而 exclusiveBuffersUsage 则表明了反压是否存在倾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer)。参考链接 [How to identify the source of backpressure](https://flink.apache.org/2021/07/07/backpressure.html) 和 [Monitoring, Metrics, and that Backpressure Thing](https://flink.apache.org/2019/07/23/flink-network-stack-2.html)

再来看看数据倾斜:

Flink Checkpoint 问题排查

上图中我们选择其中一个 operator,点击所有的 subtask,然后按照 Records Received/Bytes Received/TPS 从大到小进行排序,能看到前面几个 subtask 会比其他的 subtask 要处理的数据多。

如果存在反压或者数据倾斜的情况,我们需要首先解决反压或者数据倾斜问题之后,再查看 Checkpoint 的时间是否符合预期。

4.3 Barrier 对齐慢

从前面我们知道 Checkpoint 在 task 端分为 barrier 对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot。

Flink Checkpoint 问题排查

注意:这里我们要理解和区分 Start Delay 和 Alignment Duration。

针对 Barrier 对齐慢的解决思路有两个:

Flink Checkpoint 问题排查

1、让 barrier 能跳过 buffer 中缓存的数据,对应 [FLIP-76: Unaligned Checkpoints](https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints),Unaligned Checkpoints 最根本的思想就是将缓冲的数据当做算子状态的一部分,该机制仍会使用 barrier,用来触发 checkpoint,其原理如下图所示:

Flink Checkpoint 问题排查

2、让 buffer 中的数据变少,对应 [FLIP-183: Dynamic buffer size adjustment (Buffer debloat) ](https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment),其思想是动态调整 Buffer 大小,只缓存配置时间内可以处理的数据量,进而可以预估和控制 barrier 对齐所需的时间

Flink Checkpoint 问题排查

4.4 主线程太忙导致没机会做 snapshot

在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度,在这一步我们需要能够查看某个 PID 对应热点方法,这里推荐两个方法:

1、多次连续 jstack,查看一直处于 RUNNABLE 状态的线程有哪些

2、使用工具 AsyncProfile dump 一份火焰图,查看占用 CPU 最多的栈。比方说,对 TaskManager 进行 CPU profile,从中我们可以分析到 Task Thread 是否跑满一个 CPU 核:

  • 如果是的话要分析 CPU 主要花费在哪些函数里面
  • 如果不是的话要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,也可能是内存资源不足或 GC(包括 TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联)等原因导致的暂时的暂停
  • 推荐给 TaskManager 启用 G1 垃圾回收器来优化 GC,并加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题

4.5 同步阶段做的慢

同步阶段一般不会太慢,但是如果我们通过日志发现同步阶段比较慢,那么可以分两类情况来分析:

  • 对于非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用上面提到的工具
  • 对于 RocksDBBackend 来说,我们可以用 iostate 查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 snapshot 的时间总共开销多少

4.6 异步阶段做的慢

对于异步阶段来说,tm 端主要将 state 备份到持久化存储上:

  • 对于非 RocksDBBackend 来说,主要瓶颈来自于网络,这个阶段可以考虑观察网络的 metric,或者对应机器上能够观察到网络流量的情况(比如 iftop)
  • 对于 RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能;另外对于 RocksDBBackend 来说,如果觉得网络流量不是瓶颈,但是上传比较慢的话,还可以尝试考虑开启多线程上传功能

常见的检查点和状态问题

1、Received checkpoint barrier for checkpoint <cp_id> before completing current checkpoint <cp_id>. Skipping current checkpoint.

在当前检查点还未做完时,收到了更新的检查点的 barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。

2、Checkpoint <cp_id> expired before completing

首先应检查 CheckpointConfig.setCheckpointTimeout() 方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者 barrier 对齐太慢。

3、org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible

Flink 的状态是按 key 组织并保存的,如果程序逻辑内改了 keyBy() 逻辑或者 key 的序列化逻辑,就会导致检查点/保存点的数据无法正确恢复。所以如果必须要改key相关的东西,就弃用之前的状态数据吧。

4、org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported

在 1.9 之前的 Flink 版本中,如果我们使用 RocksDB 状态后端,并且更改了自用 MapState的schema,恢复作业时会抛出此异常,表示不支持更改 schema。这个问题已经在 FLINK-11947 解决,升级版本即可。

5、时钟不同步导致无法启动

启动Flink任务的时候报错 Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster。

然后仔细看发现:system times on machines may be out of sync。

意思说是机器上的系统时间可能不同步。同步集群机器时间即可。

继续阅读