Feature overview
The State Processor API, added in Flink 1.9, lets users directly access the state stored inside Flink. The API makes it easy to read, modify, and even rebuild entire state. Its power shows in several ways; the first is flexible ingestion of external data — for example, reading from a database and building a Savepoint from scratch, which solves the job cold-start problem so you no longer have to replay the last N days of data.
Applicable scenarios
- Asynchronous validation, or inspecting the state at a particular stage. A Flink job's final results are usually persisted somewhere, but when something goes wrong it can be hard to tell which stage is at fault; the State Processor API offers a way to check whether the data held in state matches expectations.
- Correcting dirty data: if a piece of dirty data has polluted the state, the State Processor API can be used to repair and correct it.
- State migration: when the job logic has changed but most of the existing state should be reused, or the structure of the state needs to be upgraded, this API can do the corresponding work.
- Solving the cold-start problem, so the job does not have to reprocess data from N days ago.
Limitations
- Window state cannot be modified for now.
- Every stateful operator must be assigned a uid explicitly.
- The savepoint's metadata (existing operator ids) cannot be obtained by reading the savepoint directly.
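The second limitation means the original streaming job must pin a uid on every stateful operator, otherwise the State Processor API has no stable address for that operator's state. A minimal sketch (the operator and function names here are hypothetical, not from the demos below):

```java
// Hypothetical streaming job: "keyby_summarize" is the address that
// readKeyedState / withOperator will later use to find this operator's state.
DataStream<Tuple2<String, Integer>> summarized = source
        .keyBy(t -> t.f0)
        .process(new SummarizeFunction()) // hypothetical stateful KeyedProcessFunction
        .uid("keyby_summarize");          // the explicit uid is mandatory here
```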
Related concepts
State falls into two categories: 1. Operator State 2. Keyed State
When reading state, the reader method must be chosen according to the state type:

| Operator States | Keyed States |
|---|---|
| readListState | readKeyedState |
| readUnionState | |
| readBroadcastState | |
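For example, operator (non-keyed) state is read with one of the list/union/broadcast readers, while keyed state goes through readKeyedState with a reader function. A sketch of both calls, assuming hypothetical uids and state names:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(
        env, "hdfs://ns1/user/xc/some-savepoint", new RocksDBStateBackend("hdfs://ns1/user/xc"));

// Operator state: comes back as a flat DataSet of the stored elements.
// "buffer-uid" / "bufferState" are hypothetical.
DataSet<Long> listState = savepoint.readListState("buffer-uid", "bufferState", Types.LONG);

// Keyed state: requires a KeyedStateReaderFunction to map each key's state
// to an output record (the demos below define one).
DataSet<KeyedValueState> keyedState = savepoint.readKeyedState("keyby_summarize", new StateReaderFunc());
```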
Building a Savepoint from batch-loaded data, and modifying Savepoint state
A demo for each is given below.
The basic flow of the two is quite similar.
- Bootstrapping from batch-loaded data:
  1. Read the data with the batch API into a DataSet (e.g. read a text file).
  2. Apply the business logic to obtain the transformed DataSet (e.g. parse the text into Tuple2<key, num>).
  3. Convert the result into state via a KeyedStateBootstrapFunction.
  4. Write out an external Savepoint (take care to set the uid and choose the right StateBackend).
- Modifying Savepoint state:
  1. Call Savepoint.load to load the existing Savepoint (the StateBackend must match the one the original job used).
  2. Call savepoint.readKeyedState on the resulting ExistingSavepoint; the result is a DataSet.
  3. Adjust that DataSet with batch business logic (e.g. drop an element); the result is still a DataSet.
  4. Convert the result back into state with a custom KeyedStateBootstrapFunction.
  5. Write out an external Savepoint (again minding the uid and the StateBackend type).
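Before wiring anything into Flink, the counting done in steps 1-3 of the bootstrap flow can be sanity-checked in plain Java. `countWords` below is a hypothetical helper that mirrors the flatMap + groupBy + reduceGroup pipeline on comma-separated lines:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CountSketch {
    // Count occurrences of each comma-separated token across all lines,
    // mirroring flatMap (split) + groupBy/reduceGroup (count per key).
    static Map<String, Long> countWords(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(new String[]{"a,b,a", "b,a"});
        System.out.println(counts.get("a")); // 3
        System.out.println(counts.get("b")); // 2
    }
}
```

Each resulting (word, count) pair corresponds to one KeyedValueState record handed to the bootstrap function.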
Demo: rebuilding stream state from batch
```java
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Read the external offline data source
    DataSource<String> textSource = env.readTextFile("D:\\sources\\data.txt");
    DataSet<Tuple2<String, Integer>> sourceDataSet = textSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] strArr = value.split(",");
            for (String str : strArr) {
                Tuple2<String, Integer> wordTuple = new Tuple2<>(str, 1);
                out.collect(wordTuple);
            }
        }
    });
    // Compute the historical state we need
    DataSet<ReadAndModifyState.KeyedValueState> dataSet = sourceDataSet
        .groupBy(0)
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, ReadAndModifyState.KeyedValueState>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<ReadAndModifyState.KeyedValueState> out) throws Exception {
                Iterator<Tuple2<String, Integer>> iterator = values.iterator();
                Long countNum = 0L;
                String wordKey = null;
                while (iterator.hasNext()) {
                    Tuple2<String, Integer> info = iterator.next();
                    if (wordKey == null) {
                        wordKey = info.f0;
                    }
                    countNum++;
                }
                ReadAndModifyState.KeyedValueState keyedValueState = new ReadAndModifyState.KeyedValueState();
                keyedValueState.key = new Tuple1<>(wordKey);
                keyedValueState.countNum = countNum;
                out.collect(keyedValueState);
            }
        });
    // Turn the historical records into state and write them out as a savepoint on HDFS
    BootstrapTransformation<ReadAndModifyState.KeyedValueState> transformation = OperatorTransformation
        .bootstrapWith(dataSet)
        .keyBy(new KeySelector<ReadAndModifyState.KeyedValueState, Tuple1<String>>() {
            @Override
            public Tuple1<String> getKey(ReadAndModifyState.KeyedValueState value) throws Exception {
                return value.key;
            }
        })
        .transform(new ReadAndModifyState.KeyedValueStateBootstrapper());
    String uid = "keyby_summarize";
    String savePointPath = "hdfs://ns1/user/xc/savepoint-from-batch";
    StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
    Savepoint.create(rocksDBBackEnd, 128)
        .withOperator(uid, transformation)
        .write(savePointPath);
    env.execute("batch build save point");
    System.out.println("-------end------------");
}
```
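Once the savepoint has been written, the streaming job can presumably be resumed from it with the standard Flink CLI; the jar name and entry class below are placeholders:

```shell
# Start the streaming job from the bootstrapped savepoint
# (entry class and jar are hypothetical placeholders)
flink run -s hdfs://ns1/user/xc/savepoint-from-batch \
    -c com.example.StreamingJob streaming-job.jar
```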
Demo: reading and modifying state
```java
public static void main(String[] args) throws Exception {
    ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
    String savePointPath = "hdfs://ns1/user/xc/savepoint-61b8e1-bbee958b3087";
    StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
    ExistingSavepoint savepoint = Savepoint.load(bEnv, savePointPath, rocksDBBackEnd);
    // Read
    String uid = "keyby_summarize";
    DataSet<KeyedValueState> keyState = savepoint.readKeyedState(uid, new StateReaderFunc());
    // Modify
    DataSet<KeyedValueState> dataSet = keyState.flatMap((FlatMapFunction<KeyedValueState, KeyedValueState>) (value, out) -> {
        value.countNum = value.countNum * 2;
        out.collect(value);
    }).returns(KeyedValueState.class);
    BootstrapTransformation<KeyedValueState> transformation = OperatorTransformation
        .bootstrapWith(dataSet)
        // Note: the keyBy must use exactly the same key as the original job
        .keyBy(new KeySelector<KeyedValueState, Tuple1<String>>() {
            @Override
            public Tuple1<String> getKey(KeyedValueState value) throws Exception {
                return value.key;
            }
        })
        .transform(new KeyedValueStateBootstrapper());
    Savepoint.create(rocksDBBackEnd, 128)
        .withOperator(uid, transformation)
        .write("hdfs://ns1/user/xc/savepoint-after-modify3");
    bEnv.execute("read and modify keyed state");
    System.out.println("-----end------------");
}

public static class StateReaderFunc extends KeyedStateReaderFunction<Tuple1<String>, KeyedValueState> {
    private static final long serialVersionUID = -3616180524951046897L;
    private transient ValueState<Long> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> currentCountDescriptor = new ValueStateDescriptor<>("currentCountState", Long.class);
        state = getRuntimeContext().getState(currentCountDescriptor);
    }

    @Override
    public void readKey(Tuple1<String> key, Context ctx, Collector<KeyedValueState> out) throws Exception {
        System.out.println(key.f0 + ":" + state.value());
        KeyedValueState keyedValueState = new KeyedValueState();
        keyedValueState.key = new Tuple1<>(key.f0);
        keyedValueState.countNum = state.value();
        out.collect(keyedValueState);
    }
}

public static class KeyedValueState {
    Tuple1<String> key;
    Long countNum;
}

private static class KeyedValueStateBootstrapper extends KeyedStateBootstrapFunction<Tuple1<String>, KeyedValueState> {
    private static final long serialVersionUID = 1893716139133502118L;
    private ValueState<Long> currentCount = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> currentCountDescriptor = new ValueStateDescriptor<>("currentCountState", Long.class, 0L);
        currentCount = getRuntimeContext().getState(currentCountDescriptor);
    }

    @Override
    public void processElement(KeyedValueState value, Context ctx) throws Exception {
        currentCount.update(value.countNum);
    }
}
```