状态操作符和用户自定义函数都是我们在写流处理程序时,常用的工具。事实上,大部分稍微复杂一点的逻辑都需要保存数据或者保存计算结果。很多Flink内置的操作符例如:source操作符,sink操作符等等都是有状态的,也就是说会缓存流数据或者计算结果。例如,窗口操作符将会为ProcessWindowFunction收集输入的数据,或者收集ReduceFunction计算的结果。而ProcessFunction也会保存定时器事件,一些sink方法为了做到exactly-once,会将事务保存下来。除了内置的操作符以及提供的source和sink操作符,Flink的DataStream API还在UDF函数中暴露了可以注册、保存和访问状态的接口。
本章重点讨论有状态的用户自定义函数的实现,以及讨论有状态应用的性能和健壮性。特别的,我们将解释在用户自定义函数中,如何定义不同类型的状态,以及如何与状态进行交互。我们还讨论了性能方面的问题以及如何控制状态大小的问题。
1 实现有状态的用户自定义函数
我们知道函数有两种状态,键控状态(keyed state)和操作符状态(operator state)。
1.1 在RuntimeContext中定义键控状态
用户自定义函数可以使用keyed state来存储和访问key对应的状态。对于每一个key,Flink将会维护一个状态实例。一个操作符的状态实例将会被分发到操作符的所有并行任务中去。这表明函数的每一个并行任务只为所有key的某一部分key保存key对应的状态实例。所以keyed state和分布式key-value map数据结构非常类似。
keyed state仅可用于KeyedStream。Flink支持以下数据类型的状态变量:
- ValueState[T]保存单个的值,值的类型为T。
- get操作: ValueState.value()
- set操作: ValueState.update(value: T)
- ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
- ListState.add(value: T)
- ListState.addAll(values: java.util.List[T])
- ListState.get()返回Iterable[T]
- ListState.update(values: java.util.List[T])
- MapState[K, V]保存Key-Value对。
- MapState.get(key: K)
- MapState.put(key: K, value: V)
- MapState.contains(key: K)
- MapState.remove(key: K)
- ReducingState[T]
- AggregatingState[I, O]
ReducingState[T]
和
AggregatingState[IN, OUT]
与
ListState[T]
同属于
MergingState[T]
。与
ListState[T]
不同的是,
ReducingState[T]
只有一个元素,而不是一个列表。它的原理是新元素通过
add(value: T)
加入后,与已有的状态元素使用
ReduceFunction
合并为一个元素,并更新到状态里。
AggregatingState[IN, OUT]
ReducingState[T]
类似,也只有一个元素,只不过
AggregatingState[IN, OUT]
的输入和输出类型可以不一样。
ReducingState[T]
AggregatingState[IN, OUT]
与窗口上进行
ReduceFunction
AggregateFunction
很像,都是将新元素与已有元素做聚合。
State.clear()是清空操作。
scala version
val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
val alerts: DataStream[(String, Double, Double)] = keyedData
.flatMap(new TemperatureAlertFunction(1.7))
class TemperatureAlertFunction(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
}
override def flatMap(
reading: SensorReading,
out: Collector[(String, Double, Double)]
): Unit = {
val lastTemp = lastTempState.value()
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > threshold) {
out.collect((reading.id, reading.temperature, tempDiff))
}
this.lastTempState.update(reading.temperature)
}
}
上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。
不同key对应的keyed state是相互隔离的。
- 通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。数据类型必须指定,因为Flink需要选择合适的序列化器。
- 在open()方法中创建state变量。注意复习之前的RichFunction相关知识。
当一个函数注册了StateDescriptor描述符,Flink会检查状态后端是否已经存在这个状态。这种情况通常出现在应用挂掉要从检查点或者保存点恢复的时候。在这两种情况下,Flink会将注册的状态连接到已经存在的状态。如果不存在状态,则初始化一个空的状态。
使用FlatMap with keyed ValueState的快捷方式flatMapWithState也可以实现以上需求。
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.flatMapWithState[(String, Double, Double), Double] {
case (in: SensorReading, None) =>
// no previous temperature defined.
// Just update the last temperature
(List.empty, Some(in.temperature))
case (SensorReading r, lastTemp: Some[Double]) =>
// compare temperature difference with threshold
val tempDiff = (r.temperature - lastTemp.get).abs
if (tempDiff > 1.7) {
// threshold exceeded.
// Emit an alert and update the last temperature
(List((r.id, r.temperature, tempDiff)), Some(r.temperature))
} else {
// threshold not exceeded. Just update the last temperature
(List.empty, Some(r.temperature))
}
}
1.2 使用ListCheckpointed接口来实现操作符的列表状态
操作符状态会在操作符的每一个并行实例中去维护。一个操作符并行实例上的所有事件都可以访问同一个状态。Flink支持三种操作符状态:list state, list union state, broadcast state。
一个函数可以实现ListCheckpointed接口来处理操作符的list state。ListCheckpointed接口无法处理ValueState和ListState,因为这些状态是注册在状态后端的。操作符状态类似于成员变量,和状态后端的交互通过ListCheckpointed接口的回调函数实现。接口提供了两个方法:
// 返回函数状态的快照,返回值为列表
snapshotState(checkpointId: Long, timestamp: Long): java.util.List[T]
// 从列表恢复函数状态
restoreState(java.util.List[T] state): Unit
当Flink触发stateful functon的一次checkpoint时,snapshotState()方法会被调用。方法接收两个参数,checkpointId为唯一的单调递增的检查点Id,timestamp为当master机器开始做检查点操作时的墙上时钟(机器时间)。方法必须返回序列化好的状态对象的列表。
当宕机程序从检查点或者保存点恢复时会调用restoreState()方法。restoreState使用snapshotState保存的列表来恢复。
下面的例子展示了如何实现ListCheckpointed接口。业务场景为:一个对每一个并行实例的超过阈值的温度的计数程序。
class HighTempCounter(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (Int, Long)]
with ListCheckpointed[java.lang.Long] {
// index of the subtask
private lazy val subtaskIdx = getRuntimeContext
.getIndexOfThisSubtask
// local count variable
private var highTempCnt = 0L
override def flatMap(
in: SensorReading,
out: Collector[(Int, Long)]): Unit = {
if (in.temperature > threshold) {
// increment counter if threshold is exceeded
highTempCnt += 1
// emit update with subtask index and counter
out.collect((subtaskIdx, highTempCnt))
}
}
override def restoreState(
state: util.List[java.lang.Long]): Unit = {
highTempCnt = 0
// restore state by adding all longs of the list
for (cnt <- state.asScala) {
highTempCnt += cnt
}
}
override def snapshotState(
chkpntId: Long,
ts: Long): java.util.List[java.lang.Long] = {
// snapshot state as list with a single count
java.util.Collections.singletonList(highTempCnt)
}
}
上面的例子中,每一个并行实例都计数了本实例有多少温度值超过了设定的阈值。例子中使用了操作符状态,并且每一个并行实例都拥有自己的状态变量,这个状态变量将会被检查点操作保存下来,并且可以通过使用ListCheckpointed接口来恢复状态变量。
看了上面的例子,我们可能会有疑问,那就是为什么操作符状态是状态对象的列表。这是因为列表数据结构支持包含操作符状态的函数的并行度改变的操作。为了增加或者减少包含了操作符状态的函数的并行度,操作符状态需要被重新分区到更多或者更少的并行任务实例中去。而这样的操作需要合并或者分割状态对象。而对于每一个有状态的函数,分割和合并状态对象都是很常见的操作,所以这显然不是任何类型的状态都能自动完成的。
通过提供一个状态对象的列表,拥有操作符状态的函数可以使用snapshotState()方法和restoreState()方法来实现以上所说的逻辑。snapshotState()方法将操作符状态分割成多个部分,restoreState()方法从所有的部分中将状态对象收集起来。当函数的操作符状态恢复时,状态变量将被分区到函数的所有不同的并行实例中去,并作为参数传递给restoreState()方法。如果并行任务的数量大于状态对象的数量,那么一些并行任务在开始的时候是没有状态的,所以restoreState()函数的参数为空列表。
再来看一下上面的程序,我们可以看到操作符的每一个并行实例都暴露了一个状态对象的列表。如果我们增加操作符的并行度,那么一些并行任务将会从0开始计数。为了获得更好的状态分区的行为,当HighTempCounter函数扩容时,我们可以按照下面的程序来实现snapshotState()方法,这样就可以把计数值分配到不同的并行计数中去了。
override def snapshotState(
chkpntId: Long,
ts: Long): java.util.List[java.lang.Long] = {
// split count into ten partial counts
val div = highTempCnt / 10
val mod = (highTempCnt % 10).toInt
// return count as ten parts
(List.fill(mod)(new java.lang.Long(div + 1)) ++
List.fill(10 - mod)(new java.lang.Long(div))).asJava
}
1.3 使用连接的广播状态
一个常见的需求就是流应用需要将同样的事件分发到操作符的所有的并行实例中,而这样的分发操作还得是可恢复的。
我们举个例子:一条流是一个规则(比如5秒钟内连续两个超过阈值的温度),另一条流是待匹配的流。也就是说,规则流和事件流。所以每一个操作符的并行实例都需要把规则流保存在操作符状态中。也就是说,规则流需要被广播到所有的并行实例中去。
在Flink中,这样的状态叫做广播状态(broadcast state)。广播状态和DataStream或者KeyedStream都可以做连接操作。
下面的例子实现了一个温度报警应用,应用有可以动态设定的阈值,动态设定通过广播流来实现。
val sensorData: DataStream[SensorReading] = ...
val thresholds: DataStream[ThresholdUpdate] = ...
val keyedSensorData: KeyedStream[SensorReading, String] = sensorData
.keyBy(_.id)
// the descriptor of the broadcast state
val broadcastStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds", classOf[String], classOf[Double])
val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
.broadcast(broadcastStateDescriptor)
// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.connect(broadcastThresholds)
.process(new UpdatableTemperatureAlertFunction())
带有广播状态的函数在应用到两条流上时分三个步骤:
- 调用DataStream.broadcast()来创建BroadcastStream,定义一个或者多个MapStateDescriptor对象。
- 将BroadcastStream和DataStream/KeyedStream做connect操作。
- 在connected streams上调用KeyedBroadcastProcessFunction/BroadcastProcessFunction。
下面的例子实现了动态设定温度阈值的功能。
class UpdatableTemperatureAlertFunction()
extends KeyedBroadcastProcessFunction[String,
SensorReading, ThresholdUpdate, (String, Double, Double)] {
// the descriptor of the broadcast state
private lazy val thresholdStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds", classOf[String], classOf[Double])
// the keyed state handle
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// create keyed state descriptor
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
// obtain the keyed state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}
override def processBroadcastElement(
update: ThresholdUpdate,
ctx: KeyedBroadcastProcessFunction[String,
SensorReading, ThresholdUpdate,
(String, Double, Double)]#Context,
out: Collector[(String, Double, Double)]): Unit = {
// get broadcasted state handle
val thresholds = ctx
.getBroadcastState(thresholdStateDescriptor)
if (update.threshold != 0.0d) {
// configure a new threshold for the sensor
thresholds.put(update.id, update.threshold)
} else {
// remove threshold for the sensor
thresholds.remove(update.id)
}
}
override def processElement(
reading: SensorReading,
readOnlyCtx: KeyedBroadcastProcessFunction
[String, SensorReading, ThresholdUpdate,
(String, Double, Double)]#ReadOnlyContext,
out: Collector[(String, Double, Double)]): Unit = {
// get read-only broadcast state
val thresholds = readOnlyCtx
.getBroadcastState(thresholdStateDescriptor)
// check if we have a threshold
if (thresholds.contains(reading.id)) {
// get threshold for sensor
val sensorThreshold: Double = thresholds.get(reading.id)
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > sensorThreshold) {
// temperature increased by more than the threshold
out.collect((reading.id, reading.temperature, tempDiff))
}
}
// update lastTemp state
this.lastTempState.update(reading.temperature)
}
}
2 配置检查点
10秒钟保存一次检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
// set checkpointing interval to 10 seconds (10000 milliseconds)
env.enableCheckpointing(10000L);
2.1 将hdfs配置为状态后端
首先在IDEA的pom文件中添加依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.3</version>
<!-- <scope>provided</scope>-->
</dependency>
在
hdfs-site.xml
添加:
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
别忘了重启hdfs文件系统!
然后添加本地文件夹和hdfs文件的映射:
hdfs getconf -confKey fs.default.name
hdfs dfs -put /home/parallels/flink/checkpoint hdfs://localhost:9000/flink
然后在代码中添加:
env.enableCheckpointing(5000) env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink"))
检查一下检查点正确保存了没有:
hdfs dfs -ls hdfs://localhost:9000/flink
3 保证有状态应用的可维护性
3.1 指定唯一的操作符标识符
每一个操作符都可以指定唯一的标识符。标识符将会作为操作符的元数据和状态数据一起保存到savepoint中去。当应用从保存点恢复时,标识符可以用来在savepoint中查找标识符对应的操作符的状态数据。标识符必须是唯一的,否则应用不知道从哪一个标识符恢复。
强烈建议为应用的每一个操作符定义唯一标识符。例子:
DataStream<Tuple3<String, Double, Double>> alerts = keyedSensorData
.flatMap(new TemperatureAlertFunction(1.1))
.uid("TempAlert");
3.2 指定操作符的最大并行度
操作符的最大并行度定义了操作符的keyed state可以被分到多少个key groups中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
// set the maximum parallelism for this application
env.setMaxParallelism(512);
DataStream<Tuple3<String, Double, Double>> alerts = keyedSensorData
.flatMap(new TemperatureAlertFunction(1.1))
// set the maximum parallelism for this operator and
// override the application-wide value
.setMaxParallelism(1024);
4 有状态应用的性能和健壮性
4.1 选择一个状态后端
- MemoryStateBackend将状态当作Java的对象(没有序列化操作)存储在TaskManager JVM进程的堆上。
- FsStateBackend将状态存储在本地的文件系统或者远程的文件系统如HDFS。
- RocksDBStateBackend将状态存储在RocksDB \footnote{Facebook开源的KV数据库} 中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
String checkpointPath = ???
// configure path for checkpoints on the remote filesystem
// env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
val backend = new RocksDBStateBackend(checkpointPath)
// configure the state backend
env.setStateBackend(backend);
4.2 防止状态泄露
流应用通常需要运行几个月或者几年。如果state数据不断增长的话,会爆炸。所以控制state数据的大小十分重要。而Flink并不会清理state和gc。所以所有的stateful operator都需要控制他们各自的状态数据大小,保证不爆炸。
例如我们之前讲过增量聚合函数ReduceFunction/AggregateFunction,就可以提前聚合而不给state太多压力。
我们来看一个例子,我们实现了一个KeyedProcessFunction,用来计算连续两次的温度的差值,如果差值超过阈值,报警。
class SelfCleaningTemperatureAlertFunction(val threshold: Double)
extends KeyedProcessFunction[String,
SensorReading, (String, Double, Double)] {
// the keyed state handle for the last temperature
private var lastTempState: ValueState[Double] = _
// the keyed state handle for the last registered timer
private var lastTimerState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
// register state for last temperature
val lastTempDesc = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
// register state for last timer
val lastTimerDesc = new ValueStateDescriptor[Long](
"lastTimer", classOf[Long])
lastTimerState = getRuntimeContext
.getState(timestampDescriptor)
}
override def processElement(
reading: SensorReading,
ctx: KeyedProcessFunction
[String, SensorReading, (String, Double, Double)]#Context,
out: Collector[(String, Double, Double)]): Unit = {
// compute timestamp of new clean up timer
// as record timestamp + one hour
val newTimer = ctx.timestamp() + (3600 * 1000)
// get timestamp of current timer
val curTimer = lastTimerState.value()
// delete previous timer and register new timer
ctx.timerService().deleteEventTimeTimer(curTimer)
ctx.timerService().registerEventTimeTimer(newTimer)
// update timer timestamp state
lastTimerState.update(newTimer)
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > threshold) {
// temperature increased by more than the threshold
out.collect((reading.id, reading.temperature, tempDiff))
}
// update lastTemp state
this.lastTempState.update(reading.temperature)
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[String,
SensorReading, (String, Double, Double)]#OnTimerContext,
out: Collector[(String, Double, Double)]): Unit = {
// clear all state for the key
lastTempState.clear()
lastTimerState.clear()
}
}