狀态操作符和使用者自定義函數都是我們在寫流處理程式時,常用的工具。事實上,大部分稍微複雜一點的邏輯都需要儲存資料或者儲存計算結果。很多Flink内置的操作符例如:source操作符,sink操作符等等都是有狀态的,也就是說會緩存流資料或者計算結果。例如,視窗操作符将會為ProcessWindowFunction收集輸入的資料,或者收集ReduceFunction計算的結果。而ProcessFunction也會儲存定時器事件,一些sink方法為了做到exactly-once,會将事務儲存下來。除了内置的操作符以及提供的source和sink操作符,Flink的DataStream API還在UDF函數中暴露了可以注冊、儲存和通路狀态的接口。
本章重點讨論有狀态的使用者自定義函數的實作,以及讨論有狀态應用的性能和健壯性。特别的,我們将解釋在使用者自定義函數中,如何定義不同類型的狀态,以及如何與狀态進行互動。我們還讨論了性能方面的問題以及如何控制狀态大小的問題。
1 實作有狀态的使用者自定義函數
我們知道函數有兩種狀态,鍵控狀态(keyed state)和操作符狀态(operator state)。
1.1 在RuntimeContext中定義鍵控狀态
使用者自定義函數可以使用keyed state來存儲和通路key對應的狀态。對于每一個key,Flink将會維護一個狀态執行個體。一個操作符的狀态執行個體将會被分發到操作符的所有并行任務中去。這表明函數的每一個并行任務隻為所有key的某一部分key儲存key對應的狀态執行個體。是以keyed state和分布式key-value map資料結構非常類似。
keyed state僅可用于KeyedStream。Flink支援以下資料類型的狀态變量:
- ValueState[T]儲存單個的值,值的類型為T。
- get操作: ValueState.value()
- set操作: ValueState.update(value: T)
- ListState[T]儲存一個清單,清單裡的元素的資料類型為T。基本操作如下:
- ListState.add(value: T)
- ListState.addAll(values: java.util.List[T])
- ListState.get()傳回Iterable[T]
- ListState.update(values: java.util.List[T])
- MapState[K, V]儲存Key-Value對。
- MapState.get(key: K)
- MapState.put(key: K, value: V)
- MapState.contains(key: K)
- MapState.remove(key: K)
- ReducingState[T]
- AggregatingState[I, O]
ReducingState[T]
和
AggregatingState[IN, OUT]
與
ListState[T]
同屬于
MergingState[T]
。與
ListState[T]
不同的是,
ReducingState[T]
隻有一個元素,而不是一個清單。它的原理是新元素通過
add(value: T)
加入後,與已有的狀态元素使用
ReduceFunction
合并為一個元素,并更新到狀态裡。
AggregatingState[IN, OUT]
ReducingState[T]
類似,也隻有一個元素,隻不過
AggregatingState[IN, OUT]
的輸入和輸出類型可以不一樣。
ReducingState[T]
AggregatingState[IN, OUT]
與視窗上進行
ReduceFunction
AggregateFunction
很像,都是将新元素與已有元素做聚合。
State.clear()是清空操作。
scala version
val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
val alerts: DataStream[(String, Double, Double)] = keyedData
.flatMap(new TemperatureAlertFunction(1.7))
class TemperatureAlertFunction(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
}
override def flatMap(
reading: SensorReading,
out: Collector[(String, Double, Double)]
): Unit = {
val lastTemp = lastTempState.value()
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > threshold) {
out.collect((reading.id, reading.temperature, tempDiff))
}
this.lastTempState.update(reading.temperature)
}
}
上面例子中的FlatMapFunction隻能通路目前處理的元素所包含的key所對應的狀态變量。
不同key對應的keyed state是互相隔離的。
- 通過RuntimeContext注冊StateDescriptor。StateDescriptor以狀态state的名字和存儲的資料類型為參數。資料類型必須指定,因為Flink需要選擇合适的序列化器。
- 在open()方法中建立state變量。注意複習之前的RichFunction相關知識。
當一個函數注冊了StateDescriptor描述符,Flink會檢查狀态後端是否已經存在這個狀态。這種情況通常出現在應用挂掉要從檢查點或者儲存點恢複的時候。在這兩種情況下,Flink會将注冊的狀态連接配接到已經存在的狀态。如果不存在狀态,則初始化一個空的狀态。
使用FlatMap with keyed ValueState的快捷方式flatMapWithState也可以實作以上需求。
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.flatMapWithState[(String, Double, Double), Double] {
case (in: SensorReading, None) =>
// no previous temperature defined.
// Just update the last temperature
(List.empty, Some(in.temperature))
case (SensorReading r, lastTemp: Some[Double]) =>
// compare temperature difference with threshold
val tempDiff = (r.temperature - lastTemp.get).abs
if (tempDiff > 1.7) {
// threshold exceeded.
// Emit an alert and update the last temperature
(List((r.id, r.temperature, tempDiff)), Some(r.temperature))
} else {
// threshold not exceeded. Just update the last temperature
(List.empty, Some(r.temperature))
}
}
1.2 使用ListCheckpointed接口來實作操作符的清單狀态
操作符狀态會在操作符的每一個并行執行個體中去維護。一個操作符并行執行個體上的所有事件都可以通路同一個狀态。Flink支援三種操作符狀态:list state, list union state, broadcast state。
一個函數可以實作ListCheckpointed接口來處理操作符的list state。ListCheckpointed接口無法處理ValueState和ListState,因為這些狀态是注冊在狀态後端的。操作符狀态類似于成員變量,和狀态後端的互動通過ListCheckpointed接口的回調函數實作。接口提供了兩個方法:
// 傳回函數狀态的快照,傳回值為清單
snapshotState(checkpointId: Long, timestamp: Long): java.util.List[T]
// 從清單恢複函數狀态
restoreState(java.util.List[T] state): Unit
當Flink觸發stateful functon的一次checkpoint時,snapshotState()方法會被調用。方法接收兩個參數,checkpointId為唯一的單調遞增的檢查點Id,timestamp為當master機器開始做檢查點操作時的牆上時鐘(機器時間)。方法必須傳回序列化好的狀态對象的清單。
當當機程式從檢查點或者儲存點恢複時會調用restoreState()方法。restoreState使用snapshotState儲存的清單來恢複。
下面的例子展示了如何實作ListCheckpointed接口。業務場景為:一個對每一個并行執行個體的超過門檻值的溫度的計數程式。
class HighTempCounter(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (Int, Long)]
with ListCheckpointed[java.lang.Long] {
// index of the subtask
private lazy val subtaskIdx = getRuntimeContext
.getIndexOfThisSubtask
// local count variable
private var highTempCnt = 0L
override def flatMap(
in: SensorReading,
out: Collector[(Int, Long)]): Unit = {
if (in.temperature > threshold) {
// increment counter if threshold is exceeded
highTempCnt += 1
// emit update with subtask index and counter
out.collect((subtaskIdx, highTempCnt))
}
}
override def restoreState(
state: util.List[java.lang.Long]): Unit = {
highTempCnt = 0
// restore state by adding all longs of the list
for (cnt <- state.asScala) {
highTempCnt += cnt
}
}
override def snapshotState(
chkpntId: Long,
ts: Long): java.util.List[java.lang.Long] = {
// snapshot state as list with a single count
java.util.Collections.singletonList(highTempCnt)
}
}
上面的例子中,每一個并行執行個體都計數了本執行個體有多少溫度值超過了設定的門檻值。例子中使用了操作符狀态,并且每一個并行執行個體都擁有自己的狀态變量,這個狀态變量将會被檢查點操作儲存下來,并且可以通過使用ListCheckpointed接口來恢複狀态變量。
看了上面的例子,我們可能會有疑問,那就是為什麼操作符狀态是狀态對象的清單。這是因為清單資料結構支援包含操作符狀态的函數的并行度改變的操作。為了增加或者減少包含了操作符狀态的函數的并行度,操作符狀态需要被重新分區到更多或者更少的并行任務執行個體中去。而這樣的操作需要合并或者分割狀态對象。而對于每一個有狀态的函數,分割和合并狀态對象都是很常見的操作,是以這顯然不是任何類型的狀态都能自動完成的。
通過提供一個狀态對象的清單,擁有操作符狀态的函數可以使用snapshotState()方法和restoreState()方法來實作以上所說的邏輯。snapshotState()方法将操作符狀态分割成多個部分,restoreState()方法從所有的部分中将狀态對象收集起來。當函數的操作符狀态恢複時,狀态變量将被分區到函數的所有不同的并行執行個體中去,并作為參數傳遞給restoreState()方法。如果并行任務的數量大于狀态對象的數量,那麼一些并行任務在開始的時候是沒有狀态的,是以restoreState()函數的參數為空清單。
再來看一下上面的程式,我們可以看到操作符的每一個并行執行個體都暴露了一個狀态對象的清單。如果我們增加操作符的并行度,那麼一些并行任務将會從0開始計數。為了獲得更好的狀态分區的行為,當HighTempCounter函數擴容時,我們可以按照下面的程式來實作snapshotState()方法,這樣就可以把計數值配置設定到不同的并行計數中去了。
override def snapshotState(
chkpntId: Long,
ts: Long): java.util.List[java.lang.Long] = {
// split count into ten partial counts
val div = highTempCnt / 10
val mod = (highTempCnt % 10).toInt
// return count as ten parts
(List.fill(mod)(new java.lang.Long(div + 1)) ++
List.fill(10 - mod)(new java.lang.Long(div))).asJava
}
1.3 使用連接配接的廣播狀态
一個常見的需求就是流應用需要将同樣的事件分發到操作符的所有的并行執行個體中,而這樣的分發操作還得是可恢複的。
我們舉個例子:一條流是一個規則(比如5秒鐘内連續兩個超過門檻值的溫度),另一條流是待比對的流。也就是說,規則流和事件流。是以每一個操作符的并行執行個體都需要把規則流儲存在操作符狀态中。也就是說,規則流需要被廣播到所有的并行執行個體中去。
在Flink中,這樣的狀态叫做廣播狀态(broadcast state)。廣播狀态和DataStream或者KeyedStream都可以做連接配接操作。
下面的例子實作了一個溫度報警應用,應用有可以動态設定的門檻值,動态設定通過廣播流來實作。
val sensorData: DataStream[SensorReading] = ...
val thresholds: DataStream[ThresholdUpdate] = ...
val keyedSensorData: KeyedStream[SensorReading, String] = sensorData
.keyBy(_.id)
// the descriptor of the broadcast state
val broadcastStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds", classOf[String], classOf[Double])
val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
.broadcast(broadcastStateDescriptor)
// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.connect(broadcastThresholds)
.process(new UpdatableTemperatureAlertFunction())
帶有廣播狀态的函數在應用到兩條流上時分三個步驟:
- 調用DataStream.broadcast()來建立BroadcastStream,定義一個或者多個MapStateDescriptor對象。
- 将BroadcastStream和DataStream/KeyedStream做connect操作。
- 在connected streams上調用KeyedBroadcastProcessFunction/BroadcastProcessFunction。
下面的例子實作了動态設定溫度門檻值的功能。
class UpdatableTemperatureAlertFunction()
extends KeyedBroadcastProcessFunction[String,
SensorReading, ThresholdUpdate, (String, Double, Double)] {
// the descriptor of the broadcast state
private lazy val thresholdStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds", classOf[String], classOf[Double])
// the keyed state handle
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// create keyed state descriptor
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
// obtain the keyed state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}
override def processBroadcastElement(
update: ThresholdUpdate,
ctx: KeyedBroadcastProcessFunction[String,
SensorReading, ThresholdUpdate,
(String, Double, Double)]#Context,
out: Collector[(String, Double, Double)]): Unit = {
// get broadcasted state handle
val thresholds = ctx
.getBroadcastState(thresholdStateDescriptor)
if (update.threshold != 0.0d) {
// configure a new threshold for the sensor
thresholds.put(update.id, update.threshold)
} else {
// remove threshold for the sensor
thresholds.remove(update.id)
}
}
override def processElement(
reading: SensorReading,
readOnlyCtx: KeyedBroadcastProcessFunction
[String, SensorReading, ThresholdUpdate,
(String, Double, Double)]#ReadOnlyContext,
out: Collector[(String, Double, Double)]): Unit = {
// get read-only broadcast state
val thresholds = readOnlyCtx
.getBroadcastState(thresholdStateDescriptor)
// check if we have a threshold
if (thresholds.contains(reading.id)) {
// get threshold for sensor
val sensorThreshold: Double = thresholds.get(reading.id)
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > sensorThreshold) {
// temperature increased by more than the threshold
out.collect((reading.id, reading.temperature, tempDiff))
}
}
// update lastTemp state
this.lastTempState.update(reading.temperature)
}
}
2 配置檢查點
10秒鐘儲存一次檢查點。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
// set checkpointing interval to 10 seconds (10000 milliseconds)
env.enableCheckpointing(10000L);
2.1 将hdfs配置為狀态後端
首先在IDEA的pom檔案中添加依賴:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.3</version>
<!-- <scope>provided</scope>-->
</dependency>
在
hdfs-site.xml
添加:
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
别忘了重新開機hdfs檔案系統!
然後添加本地檔案夾和hdfs檔案的映射:
hdfs getconf -confKey fs.default.name
hdfs dfs -put /home/parallels/flink/checkpoint hdfs://localhost:9000/flink
然後在代碼中添加:
env.enableCheckpointing(5000) env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink"))
檢查一下檢查點正确儲存了沒有:
hdfs dfs -ls hdfs://localhost:9000/flink
3 保證有狀态應用的可維護性
3.1 指定唯一的操作符辨別符
每一個操作符都可以指定唯一的辨別符。辨別符将會作為操作符的中繼資料和狀态資料一起儲存到savepoint中去。當應用從儲存點恢複時,辨別符可以用來在savepoint中查找辨別符對應的操作符的狀态資料。辨別符必須是唯一的,否則應用不知道從哪一個辨別符恢複。
強烈建議為應用的每一個操作符定義唯一辨別符。例子:
DataStream<Tuple3<String, Double, Double>> alerts = keyedSensorData
.flatMap(new TemperatureAlertFunction(1.1))
.uid("TempAlert");
3.2 指定操作符的最大并行度
操作符的最大并行度定義了操作符的keyed state可以被分到多少個key groups中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
// set the maximum parallelism for this application
env.setMaxParallelism(512);
DataStream<Tuple3<String, Double, Double>> alerts = keyedSensorData
.flatMap(new TemperatureAlertFunction(1.1))
// set the maximum parallelism for this operator and
// override the application-wide value
.setMaxParallelism(1024);
4 有狀态應用的性能和健壯性
4.1 選擇一個狀态後端
- MemoryStateBackend将狀态當作Java的對象(沒有序列化操作)存儲在TaskManager JVM程序的堆上。
- FsStateBackend将狀态存儲在本地的檔案系統或者遠端的檔案系統如HDFS。
- RocksDBStateBackend将狀态存儲在RocksDB \footnote{Facebook開源的KV資料庫} 中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
String checkpointPath = ???
// configure path for checkpoints on the remote filesystem
// env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
val backend = new RocksDBStateBackend(checkpointPath)
// configure the state backend
env.setStateBackend(backend);
4.2 防止狀态洩露
流應用通常需要運作幾個月或者幾年。如果state資料不斷增長的話,會爆炸。是以控制state資料的大小十分重要。而Flink并不會清理state和gc。是以所有的stateful operator都需要控制他們各自的狀态資料大小,保證不爆炸。
例如我們之前講過增量聚合函數ReduceFunction/AggregateFunction,就可以提前聚合而不給state太多壓力。
我們來看一個例子,我們實作了一個KeyedProcessFunction,用來計算連續兩次的溫度的內插補點,如果內插補點超過門檻值,報警。
class SelfCleaningTemperatureAlertFunction(val threshold: Double)
extends KeyedProcessFunction[String,
SensorReading, (String, Double, Double)] {
// the keyed state handle for the last temperature
private var lastTempState: ValueState[Double] = _
// the keyed state handle for the last registered timer
private var lastTimerState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
// register state for last temperature
val lastTempDesc = new ValueStateDescriptor[Double](
"lastTemp", classOf[Double])
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
// register state for last timer
val lastTimerDesc = new ValueStateDescriptor[Long](
"lastTimer", classOf[Long])
lastTimerState = getRuntimeContext
.getState(timestampDescriptor)
}
override def processElement(
reading: SensorReading,
ctx: KeyedProcessFunction
[String, SensorReading, (String, Double, Double)]#Context,
out: Collector[(String, Double, Double)]): Unit = {
// compute timestamp of new clean up timer
// as record timestamp + one hour
val newTimer = ctx.timestamp() + (3600 * 1000)
// get timestamp of current timer
val curTimer = lastTimerState.value()
// delete previous timer and register new timer
ctx.timerService().deleteEventTimeTimer(curTimer)
ctx.timerService().registerEventTimeTimer(newTimer)
// update timer timestamp state
lastTimerState.update(newTimer)
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
val tempDiff = (reading.temperature - lastTemp).abs
if (tempDiff > threshold) {
// temperature increased by more than the threshold
out.collect((reading.id, reading.temperature, tempDiff))
}
// update lastTemp state
this.lastTempState.update(reading.temperature)
}
override def onTimer(
timestamp: Long,
ctx: KeyedProcessFunction[String,
SensorReading, (String, Double, Double)]#OnTimerContext,
out: Collector[(String, Double, Double)]): Unit = {
// clear all state for the key
lastTempState.clear()
lastTimerState.clear()
}
}