
Implementing dynamic configuration updates with MySQL binlog and Flink broadcast

In Flink streaming jobs, configuration sometimes needs to change on the fly, without restarting the job process.

MySQL's binlog mechanism lets us capture configuration changes as they happen;

Flink's broadcast mechanism then lets us broadcast each change to the business stream and react to it there, which together gives us dynamic configuration updates.

Below is a simple demo, written for study and reference.

CDC

CDC stands for Change Data Capture. The core idea is to monitor and capture changes in a database, record every change in the order it occurred, and write the changes to a message middleware for other services to subscribe to and consume.
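For example, when a row changes, Debezium (the CDC engine used by flink-cdc later in this post) emits a change event whose payload looks roughly like the following; the exact fields depend on the connector and its version, so treat this as a sketch:

{
  "op": "u",
  "before": { "id": 1, "source": "a", "parameter": "uid", "operator": 0, "value": "003" },
  "after":  { "id": 1, "source": "a", "parameter": "uid", "operator": 0, "value": "004" },
  "source": { "db": "flinkdb", "table": "tb_config" }
}

Here "op" marks the change type ("c" insert, "u" update, "d" delete), and "before"/"after" carry the row images; the deserializer in the demo below reads exactly these fields.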

MySQL binlog

MySQL records database change events in the binlog, which MySQL itself uses for primary/replica replication. The binlog is stored as binary files and can be parsed with the classes provided by the mysql-binlog-connector-java package. Below is a demo.
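Note that the binlog must be enabled, and capturing row-level changes requires ROW format. A typical my.cnf fragment (the values here are illustrative):

[mysqld]
server-id        = 1
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL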

1. Create a configuration table.

CREATE TABLE tb_config(
`id` INT(11) NOT NULL AUTO_INCREMENT,
`source` VARCHAR(15),
`parameter` VARCHAR(15),
`operator` INT COMMENT '0 = equal; 1 = not equal; 2 = less than; 3 = greater than',
`value` VARCHAR(15),
PRIMARY KEY (`id`) 
);
           

2. Check the binlog metadata in the database: Log_name is the binlog file name, File_size is the size of that file.

SHOW MASTER LOGS;
           
(Screenshot: output of SHOW MASTER LOGS, listing each binlog file's Log_name and File_size.)

3. Parse the binlog with BinaryLogClient.

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;

public class BinlogDemo {
    public static void main(String[] args) throws Exception {
        final BinaryLogClient client =
                new BinaryLogClient("127.0.0.1", 3306, "root", "root");
        client.setBinlogFilename("LAPTOP-LH-bin.000038"); // which binlog file to read
        client.setBinlogPosition(0); // the position in the file to start reading from
        client.registerEventListener(new BinaryLogClient.EventListener() {
            public void onEvent(Event event) {
                System.out.println("===========" + client.getBinlogPosition() + "===========");
                System.out.println("info=" + event.toString());
            }
        });
        client.connect(); // blocks and streams events from the server
    }
}
           

The log output looks like the following; from this point on, every insert, update, or delete against a table appends corresponding events to the binlog.

===========2901===========
info=Event{header=EventHeaderV4{timestamp=1618712448000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=29, nextPosition=3016, flags=0}, data=WriteRowsEventData{tableId=1527, includedColumns={0, 1, 2}, rows=[
    [uid, 0, 004]
]}}
===========3016===========
info=Event{header=EventHeaderV4{timestamp=1618712448000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=3047, flags=0}, data=XidEventData{xid=1794}}
           

Dynamic Configuration Update Demo

Preparing the data sources

The test data comes from the Kafka topic tb_data and the configuration comes from the MySQL table tb_config; the test data is processed according to the configuration and the result is printed to the console.
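Sketched as a topology:

tb_data (Kafka)           --map(JSON -> Map)--> data stream ---------connect----+
                                                                                +--> BroadcastProcessFunction --> print
tb_config (MySQL binlog)  --CDC source--------> broadcast stream ---------------+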

Each tb_data record is a simple JSON object:

{"uid": "001", "timestamp":"1618559580", "actionID":1, "source":"a"}
{"uid": "003", "timestamp":"1618559590", "actionID":3, "source":"a"}
{"uid": "003", "timestamp":"1618559690", "actionID":2, "source":"a"}
{"uid": "001", "timestamp":"1618559779", "actionID":2, "source":"a"}
           

Sample tb_config rows are shown below. id is the auto-increment key; source is the resource type used to associate data with its configs; parameter is the field the rule operates on; operator is the operation type, an enum value; value is the operand, i.e. the value the data is compared against.

The first rule keeps records whose uid equals 003; the second keeps records whose timestamp is greater than 1618559580. The two rules are ANDed together: a record is emitted only when it satisfies both conditions.

(tb_config sample rows, reconstructed from the description above:)

id   source   parameter   operator   value
1    a        uid         0          003
2    a        timestamp   3          1618559580
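These two rows can be created with plain INSERTs (the ids above are assumed; AUTO_INCREMENT assigns them):

INSERT INTO tb_config (`source`, `parameter`, `operator`, `value`) VALUES ('a', 'uid', 0, '003');
INSERT INTO tb_config (`source`, `parameter`, `operator`, `value`) VALUES ('a', 'timestamp', 3, '1618559580');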

Complete Demo

This demo is my own original work, unrelated to any production business, and meant purely for learning and reference.

// Dependencies (assuming the flink-cdc-connectors 1.x packages; 2.x moved them to com.ververica):
// flink-streaming-java, flink-connector-kafka, flink-cdc-connectors and fastjson.
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by LH on 2021/4/14 17:52
 */
public class BroadCastDemo {
    public static void main(String[] args) throws Exception {
        //1. Streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. Execution environment configuration
        env.setParallelism(1);

        //3. Kafka data-source DataStream
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "lh-node-2:9092");
        kafkaProps.put("group.id", "flink_group_1");
        kafkaProps.put("session.timeout.ms", "30000");
        kafkaProps.put("auto.offset.reset", "earliest");
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer<String> dataConsumer = new FlinkKafkaConsumer<String>("tb_data", new SimpleStringSchema(), kafkaProps);

        DataStreamSource<String> kafkaDataSource = env.addSource(dataConsumer);

        DataStream<Map<String, Object>> kafkaDS = kafkaDataSource.map(new MapFunction<String, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(String s) throws Exception {
                return JSONObject.parseObject(s, new TypeReference<Map<String, Object>>(){});
            }
        });

        // 4. MySQL config-source DataStream (flink-cdc, backed by Debezium)
        DebeziumSourceFunction<MyConfig> mysqlSource = MySQLSource.<MyConfig>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("root")
                .databaseList("flinkdb")
                .tableList("flinkdb.tb_config")
                .debeziumProperties(new Properties())
                .deserializer(new MyDeserializationSchema())
                .build();

        MapStateDescriptor<String, MyConfig> configStateDescriptor =
                new MapStateDescriptor<>(
                        "config-cdc",
                        BasicTypeInfo.STRING_TYPE_INFO,
                        TypeInformation.of(new TypeHint<MyConfig>() {}));

        BroadcastStream<MyConfig> configBroadCast = env.addSource(mysqlSource).broadcast(configStateDescriptor);

        DataStream<String> outDataStream = kafkaDS
                .connect(configBroadCast)
                .process(new MyBroadcastProcessFunction());

        kafkaDS.print("in");

        outDataStream.print("out");

        env.execute();
    }

    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<MyConfig>{
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<MyConfig> collector) {
            Struct struct  = (Struct) sourceRecord.value(); //org.apache.kafka.connect.data.Struct
            Struct source = struct.getStruct("source");
            String db = source.getString("db");
            String table = source.getString("table");
            String op = struct.getString("op");

            if (db.equals("flinkdb") && table.equals("tb_config")){
                Struct conf = null;
                int type = 0;
                if (op.equals("c")){ //新增配置
                    conf = struct.getStruct("after");
                    type = ConfType.ADD.type;
                } else if (op.equals("u")){ //更新配置
                    conf = struct.getStruct("after");
                    type = ConfType.UPDATE.type;
                } else if (op.equals("d")){ //删除配置
                    conf = struct.getStruct("before");
                    type = ConfType.DELETE.type;
                } else return; //跳过

                int id = conf.getInt32("id");
                String tb_source = conf.getString("source");
                String parameter = conf.getString("parameter");
                int operator = conf.getInt32("operator");
                String value = conf.getString("value");
                MyConfig myConfig = new MyConfig(id, type, tb_source, parameter, operator, value);
                System.out.println("========collect config========");
                System.out.println(myConfig);

                collector.collect(myConfig);
            }
        }

        @Override
        public TypeInformation<MyConfig> getProducedType() {
            return TypeInformation.of(MyConfig.class);
        }
    }


    public static class MyBroadcastProcessFunction
            extends BroadcastProcessFunction<Map<String, Object>, MyConfig, String> {

        /**
         * configStateDescriptor describes the broadcast state holding the configs.
         * The key is the source name, so each source keeps its own group of configs;
         * the value is that group: a map from config id (the primary key and unique
         * identifier) to the config itself. Configs within one map are ANDed together.
         */
        private MapStateDescriptor<String, Map<Integer, MyConfig>> configStateDescriptor =
                new MapStateDescriptor<>(
                        "config-cdc",
                        BasicTypeInfo.STRING_TYPE_INFO, // the type of source, which matches data to configs
                        TypeInformation.of(new TypeHint<Map<Integer, MyConfig>>() {}));

        @Override
        public void processElement(Map<String, Object> dataMap, ReadOnlyContext readOnlyContext, Collector<String> collector)
                throws Exception {
            System.out.println("======processElement=======");

            String source = dataMap.get("source").toString();

            Map<Integer, MyConfig> configMap =
                    readOnlyContext.getBroadcastState(configStateDescriptor).get(source);

            if (configMap != null && configMap.size() > 0){
                // Emit the record only if it satisfies every configured condition;
                // the first failed condition returns immediately, i.e. the record is filtered out.
                for (Map.Entry<Integer, MyConfig> configEntry: configMap.entrySet()){
                    MyConfig config = configEntry.getValue();
                    String parameter = config.parameter;
                    int operator = config.operator;
                    String value = config.value;

                    if (dataMap.containsKey(parameter)){
                        String dataValue = dataMap.get(parameter).toString();
                        // note: compareTo is lexicographic string comparison; it matches numeric
                        // order only while the compared strings have the same length
                        if ((operator == Operator.EQUAL.operator && dataValue.equals(value))
                                || (operator == Operator.NEQUAL.operator && !dataValue.equals(value))
                                || (operator == Operator.SMALLER.operator && dataValue.compareTo(value) < 0)
                                || (operator == Operator.BIGGER.operator && dataValue.compareTo(value) > 0)
                        ) {
                            System.out.println("data: " + dataMap.toString() + " passed the inspection, conf: " + config);
                        } else {
                            System.out.println("data: " + dataMap.toString() + " did not pass the inspection, conf: " + config);
                            return; // one failed condition drops the record
                        }
                    }
                }
            }

            collector.collect(dataMap.toString());
        }

        @Override
        public void processBroadcastElement(MyConfig myConfig, Context context, Collector<String> collector) throws Exception {
            System.out.println("======processBroadcastElement=======");

            String source = myConfig.getSource();
            int confType = myConfig.getType();
            int confID = myConfig.getId();

            BroadcastState<String, Map<Integer, MyConfig>> confState = context.getBroadcastState(configStateDescriptor);

            Map<Integer, MyConfig> confMap = confState.get(source);
            if (confMap != null && confMap.size() > 0){
                if (confType == ConfType.DELETE.type){
                    MyConfig oldConf = confMap.remove(confID); //如果配置更新的类型是删掉,那么删掉map中的配置
                    System.out.println("delete conf of source=" + source + ", id=" + confID + ", " + oldConf);
                } else {
                    confMap.put(confID, myConfig); //更新或新增配置
                    System.out.println("update or add conf of source=" + source + ", id=" + confID + ", " + myConfig);

                }
            } else if (confType != ConfType.DELETE.type){
                confMap = new HashMap<>();
                confMap.put(confID, myConfig); //更新或新增配置
                confState.put(source, confMap);
                System.out.println("update or add conf of source=" + source + ", id=" + confID + ", " + myConfig);
            }
        }
    }
    public static class MyConfig {
        private int id;            // unique identifier (primary key in tb_config)
        private int type;          // change type: add, update, or delete
        private String source;     // resource name
        private String parameter;
        private int operator;
        private String value;

        public MyConfig(int id, int type, String source, String parameter, int operator, String value) {
            this.id = id;
            this.type = type;
            this.source = source;
            this.parameter = parameter;
            this.operator = operator;
            this.value = value;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public int getType() {
            return type;
        }

        public void setType(int type) {
            this.type = type;
        }

        public String getSource() {
            return source;
        }

        public void setSource(String source) {
            this.source = source;
        }

        public String getParameter() {
            return parameter;
        }

        public void setParameter(String parameter) {
            this.parameter = parameter;
        }

        public int getOperator() {
            return operator;
        }

        public void setOperator(int operator) {
            this.operator = operator;
        }

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "MyConfig{" +
                    "id=" + id +
                    ", type=" + type +
                    ", source='" + source + '\'' +
                    ", parameter='" + parameter + '\'' +
                    ", operator=" + operator +
                    ", value='" + value + '\'' +
                    '}';
        }
    }

    public enum ConfType {
        ADD(0),
        UPDATE(1),
        DELETE(2);

        private int type;

        ConfType(int type) {
            this.type = type;
        }
    }

    public enum Operator {
        EQUAL(0),
        NEQUAL(1),
        SMALLER(2),
        BIGGER(3);

        private int operator;

        Operator(int op) {
            this.operator = op;
        }
    }
}
           

For convenience, the whole demo is written in a single Java file. The log output from a run is shown below:

(Screenshot: console log of the run, showing the "in"/"out" prints together with the per-config inspection messages.)

As you can see, the record

{"uid": "003", "timestamp":"1618559590", "actionID":3, "source":"a"}
           

was filtered out because it did not satisfy one of the configured conditions (the one highlighted in the original screenshot), while the record

{"uid": "003", "timestamp":"1618559690", "actionID":2, "source":"a"}
           

satisfied all the conditions and was printed to the console. Moreover, the configuration takes effect without any restart: modify the MySQL config table directly and the change applies immediately.
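For example, issuing ordinary DML against tb_config while the job is running is enough; the CDC source picks the change up from the binlog and the broadcast state is updated in place (the ids assume the sample rows above):

UPDATE tb_config SET `value` = '004' WHERE id = 1;  -- the uid rule now matches 004 instead of 003
DELETE FROM tb_config WHERE id = 2;                 -- the timestamp rule is removed entirely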
