天天看点

Debezium实现大数据变更状态捕获1、简介

1、简介

Debezium是用于捕获变更数据的开源分布式平台。可以响应数据库的所有插入,更新和删除操作。Debezium依赖于kafka上,所以在安装Debezium时需要提前安装好Zookeeper,Kafka,以及Kakfa Connect,可以处理的数据源有很多,包括关系型数据库如MySQL,Oracle,SQL Server等,NoSQL数据如MongoDB,Cassandra等,是一个优秀的CDC工具。

类似框架还有阿里Canal,也支持与消息中间件处理大数据响应的处理手段,学习路径可参考我的Redis专题:双写一致性,链接地址:https://blog.csdn.net/huxiang19851114/article/details/113622114

2、原理

Debezium&MySQL

与Canal类同,核心也是基于MySQL binlog机制,Debezium

目录

1、简介

2、原理

Debezium&MySQL

Debezuim&MongoDB

3、DockerCompose配置

4、启动及容器配置

5、禁用Kafka自动创建主题

6、业务实现

相关POM依赖

Kafka主题父类

Kafka主题业务类

连接器父类

连接器业务类MySQL

连接器业务类MongoDB

连接器公用配置

连接器私有配置MySQL

连接器私有配置MongoDB

7、业务输出

作为一个MySQL 伪Slave节点,请求获取Master节点的binlog,同步处理数据库的事务操作到Kafka数据队列,然后通过Kafka Connect作为输出端,异步对接业务应用

Debezium实现大数据变更状态捕获1、简介

Debezuim&MongoDB

Debezium MongoDB 的cdc 是基于复制集实现的,通过Mongo stream 进行数据的捕获处理,所以对于MongoDB来说,跟MySQL不同之处在于需要设置其replSet(哪怕是单机也需要),其实这也是另一种形式上的伪Slave节点

3、DockerCompose配置

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: debezium/zookeeper
    restart: always
    ports:
     - 2181:2181
  kafka:
    container_name: kafka
    image: debezium/kafka
    restart: always
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.137.129:9092
     - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    volumes:
     - /usr/local/kafkadata:/data
  mysql:
    container_name: mysql
    image: mysql:5.6
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=root
     - MYSQL_USER=huxiang
     - MYSQL_PASSWORD=huxiang
  mongodb:
    image: debezium/example-mongodb:0.9
    container_name: mongodb
    ports:
     - 27001:27017
    volumes:
     - /usr/local/mongodb/db:/data/db
     - /usr/local/mongodb/log:/var/logs/mongodb
     - /usr/local/mongodb/config:/etc/mongo
    environment:
     - MONGODB_USER=admin
     - MONGODB_PASSWORD=123456
    command: mongod --auth --replSet rs
  connect:
    container_name: connect
    image: debezium/connect
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
     - mongodb
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=test1
     - OFFSET_STORAGE_TOPIC=test2
     - STATUS_STORAGE_TOPIC=test3      

以上为基本的Docker Compose配置,需要注意的几点:

  • debezuim版本尽量不指定,使用最新版本,如果需要指定,请使用0.9+开源版本
  • 注意部署各容器之间的links依赖关系,如果不是同一个docker容器,需要通过ip+容器名来配置
  • MySQL和MongoDB都是需要认证请求的,所以需要设置账号密码,全新容器没有账号密码时会反复报连接失败,没关系,不影响数据库的使用,创建对应的账号密码后,会显示连接成功,如:
    Debezium实现大数据变更状态捕获1、简介
  • docker容器对数据持久化的处理可以通过配置存储卷来备份,各位根据自己需要进行配置volumes

4、启动及容器配置

通过docker-compose命令执行DockerCompose.yaml配置:

docker-compose -f docker-compose-mysql-mongo.yaml up -d      

启动后MySQL需要开启binlog日志,MongoDB需要设置副本集

#1.开启mysql binlog日志,并且设置binlog_format为row
docker exec mysql bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
docker exec mysql bash -c "echo 'server-id=1' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
docker exec mysql bash -c "echo 'binlog_format=ROW' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
docker restart mysql
#查询是否开启成功
show variables like 'log_bin';
show variables like 'binlog_format';
​
#2.登录mongo客户端,设置mongo 副本集
docker exec -it mongodb mongo admin;
use admin;
rs.initiate({_id:'rs',members:[{_id:0,host:'192.168.137.129:27001'}]});#注意端口为映射端口,而不是服务端口      

5、禁用Kafka自动创建主题

实际生产中,很多时候kafka topic主题不希望使用自动创建模式,而希望自己在业务代码中指定,所以需要禁用kafka自动创建主题,开启debezuim connect连接器代理创建:

#1.禁用kafka自动创建主题,docker容器config目录下server.properties
docker exec -it kafka /bin/bash #进入kafka容器
vi config/server.properties #编辑
auto.create.topics.enable = false #设置kafka不自动创建主题
#重启kafka
docker restart kafka
​
#2.开启debezium connect 连接器代理创建,docker容器config目录下server.properties
docker exec -it connect /bin/bash #进入debezium connect容器
vi config/server.properties #编辑
topic.creation.enable = true #设置debezium connect可以自定义创建主题
#重启connect
docker restart connect      

6、业务实现

相关POM依赖

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.5.1</version>
   <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
​
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.3.1</version>
        </dependency>
​
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>
​
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.2</version>
        </dependency>
​
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.11</version>
        </dependency>
​
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
    </dependencies>      

Kafka主题父类

import lombok.Data;
/**
 * 主题父类,其他个性化主题注册类继承该类
 */
@Data
public class Topic {
​
    private String include;
​
    private String replicationFactor;
​
    private String partitions;
​
    private String cleanupPolicy;
​
    private String retentionMs;
​
    private String compressionType;
}      

Kafka主题业务类

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * 个性化主题类,继承Topic父类
 */
@Component
@ConfigurationProperties(prefix = "debezium.jira.topic.creation.jira")
public class JiraTopic extends Topic{
}      

连接器父类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
​
/**
 * 初始化连接器类--公共配置父类
 */
@RefreshScope
public class Connector{
​
   /**********************公共配置部分**********************************/
​
   @Value("${debezium.register.url}")
   protected String registerUrl;
​
   @Value("${debezium.topic.creation.default.replication.factor}")
   protected String defaultFactor;
​
   @Value("${debezium.topic.creation.default.partitions}")
   protected String defaultPartitions;
​
   @Value("${debezium.topic.creation.default.cleanup.policy}")
   protected String defaultCleanupPolicy;
​
   @Value("${debezium.topic.creation.default.compression.type}")
   protected String defaultCompressionType;
​
   @Value("${debezium.kafka.servers}")
   protected String kafkaServers;
​
   @Value("${debezium.override.enabled}")
   protected boolean overrideEnabled;
​
   @Value("${debezium.position}")
   protected String channelPosition;
​
   @Value("${debezium.snapshot.mode}")
   protected String snapshotMode;
}      

连接器业务类MySQL

import com.fasterxml.jackson.databind.ObjectMapper;
import com.paratera.console.datasync.bean.JiraTopic;
import com.paratera.console.datasync.bean.Topic;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
​
/**
 * 初始化连接器类
 */
​
@Component
@Order(value = 12)
@ConfigurationProperties(prefix = "debezium.jira")
public class JiraConnector extends Connector implements ApplicationRunner {
​
   /**********************自定义配置部分**********************************/
​
   @Value("${debezium.jira.connect.name}")
   private String connectName;
​
   @Value("${debezium.jira.database.hostname}")
   private String dbHostName;
​
   @Value("${debezium.jira.database.port}")
   private String dbPort;
​
   @Value("${debezium.jira.database.user}")
   private String dbUser;
​
   @Value("${debezium.jira.database.password}")
   private String dbPassword;
​
   @Value("${debezium.jira.column.include.list}")
   private String columnList;
​
   @Value("${debezium.jira.table.include.list}")
   private String tableList;
​
   @Value("${debezium.jira.database.include.list}")
   private String databaseList;
​
   @Value("${debezium.jira.topic.creation.groups}")
   private String topicCreationGroups;
​
   @Autowired
   private JiraTopic jiraTopic;
​
   @Override
   public void run(ApplicationArguments applicationArguments) throws Exception {
      CloseableHttpClient httpclient = HttpClients.createDefault();
      ResponseHandler<String> responseHandler = new ResponseHandler<String>() {
         @Override
         public String handleResponse(final HttpResponse response)
               throws ClientProtocolException, IOException {//
            int status = response.getStatusLine().getStatusCode();
            if (status >= 200 && status < 300) {
               HttpEntity entity = response.getEntity();
               return entity != null ? EntityUtils.toString(entity) : null;
            } else {
               throw new ClientProtocolException(
                     "Unexpected response status: " + status);
            }
         }
      };
      System.out.println("------------------------------------------------------------");
      System.out.println("开始查询: " + connectName + " 数据连接器.................");
      HttpGet httpGet = new HttpGet(registerUrl);
      httpGet.addHeader("Accept", "application/json;charset=UTF-8");
      String response = httpclient.execute(httpGet, responseHandler);
      ObjectMapper mapper = new ObjectMapper();
      List array = mapper.readValue(response , List.class);
      if(array.contains(connectName)){
         System.out.println("连接器: " +connectName+ " 已存在,判断是否允许覆盖!");
         if(overrideEnabled){
            HttpDelete httpDelete = new HttpDelete(registerUrl + "/" + connectName);
            httpclient.execute(httpDelete, responseHandler);
            System.out.println("原连接器已删除!");
         }else{
            return;
         }
      }
      System.out.println("开始注册" + connectName + "数据连接器.................");
      Map obj = new HashMap();
      obj.put("name" , connectName);
      Map config = new HashMap();
      config.put("connector.class" , "io.debezium.connector.mysql.MySqlConnector");
      config.put("tasks.max" , "1");
      config.put("database.hostname" , dbHostName);
      config.put("database.port" , dbPort);
      config.put("database.user" , dbUser);
      config.put("database.password" , dbPassword);
      config.put("database.server.id" , "184051");
      config.put("database.server.name" , "dbserver");
      config.put("database.include.list" , databaseList);
      config.put("table.include.list" , tableList);
      config.put("column.include.list" , columnList);
      config.put("database.history.kafka.bootstrap.servers" , kafkaServers);
      config.put("database.history.kafka.topic" , "dbhistory.console");
      //config.put("gtid.new.channel.position" , channelPosition);
      //config.put("snapshot.mode" , snapshotMode);
​
      //设置默认topic组属性
      config.put("topic.creation.default.replication.factor" , defaultFactor);
      config.put("topic.creation.default.partitions" , defaultPartitions);
      config.put("topic.creation.default.cleanup.policy" , defaultCleanupPolicy);
      config.put("topic.creation.default.compression.type" , defaultCompressionType);
​
      //设置自定义topic组
      config.put("topic.creation.groups",topicCreationGroups);
      //设置jira topic
      setttingTopic(config , jiraTopic,"jira");
      obj.put("config" , config);
​
      //注册连接器
      HttpPost httpPost = new HttpPost(registerUrl);
      httpPost.addHeader("Content-Type", "application/json;charset=UTF-8");
      httpPost.addHeader("Accept", "application/json;charset=UTF-8");
      StringEntity stringEntity = new StringEntity(mapper.writeValueAsString(obj), "UTF-8");
      stringEntity.setContentEncoding("UTF-8");
      httpPost.setEntity(stringEntity);
​
      String responseBody = httpclient.execute(httpPost, responseHandler);
      System.out.println("连接器:" + connectName + " 完成注册!!");
//    System.out.println(responseBody); // for security
      System.out.println("------------------------------------------------------------\n");
   }
​
   /**
    * 设置topic属性
    * @param config
    * @param topic
    * @param topicName
    */
   private void setttingTopic(Map config , Topic topic , String topicName) {
      config.put("topic.creation." + topicName + ".include",topic.getInclude());
      config.put("topic.creation." + topicName + ".replication.factor",topic.getReplicationFactor());
      config.put("topic.creation." + topicName + ".partitions",topic.getPartitions());
      config.put("topic.creation." + topicName + ".cleanup.policy",topic.getCleanupPolicy());
      config.put("topic.creation." + topicName + ".retention.ms",topic.getRetentionMs());
      config.put("topic.creation." + topicName + ".compression.type",topic.getCompressionType());
   }
}      

连接器业务类MongoDB

import com.fasterxml.jackson.databind.ObjectMapper;
import com.paratera.console.datasync.bean.Topic;
import com.paratera.console.datasync.bean.UserprefTopic;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
​
/**
 * 初始化连接器类
 */
​
@Component
@Order(value = 13)
@ConfigurationProperties(prefix = "debezium.userpref")
public class UserprefConnector extends Connector implements ApplicationRunner {
​
   /**********************自定义配置部分**********************************/
​
   @Value("${debezium.userpref.connect.name}")
   private String connectName;
​
   @Value("${debezium.userpref.mongodb.hosts}")
   private String dbHostName;
​
   @Value("${debezium.userpref.mongodb.name}")
   private String dbTopicName;
​
   @Value("${debezium.userpref.mongodb.user}")
   private String dbUser;
​
   @Value("${debezium.userpref.mongodb.password}")
   private String dbPassword;
​
   @Value("${debezium.userpref.mongodb.database.whitelist}")
   private String dbList;
​
   @Value("${debezium.userpref.mongodb.table.whitelist}")
   private String tableList;
​
   @Value("${debezium.userpref.topic.creation.groups}")
   private String topicCreationGroups;
​
   @Autowired
   private UserprefTopic userprefTopic;
​
   @Override
   public void run(ApplicationArguments applicationArguments) throws Exception {
      CloseableHttpClient httpclient = HttpClients.createDefault();
      ResponseHandler<String> responseHandler = new ResponseHandler<String>() {
         @Override
         public String handleResponse(final HttpResponse response)
               throws ClientProtocolException, IOException {//
            int status = response.getStatusLine().getStatusCode();
            if (status >= 200 && status < 300) {
               HttpEntity entity = response.getEntity();
               return entity != null ? EntityUtils.toString(entity) : null;
            } else {
               throw new ClientProtocolException(
                     "Unexpected response status: " + status);
            }
         }
      };
      System.out.println("------------------------------------------------------------");
      System.out.println("开始查询: " + connectName + " 数据连接器.................");
      HttpGet httpGet = new HttpGet(registerUrl);
      httpGet.addHeader("Accept", "application/json;charset=UTF-8");
      String response = httpclient.execute(httpGet, responseHandler);
      ObjectMapper mapper = new ObjectMapper();
      List array = mapper.readValue(response , List.class);
      if(array.contains(connectName)){
         System.out.println("连接器: " +connectName+ " 已存在,判断是否允许覆盖!");
         if(overrideEnabled){
            HttpDelete httpDelete = new HttpDelete(registerUrl + "/" + connectName);
            httpclient.execute(httpDelete, responseHandler);
            System.out.println("原连接器已删除!");
         }else{
            return;
         }
      }
      System.out.println("开始注册" + connectName + "数据连接器.................");
      Map obj = new HashMap();
      obj.put("name" , connectName);
      Map config = new HashMap();
      config.put("connector.class" , "io.debezium.connector.mongodb.MongoDbConnector");
      config.put("tasks.max" , "1");
      config.put("mongodb.hosts" , dbHostName);
      config.put("mongodb.name" , dbTopicName);
      config.put("database.whitelist",dbList);
      config.put("table.whitelist", tableList);
      config.put("mongodb.user" , dbUser);
      config.put("mongodb.password" , dbPassword);
      config.put("database.server.id" , "184054");
      config.put("database.server.name" , "dbserver");
      config.put("database.history.kafka.bootstrap.servers" , kafkaServers);
      config.put("database.history.kafka.topic" , "dbhistory.console");
​
      //设置默认topic组属性
      config.put("topic.creation.default.replication.factor" , defaultFactor);
      config.put("topic.creation.default.partitions" , defaultPartitions);
      config.put("topic.creation.default.cleanup.policy" , defaultCleanupPolicy);
      config.put("topic.creation.default.compression.type" , defaultCompressionType);
​
      //设置自定义topic组
      config.put("topic.creation.groups",topicCreationGroups);
      //设置userpref topic
      setttingTopic(config , userprefTopic,"userpref");
      obj.put("config" , config);
​
      //注册连接器
      HttpPost httpPost = new HttpPost(registerUrl);
      httpPost.addHeader("Content-Type", "application/json;charset=UTF-8");
      httpPost.addHeader("Accept", "application/json;charset=UTF-8");
      StringEntity stringEntity = new StringEntity(mapper.writeValueAsString(obj), "UTF-8");
      stringEntity.setContentEncoding("UTF-8");
      httpPost.setEntity(stringEntity);
​
      String responseBody = httpclient.execute(httpPost, responseHandler);
      System.out.println("连接器:" + connectName + " 完成注册!!");
//    System.out.println(responseBody); // for security
      System.out.println("------------------------------------------------------------\n");
   }
​
   /**
    * 设置topic属性
    * @param config
    * @param topic
    * @param topicName
    */
   private void setttingTopic(Map config , Topic topic , String topicName) {
      config.put("topic.creation." + topicName + ".include",topic.getInclude());
      config.put("topic.creation." + topicName + ".replication.factor",topic.getReplicationFactor());
      config.put("topic.creation." + topicName + ".partitions",topic.getPartitions());
      config.put("topic.creation." + topicName + ".cleanup.policy",topic.getCleanupPolicy());
      config.put("topic.creation." + topicName + ".retention.ms",topic.getRetentionMs());
      config.put("topic.creation." + topicName + ".compression.type",topic.getCompressionType());
   }
}      

连接器公用配置

#--------------------debezium注册连接器配置----------------------------------
debezium:
  register:
    url: ${console.debezium.connect.url}
  kafka:
    servers: ${console.debezium.kafka.url} #可以多个,逗号间隔
  override:
    enabled: true
  topic:
    creation:
      #必须指定默认的topic属性
      default:
        replication:
          factor: 1
        partitions: 10
        cleanup:
          policy: compact
        compression:
          type: lz4
  position: earliest #同步坐标,默认(earliest )从第一个有效事务开始,latest从新的事务开始
  snapshot:
    mode: schema_only      

连接器私有配置MySQL

#--------------------debezium注册连接器配置----------------------------------
debezium:
  jira:
    connect:
      name: jira_connector
    database:
      hostname: ${console.debezium.jira.db.url} #数据库docker容器名,如果不是docker容器,需要写具体的ip或者域名
      port: ${console.debezium.jira.db.port}
      user: ${console.debezium.jira.db.user}
      password: ${console.debezium.jira.db.password}
      include:
        list: "jira" #可以多个,逗号间隔
    table:
      include:
        #可以多个,逗号间隔
        list: "jira.my_ticket_mv"
    column:
      include:
        #可以多个,逗号间隔
        list: "jira.my_ticket_mv.ticket_id,jira.my_ticket_mv.state,jira.my_ticket_mv.issuetype,jira.my_ticket_mv.state_code"
    topic:
      creation:
        groups: "jira" #自定义topic组,可多个,逗号间隔,与下面属性配置对应
        jira:
          include: dbserver\\.jira\\.*
          replicationFactor: 1
          partitions: 20
          cleanupPolicy: delete
          retentionMs: 7776000000
          compressionType: producer      

连接器私有配置MongoDB

#--------------------debezium注册连接器配置----------------------------------
debezium:
  userpref:
    connect:
      name: userpref_connector
    mongodb:
      hosts: ${console.debezium.userpref.mongodb.hosts}
      name: dbserver
      user: ${console.debezium.userpref.mongodb.user}
      password: ${console.debezium.userpref.mongodb.password}
      database:
        #可以多个,逗号间隔
        whitelist: "userpref"
      table:
        whitelist: "userpref"
    topic:
      creation:
        groups: "userpref" #自定义topic组,可多个,逗号间隔,与下面属性配置对应
        userpref:
          include: dbserver\\.userpref\\.*
          replicationFactor: 1
          partitions: 20
          cleanupPolicy: delete
          retentionMs: 7776000000
          compressionType: producer      

7、业务输出

package com.paratera.console.datasync.listener;
​
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.paratera.console.datasync.facade.BizClient;
import com.paratera.console.datasync.model.UserInfo;
import com.paratera.console.datasync.model.UserPref;
import com.paratera.console.datasync.service.UserInfoService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
​
/**
 * 消费者监听类
 */
​
@Component
public class UserprefConsumer {
​
    private Logger logger = LoggerFactory.getLogger(UserprefConsumer.class);
    
    @Autowired
    private UserInfoService userInfoService;
​
    @Autowired
    private MongoTemplate mongoTemplate;
​
    @Autowired
    private BizClient bizClient;
​
    // 消费监听
    @KafkaListener(topics = {"dbserver.userpref.userpref"},groupId = "userpref",properties = {"print.key=true"})
    public void onMessage(List<ConsumerRecord> consumerRecords) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        Iterator<ConsumerRecord> iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
            ConsumerRecord record = iterator.next();
            String value = (String) record.value();
            String key = (String) record.key();
            if (value != null) {
                Map jsonObject = mapper.readValue(value, Map.class);
                Map payload = (Map) jsonObject.get("payload");
                String op = (String) payload.get("op");
                switch (op) {
                    //新增操作
                    case "c":
                        //获取到对象信息
                        Map after = mapper.readValue((String) payload.get("after"),Map.class);
                        if(after != null){
                            logger.debug("触发新增操作,需要更新userinfo的数据为:" + after.toString());
                            String userId = (String) after.get("userId");
                            Map<String, String> up = (Map) after.get("userPref");
                            settingUserInfo(userId, up);
                        };
                        break;
                    case "u":
                        //获取到对象信息
                        Map patch = mapper.readValue((String) payload.get("patch"),Map.class);
                        Map filter = mapper.readValue((String) payload.get("filter"),Map.class);
                        if(patch != null){
                            logger.debug("触发更新操作,需要更新的数据为:" + patch.toString());
                            logger.debug("触发更新操作,更新的条件为:" + filter.toString());
                            //因为debezuim对于更新,删除操作不会把实际的查询条件同步过来,只会同步文档id,所以需要把userId查询出来
                            UserPref userPref = mongoTemplate.findById(new ObjectId((String) filter.get("_id")), UserPref.class,"userpref");
                            String userId = userPref.getUserId();
                            Map<String, String> up = userPref.getUserPref();
                            settingUserInfo(userId, up);
                        };
                        break;
                    case "d":
                        if(key != null){
                            logger.debug("触发删除操作,需要删除的数据为:" + payload.get("filter"));
                        }
                        break;
                    default:
                        logger.debug("未知操作");
                }
            }
        }
    }
​
    private void settingUserInfo(String userId, Map<String, String> up) {
        /**
         * 设置用户偏好
         *  1、如果billingShowMoney为0,billingShowCoreTime为1,则设置2(用量)
         *  2、其他情况,设置1(金额)
         */
        UserInfo userInfo = new UserInfo();
        userInfo.setId(userId);
        if(Double.valueOf(String.valueOf(up.get("billingShowMoney"))) == 0.0
            && Double.valueOf(String.valueOf(up.get("billingShowCoreTime"))) == 1.0){
            userInfo.setUserPref(2);
        }else {
            userInfo.setUserPref(1);
        }
        userInfoService.updateByPrimaryKeySelective(userInfo);
        //同时清除用户缓存
        bizClient.clearCacheOfUserInfo(userId);
    }
}