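I. When I needed state expiration (TTL) in Flink, a Baidu search turned up no usage code. I did remember writing a post about it earlier, and luckily I managed to dig it up.
Link: https://blog.csdn.net/qq_31866793/article/details/102947285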
II. Flink has since been updated, so the code may have changed slightly.
It now looks like this:
1. Enable expiration cleanup (by default, an expired entry is removed when it is read)
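// TTL of 1 second; the timestamp is refreshed only on create and write,
// and an expired value is never returned to the caller
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
    "mapState",
    String.class,
    Integer.class);
// TTL must be enabled on the descriptor before the state is obtained
mapStateDescriptor.enableTimeToLive(ttlConfig);
// testMap is assumed to be a MapState<String, Integer> field; call this in open()
testMap = getRuntimeContext().getMapState(mapStateDescriptor);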
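To make the two options above concrete, here is a plain-Java toy model (no Flink dependency) of how UpdateType and StateVisibility affect a single entry. The class TtlSemanticsModel and its methods are hypothetical and written only for illustration; it is a sketch of the documented semantics, not Flink's implementation. It assumes an entry counts as expired once lastUpdate + ttl <= now and that an expired entry found on read is removed. Time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model of the TTL options above (not Flink's implementation).
 * Time is passed in explicitly so the behaviour is deterministic.
 */
final class TtlSemanticsModel {
    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }
    enum Visibility { NEVER_RETURN_EXPIRED, RETURN_EXPIRED_IF_NOT_CLEANED_UP }

    /** A stored value together with its last-update timestamp. */
    private static final class Entry {
        final Integer value;
        long lastUpdate;

        Entry(Integer value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;
    private final Map<String, Entry> store = new HashMap<>();

    TtlSemanticsModel(long ttlMillis, UpdateType updateType, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
    }

    /** Assumption: an entry counts as expired once lastUpdate + ttl <= now. */
    private boolean isExpired(Entry e, long now) {
        return e.lastUpdate + ttlMillis <= now;
    }

    /** Every write (create or update) resets the timestamp in both update modes. */
    void put(String key, Integer value, long now) {
        store.put(key, new Entry(value, now));
    }

    Integer get(String key, long now) {
        Entry e = store.get(key);
        if (e == null) {
            return null;
        }
        if (isExpired(e, now)) {
            store.remove(key); // an expired entry found on read is removed
            // NeverReturnExpired hides it; ReturnExpiredIfNotCleanedUp hands it back one last time
            return visibility == Visibility.NEVER_RETURN_EXPIRED ? null : e.value;
        }
        if (updateType == UpdateType.ON_READ_AND_WRITE) {
            e.lastUpdate = now; // only this mode lets reads extend the lifetime
        }
        return e.value;
    }
}
```

With the settings from step 1 (OnCreateAndWrite plus NeverReturnExpired) and a 1-second TTL, a key written at t=0 is still readable at t=900 ms but gone at t=1000 ms, even though it was read in between. Switching to OnReadAndWrite lets each read push the expiry further out.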
2. Cleanup in full snapshot
// Expired entries are left out when a full snapshot is taken. The local state is
// not cleaned, and this does not apply to RocksDB incremental checkpoints.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
    "mapState",
    String.class,
    Integer.class);
mapStateDescriptor.enableTimeToLive(ttlConfig);
testMap = getRuntimeContext().getMapState(mapStateDescriptor);
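Cleanup in full snapshot only changes what is written to a snapshot, not the running state. The hypothetical FullSnapshotCleanupModel below, again plain Java and not Flink code, sketches that distinction under the same assumed expiry rule (lastUpdate + ttl <= now): expired entries are filtered out of the snapshot, so a job restored from it no longer sees them, while the live state keeps them until something else removes them.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical toy model of cleanupFullSnapshot() (not Flink's implementation):
 * expired entries are filtered out of the snapshot, the live state is left untouched.
 */
final class FullSnapshotCleanupModel {
    /** A stored value together with its last-update timestamp. */
    private static final class Entry {
        final int value;
        final long lastUpdate;

        Entry(int value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> live = new HashMap<>();

    FullSnapshotCleanupModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, int value, long now) {
        live.put(key, new Entry(value, now));
    }

    /** Entries physically held in this state, expired or not. */
    int storedEntries() {
        return live.size();
    }

    boolean isStored(String key) {
        return live.containsKey(key);
    }

    /**
     * Takes a full snapshot at time now and returns the state a job would see after
     * restoring from it. Assumption: expired means lastUpdate + ttl <= now.
     */
    FullSnapshotCleanupModel snapshotAndRestore(long now) {
        FullSnapshotCleanupModel restored = new FullSnapshotCleanupModel(ttlMillis);
        for (Map.Entry<String, Entry> e : live.entrySet()) {
            if (e.getValue().lastUpdate + ttlMillis > now) {
                restored.live.put(e.getKey(), e.getValue()); // original timestamp is kept
            }
        }
        return restored;
    }
}
```

For example, with a 1-second TTL, entries written at t=0 and t=800 ms, and a snapshot taken at t=1500 ms, the live state still holds both entries, but the restored state only holds the one written at 800 ms.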
3. Incremental cleanup, the most commonly used option (some readers may not yet be clear on how it works)
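// TODO: set the TTL for incremental cleanup
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
    // 1st argument (cleanupSize): number of state entries checked each time cleanup is triggered;
    // cleanup is always triggered on every state access
    // 2nd argument (runCleanupForEveryRecord): true = also trigger cleanup for every processed record
    .cleanupIncrementally(100, true)
    .build();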
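The two arguments of cleanupIncrementally are easier to see in a simulation. The sketch below is a hypothetical plain-Java model, not Flink's code: it keeps a cursor over all entries, checks up to cleanupSize of them on every state access and, when runCleanupForEveryRecord is true, also on every processed record, removing the ones that have expired. The cursor restarting from the first entry when it reaches the end, and the expiry rule lastUpdate + ttl <= now, are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

/**
 * Hypothetical toy model of cleanupIncrementally(cleanupSize, runCleanupForEveryRecord).
 * Not Flink's implementation; read-path cleanup is deliberately left out so that
 * only the incremental strategy removes entries here.
 */
final class IncrementalCleanupModel {
    private final long ttlMillis;
    private final int cleanupSize;
    private final boolean runCleanupForEveryRecord;
    // Insertion-ordered, so the cursor walks the entries in a stable order
    private final LinkedHashMap<String, Long> lastUpdate = new LinkedHashMap<>();
    private int cursor = 0;

    IncrementalCleanupModel(long ttlMillis, int cleanupSize, boolean runCleanupForEveryRecord) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
        this.runCleanupForEveryRecord = runCleanupForEveryRecord;
    }

    /** A state write: always triggers one cleanup step first. */
    void put(String key, long now) {
        cleanupStep(now);
        lastUpdate.put(key, now);
    }

    /** A state read: also triggers one cleanup step; expired entries are reported as absent. */
    boolean contains(String key, long now) {
        cleanupStep(now);
        Long ts = lastUpdate.get(key);
        return ts != null && ts + ttlMillis > now;
    }

    /** Called once per processed record; cleans only if the second argument was true. */
    void onRecord(long now) {
        if (runCleanupForEveryRecord) {
            cleanupStep(now);
        }
    }

    /** Entries still physically stored, whether expired or not. */
    int storedEntries() {
        return lastUpdate.size();
    }

    /** Checks up to cleanupSize entries from the cursor and removes the expired ones. */
    private void cleanupStep(long now) {
        List<String> keys = new ArrayList<>(lastUpdate.keySet());
        if (cursor >= keys.size()) {
            cursor = 0; // reached the end: start again from the first entry
        }
        int end = Math.min(cursor + cleanupSize, keys.size());
        int removed = 0;
        for (int i = cursor; i < end; i++) {
            String key = keys.get(i);
            if (lastUpdate.get(key) + ttlMillis <= now) {
                lastUpdate.remove(key);
                removed++;
            }
        }
        // Removed entries all sat before `end`, so the next unchecked entry shifted left
        cursor = end - removed;
    }
}
```

With cleanupSize = 2 and three expired entries, a single cleanup step never removes all three: each step inspects at most two entries, so the state shrinks gradually over several accesses or records. With runCleanupForEveryRecord = false, records alone trigger nothing and entries are removed only when the state is accessed.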
For details, see the state TTL section of the Flink documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/state/state.html
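Don't worry if this is still unclear. When I started out, I simply used it together with RocksDB incremental checkpoints, which the code below enables. Strictly speaking, though, these are different features: the true argument of RocksDBStateBackend turns on incremental checkpointing, while according to the Flink docs cleanupIncrementally only takes effect with the heap state backend. With RocksDB, expired entries are instead removed in the background by a compaction filter, configured with cleanupInRocksdbCompactFilter(...).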
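// Restart up to 5 times, 30 seconds apart (Time is org.apache.flink.api.common.time.Time)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(30L, TimeUnit.SECONDS)));
// Checkpoint every 5 minutes with exactly-once semantics
env.enableCheckpointing(5 * 60 * 1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// At least 10 s between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000L);
// Abort any checkpoint that takes longer than 2 minutes, and run at most one at a time
env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 1000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// RocksDB state backend; true enables incremental checkpoints (checkpointDir is the checkpoint storage URI)
env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));
// Keep the externalized checkpoint when the job is cancelled, so the job can be restored from it
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);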
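How the interval, minimum pause and timeout above interact can be checked with a little arithmetic. The helper below is a hypothetical sketch, not Flink's scheduler. It assumes that, with at most one concurrent checkpoint, the next checkpoint starts at the later of (previous start + interval) and (previous end + min pause), and that a checkpoint running past the timeout ends when it is aborted. The constants are the values from the snippet.

```java
/**
 * Hypothetical arithmetic sketch of the checkpoint settings above (not Flink's scheduler).
 * Assumes max one concurrent checkpoint: the next one starts at the later of
 * previous start + interval and previous end + min pause, and a checkpoint that runs
 * past the timeout ends when it is aborted.
 */
final class CheckpointScheduleModel {
    static final long INTERVAL_MS = 5 * 60 * 1000L;  // env.enableCheckpointing(5 * 60 * 1000L)
    static final long MIN_PAUSE_MS = 10000L;         // setMinPauseBetweenCheckpoints(10000L)
    static final long TIMEOUT_MS = 2 * 60 * 1000L;   // setCheckpointTimeout(2 * 60 * 1000L)

    /** Earliest start time of the next checkpoint, in milliseconds. */
    static long nextStart(long prevStartMs, long prevDurationMs,
                          long intervalMs, long minPauseMs, long timeoutMs) {
        long prevEnd = prevStartMs + Math.min(prevDurationMs, timeoutMs);
        return Math.max(prevStartMs + intervalMs, prevEnd + minPauseMs);
    }

    /** Same calculation with the values from the snippet. */
    static long nextStartWithArticleSettings(long prevStartMs, long prevDurationMs) {
        return nextStart(prevStartMs, prevDurationMs, INTERVAL_MS, MIN_PAUSE_MS, TIMEOUT_MS);
    }
}
```

Under these assumptions, with a 5-minute interval and a 2-minute timeout, the previous checkpoint plus the pause always ends within 2 minutes 10 seconds of its start, so the 10-second pause never delays the next checkpoint here. It matters only when the interval is short relative to checkpoint duration: nextStart(0, 8_000, 10_000, 10_000, 60_000) gives 18000 ms instead of 10000 ms.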
4. A simple Flink TTL example:
package flink.dev;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Random;
/**
 * @program: flink
 * @description:
 * @author: Mr.Wang
 * @create: 2021-01-18 15:42
 **/
public class TtlDemoTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // State TTL is currently always measured in processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // env.setRestartStrategy(RestartStrategies.noRestart());
        // A plain SourceFunction runs with parallelism 1: one random value in [0, 3) every 5 seconds
        DataStreamSource<Integer> dataSourceStream = env.addSource(new SourceFunction<Integer>() {
            // Cleared by cancel() so that run() returns and the job can stop
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                Random random = new Random();
                while (running) {
                    int i = random.nextInt(3);
                    Thread.sleep(5000);
                    ctx.collect(i);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        // Every record gets the same key "aaa", so they all share one MapState
        dataSourceStream.keyBy(new KeySelector<Integer, String>() {
            @Override
            public String getKey(Integer value) throws Exception {
                return "aaa";
            }
        }).flatMap(new RichFlatMapFunction<Integer, String>() {
            private transient MapState<String, Integer> testMap;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Test TTL
                StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
                // Alternative: full-snapshot cleanup; pass ttlConfig2 to enableTimeToLive to try it
                StateTtlConfig ttlConfig2 = StateTtlConfig
                    .newBuilder(org.apache.flink.api.common.time.Time.seconds(1))
                    .cleanupFullSnapshot()
                    .build();
                MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>(
                    "mapState",
                    String.class,
                    Integer.class);
                mapStateDescriptor.enableTimeToLive(ttlConfig);
                testMap = getRuntimeContext().getMapState(mapStateDescriptor);
            }

            @Override
            public void flatMap(Integer input, Collector<String> out) throws Exception {
                String key = "key_" + input;
                if (testMap.contains(key)) {
                    // Under OnCreateAndWrite this read does not refresh the TTL
                    System.out.println("---key exists, key=" + key);
                } else {
                    System.out.println("===key does not exist, key=" + key);
                    testMap.put(key, input);
                }
            }
        });
        env.execute("TtlDemoTest");
    }
}
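The result:
Analysis: the TTL is set to 1 second, but the source produces only one record every 5 seconds, so every entry has already expired by the next lookup and each record is reported as a missing key. Now change the parameters: set the TTL to 5 seconds and produce one record per second, as shown in the figure below.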
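The timing in this analysis can be replayed without a cluster. TtlDemoSimulation below is a hypothetical plain-Java sketch of the demo's flatMap logic: a missing key is written, a present key is only read, and, as with OnCreateAndWrite plus NeverReturnExpired, reads do not refresh the timestamp and expired entries are treated as absent. It assumes perfectly regular emission and the expiry rule lastWrite + ttl <= now; real runs have timing jitter, so boundary cases may go either way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical replay of the demo's flatMap against TTL'd state (not Flink code).
 * Models OnCreateAndWrite + NeverReturnExpired with perfectly regular emission.
 */
final class TtlDemoSimulation {
    /** Returns "hit" (key exists) or "miss" (key absent, then written) for each input. */
    static List<String> run(int[] inputs, long emitIntervalMs, long ttlMillis) {
        // Timestamp of the last write per map key; reads never touch it
        Map<String, Long> lastWrite = new HashMap<>();
        List<String> outcomes = new ArrayList<>();
        long now = 0;
        for (int input : inputs) {
            String key = "key_" + input;
            Long ts = lastWrite.get(key);
            // Assumption: expired once lastWrite + ttl <= now; expired entries look absent
            boolean visible = ts != null && ts + ttlMillis > now;
            if (visible) {
                outcomes.add("hit");
            } else {
                outcomes.add("miss");
                lastWrite.put(key, now); // the demo's testMap.put(key, input)
            }
            now += emitIntervalMs;
        }
        return outcomes;
    }
}
```

For the first setting (one record every 5 s, TTL 1 s), every record is a miss. For the second (one record per second, TTL 5 s), feeding the same value seven times gives miss, hit, hit, hit, hit, miss, hit: the key is written at t=0 and, because reads do not refresh it, expires at t=5 s even though it was read in between.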