
Flink TTL code practice

I. When I needed state expiration (state TTL) in Flink, I couldn't find working example code on Baidu. I only remembered that I had written a post about it before, and luckily I managed to dig it out.

Link: https://blog.csdn.net/qq_31866793/article/details/102947285

II. Flink has been upgraded since then, so the code has changed slightly.

As shown below:

1. Enabling state expiration cleanup

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // refresh the TTL on creation and on every write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return a value once it has expired
        .build();

MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
        "mapState",
        String.class,
        Integer.class);

mapStateDescriptor.enableTimeToLive(ttlConfig);
testMap = getRuntimeContext().getMapState(mapStateDescriptor);
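For reference, setUpdateType and setStateVisibility each have another commonly used value besides the ones above; a minimal sketch with the alternative settings:

StateTtlConfig relaxedTtlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)                        // refresh the TTL on reads as well as writes
        .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // an expired value may still be returned until it is actually cleaned up
        .build();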
           

2. Cleanup in full snapshot: expired entries are dropped whenever a full state snapshot (for example a savepoint) is taken

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
        .cleanupFullSnapshot()
        .build();

MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
        "mapState",
        String.class,
        Integer.class);

mapStateDescriptor.enableTimeToLive(ttlConfig);
testMap = getRuntimeContext().getMapState(mapStateDescriptor);
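The cleanup strategy can also be chained together with the update-type and visibility settings from section 1; a minimal sketch combining them:

StateTtlConfig combinedTtlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot() // expired entries are dropped when a full snapshot/savepoint is taken
        .build();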
           

3. Incremental cleanup, the most widely used strategy. Some readers may not be entirely clear yet on how incremental cleanup works; see the cleaned-up sketch right after the commented snippet below.

//todo TTL config for incremental cleanup
       /* StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
                .cleanupIncrementally(100, true) // first argument: number of state entries checked per cleanup trigger; cleanup is always triggered on every state access. Second argument: if true, cleanup is additionally triggered for every processed record
                .build();*/
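A cleaned-up, uncommented version of that config, wired to the same MapStateDescriptor as in the earlier snippets (a minimal sketch; the descriptor name and types follow the examples above). Note that incremental cleanup is implemented for the heap state backends; for RocksDB, the compaction-filter strategy shown further below is the counterpart.

StateTtlConfig incrementalTtlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
        // check up to 100 entries per cleanup trigger; cleanup is triggered on every
        // state access and, because the second argument is true, on every processed record
        .cleanupIncrementally(100, true)
        .build();

MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
        "mapState",
        String.class,
        Integer.class);
mapStateDescriptor.enableTimeToLive(incrementalTtlConfig);
testMap = getRuntimeContext().getMapState(mapStateDescriptor);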
           

Let's jump over to the Flink documentation page:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/state/state.html


If it's still not entirely clear, don't worry. When you start using it, pair it with incremental RocksDB checkpoints; just turn on incremental checkpointing as in the code below.

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(30L, TimeUnit.SECONDS)));
env.enableCheckpointing(5 * 60 * 1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000L);
env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 1000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// the second constructor argument enables incremental RocksDB checkpoints
env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
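When the RocksDB state backend is in play, StateTtlConfig also offers a cleanup strategy built specifically for it: expired entries are filtered out during RocksDB compaction. A minimal sketch (the value 1000 is just an illustrative threshold for how often the compaction filter re-reads the current timestamp):

StateTtlConfig rocksDbTtlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
        // query the current timestamp again after every 1000 entries processed by the compaction filter
        .cleanupInRocksdbCompactFilter(1000)
        .build();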
           

4. A simple, complete Flink TTL example:

package flink.dev;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.util.Collector;

import java.util.Random;

/**
 * @program: flink
 * @description:
 * @author: Mr.Wang
 * @create: 2021-01-18 15:42
 **/
public class TtlDemoTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//            env.setRestartStrategy(RestartStrategies.noRestart());
        //todo split the work by business requirement here; you can submit several jobs if needed

        // the Blink-planner TableEnvironment below is created but not actually used in this demo
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tableEnvironment = TableEnvironment.create(bsSettings);
        Configuration configuration = tableEnvironment.getConfig().getConfiguration();

        DataStreamSource<Integer> dataSourceStream = env.addSource(new SourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                Random random = new Random();
                while (true) {
                    int i = random.nextInt(3);
                    Thread.sleep(5000);
                    ctx.collect(i);
                }
            }

            @Override
            public void cancel() {

            }
        });


        dataSourceStream.keyBy(new KeySelector<Integer, String>() {
            @Override
            public String getKey(Integer key) throws Exception {
                return "aaa";
            }
        }).flatMap(new RichFlatMapFunction<Integer, String>() {
            private transient MapState<String, Integer> testMap;
            private final int isExists = 0;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);

                //todo TTL config under test
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

                // alternative config with full-snapshot cleanup (defined for illustration, not applied below)
                StateTtlConfig ttlConfig2 = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
                        .cleanupFullSnapshot()
                        .build();
                MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
                        "mapState",
                        String.class,
                        Integer.class);

                mapStateDescriptor.enableTimeToLive(ttlConfig);
                testMap = getRuntimeContext().getMapState(mapStateDescriptor);
            }

            @Override
            public void flatMap(Integer input, Collector<String> out) throws Exception {
                String key = "key_" + input;
                if (testMap.contains(key)) {
                    System.out.println("--- key exists, key=" + key);
                } else {
                    System.out.println("=== key does not exist, key=" + key);
                    testMap.put(key, input);
                }

            }
        });
        try {
            env.execute("");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
           

The result:

(screenshot: console output of the demo)

Analysis: the TTL is set to 1 second while the source emits a record every 5 seconds, so the key has always expired by the time the next record arrives. Now flip the parameters: set the TTL to 5 seconds and have the source emit one record per second, as in the figure below (the changed lines are sketched right after this paragraph).
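The changes to the demo above for this second run would be roughly the following (a sketch based on the description above):

// in open(): extend the TTL from 1 second to 5 seconds
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(org.apache.flink.api.common.time.Time.seconds(5))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

// in the SourceFunction's run(): emit one record per second instead of one every 5 seconds
Thread.sleep(1000);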

(screenshot: console output after the parameter change)
