In Flink streaming jobs it is often necessary to update configuration dynamically, without restarting the job.
With the MySQL binlog mechanism, changes to a configuration table can be captured as they happen; with Flink's broadcast mechanism, those changes can be broadcast to the business stream and applied in the processing logic, so the configuration is updated on the fly.
Below is a simple demo, written purely as a learning exercise.
CDC
CDC stands for Change Data Capture. Its core idea is to monitor and capture database changes, record every change completely and in the order it occurred, and write the changes to a message middleware so that other services can subscribe to and consume them.
mysql binlog
MySQL records database change events in the binlog, and MySQL itself relies on the binlog to implement master-slave replication. The binlog is stored as binary files and can be parsed with the classes provided by the mysql-binlog-connector-java package. Below is a demo.
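Note that the binlog has to be enabled on the MySQL server (log_bin), and binlog_format should be set to ROW; otherwise neither the row-level events parsed below nor the Flink CDC source used later will see the individual row changes.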
1. Create a configuration table.
CREATE TABLE tb_config(
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `source` VARCHAR(15),
  `parameter` VARCHAR(15),
  `operator` INT COMMENT '0 = equal; 1 = not equal; 2 = less than; 3 = greater than',
  `value` VARCHAR(15),
  PRIMARY KEY (`id`)
);
2. Check the binlog files on the MySQL server: Log_name is the binlog file name and File_size is the size of that file.
SHOW MASTER LOGS;
3. Parse the binlog with BinaryLogClient.
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;

public class BinlogParseDemo {
    public static void main(String[] args) throws Exception {
        final BinaryLogClient client =
                new BinaryLogClient("127.0.0.1", 3306, "root", "root");
        client.setBinlogFilename("LAPTOP-LH-bin.000038"); // the binlog file to read
        client.setBinlogPosition(0); // the position in the binlog to start reading from
        client.registerEventListener(new BinaryLogClient.EventListener() {
            public void onEvent(Event event) {
                System.out.println("===========" + client.getBinlogPosition() + "===========");
                System.out.println("info=" + event.toString());
            }
        });
        client.connect();
    }
}
The output looks like the following; from this point on, any insert, update or delete on a table in the database appends the corresponding events to the binlog.
===========2901===========
info=Event{header=EventHeaderV4{timestamp=1618712448000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=29, nextPosition=3016, flags=0}, data=WriteRowsEventData{tableId=1527, includedColumns={0, 1, 2}, rows=[
[uid, 0, 004]
]}}
===========3016===========
info=Event{header=EventHeaderV4{timestamp=1618712448000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=3047, flags=0}, data=XidEventData{xid=1794}}
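In this output, the EXT_WRITE_ROWS event carries the values of the inserted row, the XID event marks the commit of the enclosing transaction, and each event header's nextPosition field points to the position of the following event.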
Dynamic configuration update demo
Preparing the data sources
The test data comes from the Kafka topic tb_data, the configuration comes from the MySQL table tb_config, and the processed test data is written to the console according to the configuration.
Each record in tb_data is a simple JSON message, for example (a small producer sketch for pushing these samples into Kafka follows them):
{"uid": "001", "timestamp":"1618559580", "actionID":1, "source":"a"}
{"uid": "003", "timestamp":"1618559590", "actionID":3, "source":"a"}
{"uid": "003", "timestamp":"1618559690", "actionID":2, "source":"a"}
{"uid": "001", "timestamp":"1618559779", "actionID":2, "source":"a"}
The tb_config table holds the rules. id is the auto-increment primary key; source is the resource type used to match data records with their configuration; parameter is the field a rule operates on; operator is the comparison type, an enum value; value is the value the data is compared against.
In this demo there are two rules for source a: the first keeps records whose uid equals 003, and the second keeps records whose timestamp is greater than 1618559580. The rules are combined with AND, i.e. only records that satisfy both conditions are emitted; a sketch for inserting them follows.
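As a sketch of how the two rules could be inserted (and later updated or deleted while the job is running), a plain JDBC snippet is shown below; the class name InsertConfigRules and the JDBC URL are illustrative assumptions, and any MySQL client can be used instead.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class InsertConfigRules {
    public static void main(String[] args) throws Exception {
        // assumes tb_config lives in the flinkdb database monitored by the CDC source below
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/flinkdb", "root", "root");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO tb_config(`source`, `parameter`, `operator`, `value`) VALUES (?, ?, ?, ?)")) {
            // rule 1: keep records whose uid equals 003 (operator 0 = equal)
            ps.setString(1, "a"); ps.setString(2, "uid"); ps.setInt(3, 0); ps.setString(4, "003");
            ps.executeUpdate();
            // rule 2: keep records whose timestamp is greater than 1618559580 (operator 3 = greater than)
            ps.setString(1, "a"); ps.setString(2, "timestamp"); ps.setInt(3, 3); ps.setString(4, "1618559580");
            ps.executeUpdate();
        }
    }
}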
Complete demo
Original work, unrelated to any business code; for personal learning only.
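The demo below uses the Flink DataStream API, the Flink Kafka connector (FlinkKafkaConsumer), the flink-connector-mysql-cdc source (MySQLSource / DebeziumSourceFunction, built on Debezium) and fastjson for JSON parsing; the import statements are omitted for brevity.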
/**
* Created by LH on 2021/4/14 17:52
*/
public class BroadCastDemo {
public static void main(String[] args) throws Exception {
//1. Streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. Execution environment settings
env.setParallelism(1);
//3. Kafka data source DataStream
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "lh-node-2:9092");
kafkaProps.put("group.id", "flink_group_1");
kafkaProps.put("session.timeout.ms", "30000");
kafkaProps.put("auto.offset.reset", "earliest");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<String> dataConsumer = new FlinkKafkaConsumer<String>("tb_data", new SimpleStringSchema(), kafkaProps);
DataStreamSource<String> kafkaDataSource = env.addSource(dataConsumer);
DataStream<Map<String, Object>> kafkaDS = kafkaDataSource.map(new MapFunction<String, Map<String, Object>>() {
@Override
public Map<String, Object> map(String s) throws Exception {
return JSONObject.parseObject(s, new TypeReference<Map<String, Object>>(){});
}
});
// 4. MySQL config source DataStream (Flink CDC)
DebeziumSourceFunction<MyConfig> mysqlSource = MySQLSource.<MyConfig>builder()
.hostname("127.0.0.1")
.port(3306)
.username("root")
.password("root")
.databaseList("flinkdb")
.tableList("flinkdb.tb_config")
.debeziumProperties(new Properties())
.deserializer(new MyDeserializationSchema())
.build();
// The descriptor must match the one used inside MyBroadcastProcessFunction
// (same name and types): the value stored per source is a map of rule id -> rule.
MapStateDescriptor<String, Map<Integer, MyConfig>> configStateDescriptor =
new MapStateDescriptor<>(
"config-cdc",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Map<Integer, MyConfig>>() {}));
BroadcastStream<MyConfig> configBroadCast = env.addSource(mysqlSource).broadcast(configStateDescriptor);
DataStream<String> outDataStream = kafkaDS
.connect(configBroadCast)
.process(new MyBroadcastProcessFunction());
kafkaDS.print("in");
outDataStream.print("out");
env.execute();
}
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<MyConfig>{
@Override
public void deserialize(SourceRecord sourceRecord, Collector<MyConfig> collector) {
Struct struct = (Struct) sourceRecord.value(); //org.apache.kafka.connect.data.Struct
Struct source = struct.getStruct("source");
String db = source.getString("db");
String table = source.getString("table");
String op = struct.getString("op");
if (db.equals("flinkdb") && table.equals("tb_config")){
Struct conf = null;
int type = 0;
if (op.equals("c")){ //新增配置
conf = struct.getStruct("after");
type = ConfType.ADD.type;
} else if (op.equals("u")){ //更新配置
conf = struct.getStruct("after");
type = ConfType.UPDATE.type;
} else if (op.equals("d")){ //删除配置
conf = struct.getStruct("before");
type = ConfType.DELETE.type;
} else return; //跳过
int id = conf.getInt32("id");
String tb_source = conf.getString("source");
String parameter = conf.getString("parameter");
int operator = conf.getInt32("operator");
String value = conf.getString("value");
MyConfig myConfig = new MyConfig(id, type, tb_source, parameter, operator, value);
System.out.println("========collect config========");
System.out.println(myConfig);
collector.collect(myConfig);
}
}
@Override
public TypeInformation<MyConfig> getProducedType() {
return TypeInformation.of(MyConfig.class);
}
}
public static class MyBroadcastProcessFunction
extends BroadcastProcessFunction<Map<String, Object>, MyConfig, String> {
/**
 * configStateDescriptor stores the configuration received on the broadcast side.
 * The key is the source name, so each source keeps its own set of rules.
 * The value is a map of rules: its key is the rule's primary key (the unique id), its value is the rule itself.
 * The rules in the map are combined with AND.
 */
private MapStateDescriptor<String, Map<Integer, MyConfig>> configStateDescriptor =
new MapStateDescriptor<>(
"config-cdc",
BasicTypeInfo.STRING_TYPE_INFO, // key type: the source name, used to match data records with configs
TypeInformation.of(new TypeHint<Map<Integer, MyConfig>>() {}));
@Override
public void processElement(Map<String, Object> dataMap, ReadOnlyContext readOnlyContext, Collector<String> collector)
throws Exception {
System.out.println("======processElement=======");
String source = dataMap.get("source").toString();
Map<Integer, MyConfig> configMap =
readOnlyContext.getBroadcastState(configStateDescriptor).get(source);
if (configMap != null && configMap.size() > 0){
// A record is emitted only if it satisfies every rule; as soon as one rule fails, return without collecting it, i.e. the record is filtered out.
for (Map.Entry<Integer, MyConfig> configEntry: configMap.entrySet()){
MyConfig config = configEntry.getValue();
String parameter = config.parameter;
int operator = config.operator;
String value = config.value;
if (dataMap.containsKey(parameter)){
String dataValue = dataMap.get(parameter).toString();
if ((operator == Operator.EQUAL.operator && dataValue.equals(value))
|| (operator == Operator.NEQUAL.operator && !dataValue.equals(value))
|| (operator == Operator.SMALLER.operator && dataValue.compareTo(value) < 0)
|| (operator == Operator.BIGGER.operator && dataValue.compareTo(value) > 0)
) {
System.out.println("data: " + dataMap.toString() + " pass the inspection, conf: " + config);
} else {
System.out.println("data: " + dataMap.toString() + " didn't pass the inspection, conf: " + config);
return; // one rule failed, drop this record
}
}
}
}
collector.collect(dataMap.toString());
}
@Override
public void processBroadcastElement(MyConfig myConfig, Context context, Collector<String> collector) throws Exception {
System.out.println("======processBroadcastElement=======");
String source = myConfig.getSource();
int confType = myConfig.getType();
int confID = myConfig.getId();
BroadcastState<String, Map<Integer, MyConfig>> confState = context.getBroadcastState(configStateDescriptor);
Map<Integer, MyConfig> confMap = confState.get(source);
if (confMap != null && confMap.size() > 0){
if (confType == ConfType.DELETE.type){
MyConfig oldConf = confMap.remove(confID); // a DELETE change removes the rule from the map
System.out.println("delete conf of source=" + source + ", id=" + confID + ", " + oldConf);
} else {
confMap.put(confID, myConfig); // add or update the rule
System.out.println("update or add conf of source=" + source + ", id=" + confID + ", " + myConfig);
}
confState.put(source, confMap); // write the modified map back into the broadcast state
} else if (confType != ConfType.DELETE.type){
confMap = new HashMap<>();
confMap.put(confID, myConfig); // first rule for this source
confState.put(source, confMap);
System.out.println("update or add conf of source=" + source + ", id=" + confID + ", " + myConfig);
}
}
}
public static class MyConfig {
private int id; // unique id
private int type; // add, update or delete (see ConfType)
private String source; // resource name
private String parameter;
private int operator;
private String value;
public MyConfig(int id, int type, String source, String parameter, int operator, String value) {
this.id = id;
this.type = type;
this.source = source;
this.parameter = parameter;
this.operator = operator;
this.value = value;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public String getParameter() {
return parameter;
}
public void setParameter(String parameter) {
this.parameter = parameter;
}
public int getOperator() {
return operator;
}
public void setOperator(int operator) {
this.operator = operator;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "MyConfig{" +
"id=" + id +
", type=" + type +
", source='" + source + '\'' +
", parameter='" + parameter + '\'' +
", operator=" + operator +
", value='" + value + '\'' +
'}';
}
}
public enum ConfType {
ADD(0),
UPDATE(1),
DELETE(2);
private int type;
ConfType(int type) {
this.type = type;
}
}
public enum Operator {
EQUAL(0),
NEQUAL(1),
SMALLER(2),
BIGGER(3);
private int operator;
Operator(int op) {
this.operator = op;
}
}
}
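A practical note for running the demo: the MySQL user used by the CDC source must be able to read the binlog, which for Debezium means replication privileges such as REPLICATION SLAVE and REPLICATION CLIENT (plus SELECT on the monitored tables); here the demo simply uses root, which already has them.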
For convenience, the whole demo is kept in a single Java file. Judging from the run log, the record
{"uid": "003", "timestamp":"1618559590", "actionID":3, "source":"a"}
was filtered out because it did not satisfy one of the rules in effect at that moment, while the record
{"uid": "003", "timestamp":"1618559690", "actionID":2, "source":"a"}
satisfied all the rules and was printed to the console. Moreover, the configuration takes effect without restarting the job: modifying the tb_config table in MySQL is enough for the change to apply immediately.