
Flink Tips: A Brief Introduction to the State Processor API (2): State Writer

In the previous installment, I gave a brief introduction to reading state:

Flink Tips: A Brief Introduction to the State Processor API (1): State Reader

This installment focuses on writing and modifying state, using Keyed State as the main example.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

1. Writing New Savepoints

Building on the keyed-state example from the previous installment, the state generated here uses the same data structure.

  • Keyed State writer
import java.util.Arrays;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class KeyedStateWriter {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        String oldsavepointPath = "file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\ce4e7457dfcd7bf92d046a0e70b4a992\\chk-1";
        String savepointPath = "file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\ce4e7457dfcd7bf92d046a0e70b4a992\\chk-3";
        int maxParallelism = 128;

        // Sample offline data for the demo
        DataSource<Tuple2<String, Integer>> fromCollection = bEnv.fromCollection(Arrays.asList(
                Tuple2.of("a", 10),
                Tuple2.of("b", 10),
                Tuple2.of("d", 10)
        ));

        // Key the offline data and feed it through the bootstrap function
        BootstrapTransformation<Tuple2<String, Integer>> transform = OperatorTransformation
                .bootstrapWith(fromCollection)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .transform(new KeyBootstrapper());

        // Create a brand-new savepoint for backfilling; choose maxParallelism carefully
        Savepoint.create(new MemoryStateBackend(), maxParallelism)
                .withOperator("key_uid", transform)
                .write(savepointPath);

        // Alternative: load the old state, add the new operator uid, and write a new savepoint
//        Savepoint.load(bEnv, oldsavepointPath, new MemoryStateBackend())
//                .withOperator("key_uid2", transform)
//                .write(savepointPath);

        bEnv.execute();
    }

    // The logic below must match the state logic of the running job
    public static class KeyBootstrapper extends KeyedStateBootstrapFunction<String, Tuple2<String, Integer>> {
        ValueState<Integer> state;
        ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);

            ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
            updateTimes = getRuntimeContext().getListState(updateDescriptor);
        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx) throws Exception {
            state.update(value.f1 + 1);
            updateTimes.add(System.currentTimeMillis());
            // Processing-time / event-time timers can also be registered here
//            ctx.timerService().registerEventTimeTimer(1000);
        }
    }
}
           
  • Verifying the result by reading the state back

A new _metadata file is generated under the savepointPath directory.

KeyedStates{key='a', value=11, times=[1626158781896]}
KeyedStates{key='d', value=11, times=[1626158781896]}
KeyedStates{key='b', value=11, times=[1626158781896]}
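The listing above can be produced with the reader API covered in part 1. Below is a minimal verification sketch; the `KeyedStateVerifier` and `ReaderFunction` names and the placeholder path are illustrative, and the state descriptors must mirror the ones registered by the bootstrap function:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class KeyedStateVerifier {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        String savepointPath = "file:///path/to/new/savepoint"; // placeholder: the path written above

        // Read back the state registered under "key_uid" and print one line per key
        Savepoint.load(bEnv, savepointPath, new MemoryStateBackend())
                .readKeyedState("key_uid", new ReaderFunction())
                .print();
    }

    // Mirrors the descriptors used by the bootstrap function
    public static class ReaderFunction extends KeyedStateReaderFunction<String, String> {
        ValueState<Integer> state;
        ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Types.INT));
            updateTimes = getRuntimeContext().getListState(new ListStateDescriptor<>("times", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect("KeyedStates{key='" + key + "', value=" + state.value()
                    + ", times=" + updateTimes.get() + "}");
        }
    }
}
```

Running this against the freshly written savepoint should print lines shaped like the result set above.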
           

2. Modifying Savepoints

Load the old state data, add the newly bootstrapped operator uid, and write everything out as a new savepoint.

        // Load the old state, add the new operator uid, and write a new savepoint
        Savepoint.load(bEnv, oldsavepointPath, new MemoryStateBackend())
                .withOperator("key_uid2", transform)
                .write(savepointPath);
           
  • Result
// uid key_uid
KeyedStates{key='a', value=2, times=[1626157285704]}
KeyedStates{key='d', value=2, times=[1626157292202]}
KeyedStates{key='c', value=2, times=[1626157289201]}
// uid key_uid2
KeyedStates{key='a', value=11, times=[1626158781896]}
KeyedStates{key='d', value=11, times=[1626158781896]}
KeyedStates{key='b', value=11, times=[1626158781896]}
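Besides adding operators with `withOperator`, the same writable-savepoint interface also lets you drop an operator's state while keeping the rest of the savepoint intact, via `removeOperator` in the Flink 1.13 State Processor API. A minimal sketch, with placeholder paths:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;

public class DropOperatorState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Load the old savepoint, drop all state registered under "key_uid",
        // and write the result out as a new savepoint.
        Savepoint.load(bEnv, "file:///path/to/old/savepoint", new MemoryStateBackend())
                .removeOperator("key_uid")
                .write("file:///path/to/new/savepoint");

        bEnv.execute();
    }
}
```

This is useful when an operator has been removed from the job and its leftover state would otherwise block restoring the savepoint without `--allowNonRestoredState`.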
           