1.序篇-先說結論
本文主要記錄部落客在生産環境中踩的 flink 針對 java enum serde 時的坑。
結論:在 flink 程式中,如果狀态中有存儲 java enum,那麼添加或者删除 enum 中的一個枚舉值時,就有可能導緻狀态恢複異常,這裡的異常可能不是在恢複過程中會實際抛出一個異常,而是有可能是 enum A 的值恢複給 enum B。
我從以下幾個章節說明、解決這個問題,希望能抛磚引玉,帶給大家一些啟發。
- 踩坑場景篇-這個坑是啥樣的
- 問題排查篇-坑的排查過程
- 問題原了解析篇-導緻問題的機制是什麼
- 避坑篇-如何避免這種問題
- 總結篇
2.踩坑場景篇-這個坑是啥樣的
對任務做一個簡單的過濾條件修改,任務重新上線之後,從 flink web ui 确認是從 savepoint 重新開機成功了,但是實際最終産出的資料上來看卻像是沒有從 savepoint 重新開機。
邏輯就是計算分次元的當天累計 pv。代碼很簡單,在後面會貼出來。
如下圖:

2
在 00:04 分重新開機時出現了當天累計 pv 出現了從零累計的情況。
但是預期正常的曲線應該張下面這樣。
1
任務是使用 DataStream 編寫(基于 flink 1.13.1)。
public class SenerioTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.addSource(new SourceFunction<SourceModel>() {
private volatile boolean isCancel = false;
@Override
public void run(SourceContext<SourceModel> ctx) throws Exception {
// 資料源
}
@Override
public void cancel() {
this.isCancel = true;
}
})
.keyBy(new KeySelector<SourceModel, Long>() {
@Override
public Long getKey(SourceModel value) throws Exception {
return value.getUserId() % 1000;
}
})
.timeWindow(Time.minutes(1))
.aggregate(
new AggregateFunction<SourceModel, Map<Tuple2<DimNameEnum, String>, Long>, Map<Tuple2<DimNameEnum, String>, Long>>() {
@Override
public Map<Tuple2<DimNameEnum, String>, Long> createAccumulator() {
return new HashMap<>();
}
@Override
public Map<Tuple2<DimNameEnum, String>, Long> add(SourceModel value,
Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
Lists.newArrayList(Tuple2.of(DimNameEnum.province, value.getProvince())
, Tuple2.of(DimNameEnum.age, value.getAge())
, Tuple2.of(DimNameEnum.sex, value.getSex()))
.forEach(t -> {
Long l = accumulator.get(t);
if (null == l) {
accumulator.put(t, 1L);
} else {
accumulator.put(t, l + 1);
}
});
return accumulator;
}
@Override
public Map<Tuple2<DimNameEnum, String>, Long> getResult(
Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
return accumulator;
}
@Override
public Map<Tuple2<DimNameEnum, String>, Long> merge(
Map<Tuple2<DimNameEnum, String>, Long> a,
Map<Tuple2<DimNameEnum, String>, Long> b) {
return null;
}
},
new ProcessWindowFunction<Map<Tuple2<DimNameEnum, String>, Long>, SinkModel, Long, TimeWindow>() {
private transient ValueState<Map<Tuple2<DimNameEnum, String>, Long>> todayPv;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.todayPv = getRuntimeContext().getState(new ValueStateDescriptor<Map<Tuple2<DimNameEnum, String>, Long>>(
"todayPv", TypeInformation.of(
new TypeHint<Map<Tuple2<DimNameEnum, String>, Long>>() {
})));
}
@Override
public void process(Long aLong, Context context,
Iterable<Map<Tuple2<DimNameEnum, String>, Long>> elements, Collector<SinkModel> out)
throws Exception {
// 将 elements 資料 merge 到 todayPv 中
// 每天零點将 state 清空重新累計
// 然後 out#collect 出去即可
}
});
env.execute();
}
@Data
@Builder
private static class SourceModel {
private long userId;
private String province;
private String age;
private String sex;
private long timestamp;
}
@Data
@Builder
private static class SinkModel {
private String dimName;
private String dimValue;
private long timestamp;
}
enum DimNameEnum {
province,
age,
sex,
;
}
}
複制
3.問題排查篇-坑的排查過程
3.1.愚蠢的懷疑引擎
首先懷疑是狀态沒有正常恢複。
但是檢視 flink web ui 以及 tm 日志,都顯示是從 savepoint 正常恢複了。
還懷疑是不是出現了 flink web ui 展示的内容和實際的執行不一緻的情況。
但是發現任務的 ck 大小是正常的,複合預期的。
3.2.老老實實打 log 吧
既然能從 savepoint 正常恢複,那麼就把狀态值用 log 打出來看看到底發生了什麼事情呗。
如下列代碼,在
ProcessWindowFunction
中加上 log 日志。
this.todayPv.value()
.forEach(new BiConsumer<Tuple2<DimNameEnum, String>, Long>() {
@Override
public void accept(Tuple2<DimNameEnum, String> k,
Long v) {
log.info("key 值:{},value 值:{}", k.toString(), v);
}
});
複制
發現結果如下:
...
key 值:(uv_type,男),value 值:1000
...
複制
發現狀态中存儲的
DimNameEnum.province
,
DimNameEnum.age
的資料都是正确的,但是缺缺少了
DimNameEnum.sex
,多了
(uv_type,男)
這樣的資料,于是檢視代碼,發現之前多加了一種枚舉類型
DimNameEnum.uv_type
。代碼如下:
enum DimNameEnum {
province,
age,
uv_type,
sex,
;
}
複制
于是懷疑 flink 針對枚舉值的 serde 不是按照枚舉值名稱來進行比對的,而是按照枚舉值下标來進行比對的。是以就出現了
DimNameEnum.uv_type
将
DimNameEnum.sex
的位置占了的情況。
4.問題原了解析篇-導緻問題的機制是什麼
來看看源碼吧。
測試代碼如下:
public class EnumsStateTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
TypeInformation<StateTestEnums> t = TypeInformation.of(StateTestEnums.class);
EnumSerializer<StateTestEnums> e = (EnumSerializer<StateTestEnums>) t.createSerializer(env.getConfig());
DataOutputSerializer d = new DataOutputSerializer(10000);
e.serialize(StateTestEnums.A, d);
env.execute();
}
enum StateTestEnums {
A,
B,
C
;
}
}
複制
debug 結果如下:
首先看看對應的
TypeInformation
和
TypeSerializer
。
3
發現 enum 類型的序列化器是
EnumSerializer
, 看看
EnumSerializer
的 serde 實作,如圖所示:
4
最關鍵的兩個變量:
- 序列化時用
valueToOrdinal
- 反序列化時用
values
進而印證了上面的說法。flink enum 序列化時使用的是枚舉值下标進行 serde,是以一旦枚舉值順序發生改變,或者添加、删除一個枚舉值,就會導緻其他枚舉值的下标出現錯位的情況。進而導緻資料錯誤。
5.避坑篇-如何避免這種問題
5.1.枚舉解決
在上述場景中,如果又想要把新枚舉值加進去,又需要狀态能夠正常恢複,正常産出資料。
那麼可以把新的枚舉值在尾部添加,比如下面這樣。
enum DimNameEnum {
province,
age,
sex,
uv_type, // 添加在尾部
;
}
複制
5.2.非枚舉解決
還有一種方法如标題,就是别用枚舉值,直接用 string 就 vans 了。
6.總結篇
本文主要介紹了 flink 枚舉值 serde 中的坑,當在 enum 中添加删除枚舉值時,就有可能導緻狀态岔劈。随後給出了原因是由于 enum serde 器的實作導緻的這種情況,最後給出了解決方案。