天天看點

踩坑記| flink state 序列化 java enum 竟然岔劈了1.序篇-先說結論2.踩坑場景篇-這個坑是啥樣的3.問題排查篇-坑的排查過程4.問題原了解析篇-導緻問題的機制是什麼5.避坑篇-如何避免這種問題6.總結篇

1.序篇-先說結論

本文主要記錄部落客在生産環境中踩的 flink 針對 java enum serde 時的坑。

結論:在 flink 程式中,如果狀态中有存儲 java enum,那麼添加或者删除 enum 中的一個枚舉值時,就有可能導緻狀态恢複異常,這裡的異常可能不是在恢複過程中會實際抛出一個異常,而是有可能是 enum A 的值恢複給 enum B。

我從以下幾個章節說明、解決這個問題,希望能抛磚引玉,帶給大家一些啟發。

  1. 踩坑場景篇-這個坑是啥樣的
  2. 問題排查篇-坑的排查過程
  3. 問題原了解析篇-導緻問題的機制是什麼
  4. 避坑篇-如何避免這種問題
  5. 總結篇

2.踩坑場景篇-這個坑是啥樣的

對任務做一個簡單的過濾條件修改,任務重新上線之後,從 flink web ui 确認是從 savepoint 重新開機成功了,但是實際最終産出的資料上來看卻像是沒有從 savepoint 重新開機。

邏輯就是計算分次元的當天累計 pv。代碼很簡單,在後面會貼出來。

如下圖:

踩坑記| flink state 序列化 java enum 竟然岔劈了1.序篇-先說結論2.踩坑場景篇-這個坑是啥樣的3.問題排查篇-坑的排查過程4.問題原了解析篇-導緻問題的機制是什麼5.避坑篇-如何避免這種問題6.總結篇

2

在 00:04 分重新開機時出現了當天累計 pv 出現了從零累計的情況。

但是預期正常的曲線應該張下面這樣。

踩坑記| flink state 序列化 java enum 竟然岔劈了1.序篇-先說結論2.踩坑場景篇-這個坑是啥樣的3.問題排查篇-坑的排查過程4.問題原了解析篇-導緻問題的機制是什麼5.避坑篇-如何避免這種問題6.總結篇

1

任務是使用 DataStream 編寫(基于 flink 1.13.1)。

public class SenerioTest {
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        env.addSource(new SourceFunction<SourceModel>() {

            private volatile boolean isCancel = false;

            @Override
            public void run(SourceContext<SourceModel> ctx) throws Exception {
                // 資料源
            }

            @Override
            public void cancel() {
                this.isCancel = true;
            }
        })
        .keyBy(new KeySelector<SourceModel, Long>() {
            @Override
            public Long getKey(SourceModel value) throws Exception {
                return value.getUserId() % 1000;
            }
        })
        .timeWindow(Time.minutes(1))
        .aggregate(
            new AggregateFunction<SourceModel, Map<Tuple2<DimNameEnum, String>, Long>, Map<Tuple2<DimNameEnum, String>, Long>>() {

                @Override
                public Map<Tuple2<DimNameEnum, String>, Long> createAccumulator() {
                    return new HashMap<>();
                }

                @Override
                public Map<Tuple2<DimNameEnum, String>, Long> add(SourceModel value,
                        Map<Tuple2<DimNameEnum, String>, Long> accumulator) {

                    Lists.newArrayList(Tuple2.of(DimNameEnum.province, value.getProvince())
                            , Tuple2.of(DimNameEnum.age, value.getAge())
                            , Tuple2.of(DimNameEnum.sex, value.getSex()))
                            .forEach(t -> {
                                Long l = accumulator.get(t);

                                if (null == l) {
                                    accumulator.put(t, 1L);
                                } else {
                                    accumulator.put(t, l + 1);
                                }
                            });

                    return accumulator;
                }

                @Override
                public Map<Tuple2<DimNameEnum, String>, Long> getResult(
                        Map<Tuple2<DimNameEnum, String>, Long> accumulator) {
                    return accumulator;
                }

                @Override
                public Map<Tuple2<DimNameEnum, String>, Long> merge(
                        Map<Tuple2<DimNameEnum, String>, Long> a,
                        Map<Tuple2<DimNameEnum, String>, Long> b) {
                    return null;
                }
            },
            new ProcessWindowFunction<Map<Tuple2<DimNameEnum, String>, Long>, SinkModel, Long, TimeWindow>() {

                private transient ValueState<Map<Tuple2<DimNameEnum, String>, Long>> todayPv;

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    this.todayPv = getRuntimeContext().getState(new ValueStateDescriptor<Map<Tuple2<DimNameEnum, String>, Long>>(
                            "todayPv", TypeInformation.of(
                            new TypeHint<Map<Tuple2<DimNameEnum, String>, Long>>() {
                            })));
                }

                @Override
                public void process(Long aLong, Context context,
                        Iterable<Map<Tuple2<DimNameEnum, String>, Long>> elements, Collector<SinkModel> out)
                        throws Exception {
                    // 将 elements 資料 merge 到 todayPv 中
                    // 每天零點将 state 清空重新累計
                    // 然後 out#collect 出去即可
                }
            });

        env.execute();
    }

    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String province;
        private String age;
        private String sex;
        private long timestamp;
    }


    @Data
    @Builder
    private static class SinkModel {
        private String dimName;
        private String dimValue;
        private long timestamp;
    }

    enum DimNameEnum {
        province,
        age,
        sex,
        ;
    }

}
           

複制

3.問題排查篇-坑的排查過程

3.1.愚蠢的懷疑引擎

首先懷疑是狀态沒有正常恢複。

但是檢視 flink web ui 以及 tm 日志,都顯示是從 savepoint 正常恢複了。

還懷疑是不是出現了 flink web ui 展示的内容和實際的執行不一緻的情況。

但是發現任務的 ck 大小是正常的,複合預期的。

3.2.老老實實打 log 吧

既然能從 savepoint 正常恢複,那麼就把狀态值用 log 打出來看看到底發生了什麼事情呗。

如下列代碼,在

ProcessWindowFunction

中加上 log 日志。

this.todayPv.value()
    .forEach(new BiConsumer<Tuple2<DimNameEnum, String>, Long>() {
        @Override
        public void accept(Tuple2<DimNameEnum, String> k,
                Long v) {
            log.info("key 值:{},value 值:{}", k.toString(), v);
        }
    });
           

複制

發現結果如下:

...
key 值:(uv_type,男),value 值:1000
...
           

複制

發現狀态中存儲的

DimNameEnum.province

DimNameEnum.age

的資料都是正确的,但是缺缺少了

DimNameEnum.sex

,多了

(uv_type,男)

這樣的資料,于是檢視代碼,發現之前多加了一種枚舉類型

DimNameEnum.uv_type

。代碼如下:

enum DimNameEnum {
    province,
    age,
    uv_type,
    sex,
    ;
}
           

複制

于是懷疑 flink 針對枚舉值的 serde 不是按照枚舉值名稱來進行比對的,而是按照枚舉值下标來進行比對的。是以就出現了

DimNameEnum.uv_type

DimNameEnum.sex

的位置占了的情況。

4.問題原了解析篇-導緻問題的機制是什麼

來看看源碼吧。

測試代碼如下:

public class EnumsStateTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        TypeInformation<StateTestEnums> t = TypeInformation.of(StateTestEnums.class);

        EnumSerializer<StateTestEnums> e = (EnumSerializer<StateTestEnums>) t.createSerializer(env.getConfig());

        DataOutputSerializer d = new DataOutputSerializer(10000);

        e.serialize(StateTestEnums.A, d);

        env.execute();
    }

    enum StateTestEnums {
        A,
        B,
        C
        ;
    }
}
           

複制

debug 結果如下:

首先看看對應的

TypeInformation

TypeSerializer

踩坑記| flink state 序列化 java enum 竟然岔劈了1.序篇-先說結論2.踩坑場景篇-這個坑是啥樣的3.問題排查篇-坑的排查過程4.問題原了解析篇-導緻問題的機制是什麼5.避坑篇-如何避免這種問題6.總結篇

3

發現 enum 類型的序列化器是

EnumSerializer

, 看看

EnumSerializer

的 serde 實作,如圖所示:

踩坑記| flink state 序列化 java enum 竟然岔劈了1.序篇-先說結論2.踩坑場景篇-這個坑是啥樣的3.問題排查篇-坑的排查過程4.問題原了解析篇-導緻問題的機制是什麼5.避坑篇-如何避免這種問題6.總結篇

4

最關鍵的兩個變量:

  1. 序列化時用

    valueToOrdinal

  2. 反序列化時用

    values

進而印證了上面的說法。flink enum 序列化時使用的是枚舉值下标進行 serde,是以一旦枚舉值順序發生改變,或者添加、删除一個枚舉值,就會導緻其他枚舉值的下标出現錯位的情況。進而導緻資料錯誤。

5.避坑篇-如何避免這種問題

5.1.枚舉解決

在上述場景中,如果又想要把新枚舉值加進去,又需要狀态能夠正常恢複,正常産出資料。

那麼可以把新的枚舉值在尾部添加,比如下面這樣。

enum DimNameEnum {
    province,
    age,
    sex,
    uv_type, // 添加在尾部
    ;
}
           

複制

5.2.非枚舉解決

還有一種方法如标題,就是别用枚舉值,直接用 string 就 vans 了。

6.總結篇

本文主要介紹了 flink 枚舉值 serde 中的坑,當在 enum 中添加删除枚舉值時,就有可能導緻狀态岔劈。随後給出了原因是由于 enum serde 器的實作導緻的這種情況,最後給出了解決方案。