1. Introduction
Debezium is an open-source distributed platform for change data capture (CDC). It can react to every insert, update, and delete in a database. Debezium runs on top of Kafka, so before installing it you need ZooKeeper, Kafka, and Kafka Connect in place. It supports many data sources, including relational databases such as MySQL, Oracle, and SQL Server, as well as NoSQL stores such as MongoDB and Cassandra, making it an excellent CDC tool.
A similar framework is Alibaba's Canal, which also integrates with message middleware to handle high-volume change processing; for a learning path, see my Redis series on double-write consistency: https://blog.csdn.net/huxiang19851114/article/details/113622114
2. How it works
Debezium & MySQL
Like Canal, Debezium is built around the MySQL binlog mechanism.
Debezium acts as a pseudo MySQL slave node: it requests the binlog from the master, synchronizes the database's transactional changes into Kafka, and then uses Kafka Connect as the output side to feed business applications asynchronously.
Debezium & MongoDB
Debezium's MongoDB CDC is based on replica sets: changes are captured through the MongoDB change stream. So unlike MySQL, MongoDB must have a replica set configured (via replSet, even for a single node), which is in effect another form of pseudo slave node.
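To make the event model concrete, here is a simplified, illustrative Debezium change-event value for a MySQL row update. The envelope fields (before, after, source, op, ts_ms) follow the standard Debezium format; the actual values are made up:

```json
{
  "payload": {
    "before": { "id": 1001, "state": "OPEN" },
    "after":  { "id": 1001, "state": "CLOSED" },
    "source": { "connector": "mysql", "file": "mysql-bin.000003", "pos": 154 },
    "op": "u",
    "ts_ms": 1624000000123
  }
}
```

The op field is what downstream consumers typically branch on: "c" for create, "u" for update, "d" for delete, "r" for snapshot reads.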
3. Docker Compose configuration
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: debezium/zookeeper
    restart: always
    ports:
      - 2181:2181
  kafka:
    container_name: kafka
    image: debezium/kafka
    restart: always
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.137.129:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    volumes:
      - /usr/local/kafkadata:/data
  mysql:
    container_name: mysql
    image: mysql:5.6
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_USER=huxiang
      - MYSQL_PASSWORD=huxiang
  mongodb:
    image: debezium/example-mongodb:0.9
    container_name: mongodb
    ports:
      - 27001:27017
    volumes:
      - /usr/local/mongodb/db:/data/db
      - /usr/local/mongodb/log:/var/logs/mongodb
      - /usr/local/mongodb/config:/etc/mongo
    environment:
      - MONGODB_USER=admin
      - MONGODB_PASSWORD=123456
    command: mongod --auth --replSet rs
  connect:
    container_name: connect
    image: debezium/connect
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
      - mongodb
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=test1
      - OFFSET_STORAGE_TOPIC=test2
      - STATUS_STORAGE_TOPIC=test3
Notes on this basic Docker Compose configuration:
- Prefer not to pin the Debezium image version and use the latest; if you must pin one, use an open-source release of 0.9 or later
- Mind the links dependencies between the containers; if they are not on the same Docker host, configure them by IP plus container name instead
- MySQL and MongoDB both require authenticated access, so accounts and passwords must be set up. A brand-new container without credentials will repeatedly log connection failures; this does not affect the databases themselves, and once the corresponding accounts are created the connections succeed
- Persist container data by mounting storage volumes; adjust the volumes entries to suit your own environment
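As an example of the account setup mentioned above, a dedicated MySQL user for Debezium is typically created with the replication-related privileges the MySQL connector requires (the user name and password here are placeholders):

```sql
-- create a dedicated CDC user (placeholder credentials)
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz-password';
-- privileges the Debezium MySQL connector needs for snapshots and binlog reading
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
```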
4. Startup and container configuration
Start the services from the Compose file with the docker-compose command:
docker-compose -f docker-compose-mysql-mongo.yaml up -d
After startup, MySQL needs the binlog enabled and MongoDB needs its replica set initialized.
#1. Enable the MySQL binlog and set binlog_format to ROW
docker exec mysql bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
docker exec mysql bash -c "echo 'server-id=1' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
docker exec mysql bash -c "echo 'binlog_format=ROW' >> /etc/mysql/mysql.conf.d/mysqld.cnf"
docker restart mysql
#verify that it took effect
show variables like 'log_bin';
show variables like 'binlog_format';
#2. Log in with the mongo client and initialize the replica set
docker exec -it mongodb mongo admin;
use admin;
rs.initiate({_id:'rs',members:[{_id:0,host:'192.168.137.129:27001'}]}); #note: use the mapped port, not the in-container service port
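To confirm the replica set actually came up, a read-only check can be run from the host (a sketch; since the container runs with --auth, you may additionally need -u/-p credentials):

```shell
# should print 1 once the single-node replica set "rs" is initialized
docker exec -it mongodb mongo admin --quiet --eval "rs.status().ok"
# the single member should eventually report stateStr PRIMARY
docker exec -it mongodb mongo admin --quiet --eval "rs.status().members[0].stateStr"
```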
5. Disable automatic topic creation in Kafka
In production you often do not want Kafka to auto-create topics; instead, topics should be declared from the business code. So disable Kafka's auto-creation and let the Debezium Connect connectors create topics on your behalf:
#1. Disable auto topic creation in server.properties under the Kafka container's config directory
docker exec -it kafka /bin/bash #enter the kafka container
vi config/server.properties #edit
auto.create.topics.enable = false #stop Kafka from auto-creating topics
#restart kafka
docker restart kafka
#2. Enable topic creation by the Debezium Connect worker, in server.properties under the connect container's config directory
docker exec -it connect /bin/bash #enter the debezium connect container
vi config/server.properties #edit
topic.creation.enable = true #allow Debezium Connect to create topics itself
#restart connect
docker restart connect
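The connector registration performed programmatically in the next section can also be done by hand against the Kafka Connect REST API, which is handy for debugging (host and port come from the Compose file above; the connector name is illustrative):

```shell
# list registered connectors
curl -s http://localhost:8083/connectors
# inspect one connector's config and running status
curl -s http://localhost:8083/connectors/jira_connector
curl -s http://localhost:8083/connectors/jira_connector/status
# delete a connector
curl -s -X DELETE http://localhost:8083/connectors/jira_connector
```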
6. Implementation
POM dependencies
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.1</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
</dependencies>
Kafka topic base class
import lombok.Data;
/**
 * Topic base class; topic-specific registration classes extend it
 */
@Data
public class Topic {
private String include;
private String replicationFactor;
private String partitions;
private String cleanupPolicy;
private String retentionMs;
private String compressionType;
}
Kafka topic subclass
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Topic subclass with its own configuration prefix, extending Topic
 */
@Component
@ConfigurationProperties(prefix = "debezium.jira.topic.creation.jira")
public class JiraTopic extends Topic{
}
Connector base class
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
/**
 * Connector initialization -- shared configuration base class
 */
@RefreshScope
public class Connector{
/**********************Shared configuration**********************************/
@Value("${debezium.register.url}")
protected String registerUrl;
@Value("${debezium.topic.creation.default.replication.factor}")
protected String defaultFactor;
@Value("${debezium.topic.creation.default.partitions}")
protected String defaultPartitions;
@Value("${debezium.topic.creation.default.cleanup.policy}")
protected String defaultCleanupPolicy;
@Value("${debezium.topic.creation.default.compression.type}")
protected String defaultCompressionType;
@Value("${debezium.kafka.servers}")
protected String kafkaServers;
@Value("${debezium.override.enabled}")
protected boolean overrideEnabled;
@Value("${debezium.position}")
protected String channelPosition;
@Value("${debezium.snapshot.mode}")
protected String snapshotMode;
}
MySQL connector class
import com.fasterxml.jackson.databind.ObjectMapper;
import com.paratera.console.datasync.bean.JiraTopic;
import com.paratera.console.datasync.bean.Topic;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Connector initialization class
 */
@Component
@Order(value = 12)
@ConfigurationProperties(prefix = "debezium.jira")
public class JiraConnector extends Connector implements ApplicationRunner {
/**********************Connector-specific configuration**********************************/
@Value("${debezium.jira.connect.name}")
private String connectName;
@Value("${debezium.jira.database.hostname}")
private String dbHostName;
@Value("${debezium.jira.database.port}")
private String dbPort;
@Value("${debezium.jira.database.user}")
private String dbUser;
@Value("${debezium.jira.database.password}")
private String dbPassword;
@Value("${debezium.jira.column.include.list}")
private String columnList;
@Value("${debezium.jira.table.include.list}")
private String tableList;
@Value("${debezium.jira.database.include.list}")
private String databaseList;
@Value("${debezium.jira.topic.creation.groups}")
private String topicCreationGroups;
@Autowired
private JiraTopic jiraTopic;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
CloseableHttpClient httpclient = HttpClients.createDefault();
ResponseHandler<String> responseHandler = new ResponseHandler<String>() {
@Override
public String handleResponse(final HttpResponse response)
throws ClientProtocolException, IOException {//
int status = response.getStatusLine().getStatusCode();
if (status >= 200 && status < 300) {
HttpEntity entity = response.getEntity();
return entity != null ? EntityUtils.toString(entity) : null;
} else {
throw new ClientProtocolException(
"Unexpected response status: " + status);
}
}
};
System.out.println("------------------------------------------------------------");
System.out.println("Looking up connector " + connectName + ".................");
HttpGet httpGet = new HttpGet(registerUrl);
httpGet.addHeader("Accept", "application/json;charset=UTF-8");
String response = httpclient.execute(httpGet, responseHandler);
ObjectMapper mapper = new ObjectMapper();
List array = mapper.readValue(response , List.class);
if(array.contains(connectName)){
System.out.println("Connector " + connectName + " already exists; checking whether overriding is allowed!");
if(overrideEnabled){
HttpDelete httpDelete = new HttpDelete(registerUrl + "/" + connectName);
httpclient.execute(httpDelete, responseHandler);
System.out.println("Existing connector deleted!");
}else{
return;
}
}
System.out.println("Registering connector " + connectName + ".................");
Map obj = new HashMap();
obj.put("name" , connectName);
Map config = new HashMap();
config.put("connector.class" , "io.debezium.connector.mysql.MySqlConnector");
config.put("tasks.max" , "1");
config.put("database.hostname" , dbHostName);
config.put("database.port" , dbPort);
config.put("database.user" , dbUser);
config.put("database.password" , dbPassword);
config.put("database.server.id" , "184051");
config.put("database.server.name" , "dbserver");
config.put("database.include.list" , databaseList);
config.put("table.include.list" , tableList);
config.put("column.include.list" , columnList);
config.put("database.history.kafka.bootstrap.servers" , kafkaServers);
config.put("database.history.kafka.topic" , "dbhistory.console");
//config.put("gtid.new.channel.position" , channelPosition);
//config.put("snapshot.mode" , snapshotMode);
//set default topic-group properties
config.put("topic.creation.default.replication.factor" , defaultFactor);
config.put("topic.creation.default.partitions" , defaultPartitions);
config.put("topic.creation.default.cleanup.policy" , defaultCleanupPolicy);
config.put("topic.creation.default.compression.type" , defaultCompressionType);
//set the custom topic groups
config.put("topic.creation.groups",topicCreationGroups);
//set the jira topic
setttingTopic(config , jiraTopic,"jira");
obj.put("config" , config);
//register the connector
HttpPost httpPost = new HttpPost(registerUrl);
httpPost.addHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.addHeader("Accept", "application/json;charset=UTF-8");
StringEntity stringEntity = new StringEntity(mapper.writeValueAsString(obj), "UTF-8");
stringEntity.setContentEncoding("UTF-8");
httpPost.setEntity(stringEntity);
String responseBody = httpclient.execute(httpPost, responseHandler);
System.out.println("Connector " + connectName + " registered!!");
// System.out.println(responseBody); // for security
System.out.println("------------------------------------------------------------\n");
}
/**
 * Set per-group topic creation properties
 * @param config connector config map
 * @param topic topic property holder
 * @param topicName topic group name
 */
private void setttingTopic(Map config , Topic topic , String topicName) {
config.put("topic.creation." + topicName + ".include",topic.getInclude());
config.put("topic.creation." + topicName + ".replication.factor",topic.getReplicationFactor());
config.put("topic.creation." + topicName + ".partitions",topic.getPartitions());
config.put("topic.creation." + topicName + ".cleanup.policy",topic.getCleanupPolicy());
config.put("topic.creation." + topicName + ".retention.ms",topic.getRetentionMs());
config.put("topic.creation." + topicName + ".compression.type",topic.getCompressionType());
}
}
MongoDB connector class
import com.fasterxml.jackson.databind.ObjectMapper;
import com.paratera.console.datasync.bean.Topic;
import com.paratera.console.datasync.bean.UserprefTopic;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Connector initialization class
 */
@Component
@Order(value = 13)
@ConfigurationProperties(prefix = "debezium.userpref")
public class UserprefConnector extends Connector implements ApplicationRunner {
/**********************Connector-specific configuration**********************************/
@Value("${debezium.userpref.connect.name}")
private String connectName;
@Value("${debezium.userpref.mongodb.hosts}")
private String dbHostName;
@Value("${debezium.userpref.mongodb.name}")
private String dbTopicName;
@Value("${debezium.userpref.mongodb.user}")
private String dbUser;
@Value("${debezium.userpref.mongodb.password}")
private String dbPassword;
@Value("${debezium.userpref.mongodb.database.whitelist}")
private String dbList;
@Value("${debezium.userpref.mongodb.table.whitelist}")
private String tableList;
@Value("${debezium.userpref.topic.creation.groups}")
private String topicCreationGroups;
@Autowired
private UserprefTopic userprefTopic;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
CloseableHttpClient httpclient = HttpClients.createDefault();
ResponseHandler<String> responseHandler = new ResponseHandler<String>() {
@Override
public String handleResponse(final HttpResponse response)
throws ClientProtocolException, IOException {//
int status = response.getStatusLine().getStatusCode();
if (status >= 200 && status < 300) {
HttpEntity entity = response.getEntity();
return entity != null ? EntityUtils.toString(entity) : null;
} else {
throw new ClientProtocolException(
"Unexpected response status: " + status);
}
}
};
System.out.println("------------------------------------------------------------");
System.out.println("Looking up connector " + connectName + ".................");
HttpGet httpGet = new HttpGet(registerUrl);
httpGet.addHeader("Accept", "application/json;charset=UTF-8");
String response = httpclient.execute(httpGet, responseHandler);
ObjectMapper mapper = new ObjectMapper();
List array = mapper.readValue(response , List.class);
if(array.contains(connectName)){
System.out.println("Connector " + connectName + " already exists; checking whether overriding is allowed!");
if(overrideEnabled){
HttpDelete httpDelete = new HttpDelete(registerUrl + "/" + connectName);
httpclient.execute(httpDelete, responseHandler);
System.out.println("Existing connector deleted!");
}else{
return;
}
}
System.out.println("Registering connector " + connectName + ".................");
Map obj = new HashMap();
obj.put("name" , connectName);
Map config = new HashMap();
config.put("connector.class" , "io.debezium.connector.mongodb.MongoDbConnector");
config.put("tasks.max" , "1");
config.put("mongodb.hosts" , dbHostName);
config.put("mongodb.name" , dbTopicName);
config.put("database.whitelist",dbList);
config.put("table.whitelist", tableList);
config.put("mongodb.user" , dbUser);
config.put("mongodb.password" , dbPassword);
config.put("database.server.id" , "184054");
config.put("database.server.name" , "dbserver");
config.put("database.history.kafka.bootstrap.servers" , kafkaServers);
config.put("database.history.kafka.topic" , "dbhistory.console");
//set default topic-group properties
config.put("topic.creation.default.replication.factor" , defaultFactor);
config.put("topic.creation.default.partitions" , defaultPartitions);
config.put("topic.creation.default.cleanup.policy" , defaultCleanupPolicy);
config.put("topic.creation.default.compression.type" , defaultCompressionType);
//set the custom topic groups
config.put("topic.creation.groups",topicCreationGroups);
//set the userpref topic
setttingTopic(config , userprefTopic,"userpref");
obj.put("config" , config);
//register the connector
HttpPost httpPost = new HttpPost(registerUrl);
httpPost.addHeader("Content-Type", "application/json;charset=UTF-8");
httpPost.addHeader("Accept", "application/json;charset=UTF-8");
StringEntity stringEntity = new StringEntity(mapper.writeValueAsString(obj), "UTF-8");
stringEntity.setContentEncoding("UTF-8");
httpPost.setEntity(stringEntity);
String responseBody = httpclient.execute(httpPost, responseHandler);
System.out.println("Connector " + connectName + " registered!!");
// System.out.println(responseBody); // for security
System.out.println("------------------------------------------------------------\n");
}
/**
 * Set per-group topic creation properties
 * @param config connector config map
 * @param topic topic property holder
 * @param topicName topic group name
 */
private void setttingTopic(Map config , Topic topic , String topicName) {
config.put("topic.creation." + topicName + ".include",topic.getInclude());
config.put("topic.creation." + topicName + ".replication.factor",topic.getReplicationFactor());
config.put("topic.creation." + topicName + ".partitions",topic.getPartitions());
config.put("topic.creation." + topicName + ".cleanup.policy",topic.getCleanupPolicy());
config.put("topic.creation." + topicName + ".retention.ms",topic.getRetentionMs());
config.put("topic.creation." + topicName + ".compression.type",topic.getCompressionType());
}
}
Shared connector configuration
#--------------------Debezium connector registration config----------------------------------
debezium:
  register:
    url: ${console.debezium.connect.url}
  kafka:
    servers: ${console.debezium.kafka.url} #multiple allowed, comma-separated
  override:
    enabled: true
  topic:
    creation:
      #the default topic properties must be specified
      default:
        replication:
          factor: 1
        partitions: 10
        cleanup:
          policy: compact
        compression:
          type: lz4
  position: earliest #sync position; the default (earliest) starts from the first valid transaction, latest starts from new transactions only
  snapshot:
    mode: schema_only
MySQL connector-specific configuration
#--------------------Debezium connector registration config----------------------------------
debezium:
  jira:
    connect:
      name: jira_connector
    database:
      hostname: ${console.debezium.jira.db.url} #the database's Docker container name; use a concrete IP or domain if it is not a container
      port: ${console.debezium.jira.db.port}
      user: ${console.debezium.jira.db.user}
      password: ${console.debezium.jira.db.password}
      include:
        list: "jira" #multiple allowed, comma-separated
    table:
      include:
        #multiple allowed, comma-separated
        list: "jira.my_ticket_mv"
    column:
      include:
        #multiple allowed, comma-separated
        list: "jira.my_ticket_mv.ticket_id,jira.my_ticket_mv.state,jira.my_ticket_mv.issuetype,jira.my_ticket_mv.state_code"
    topic:
      creation:
        groups: "jira" #custom topic groups, comma-separated; must match the property block below
        jira:
          include: dbserver\\.jira\\.*
          replicationFactor: 1
          partitions: 20
          cleanupPolicy: delete
          retentionMs: 7776000000
          compressionType: producer
MongoDB connector-specific configuration
#--------------------Debezium connector registration config----------------------------------
debezium:
  userpref:
    connect:
      name: userpref_connector
    mongodb:
      hosts: ${console.debezium.userpref.mongodb.hosts}
      name: dbserver
      user: ${console.debezium.userpref.mongodb.user}
      password: ${console.debezium.userpref.mongodb.password}
      database:
        #multiple allowed, comma-separated
        whitelist: "userpref"
      table:
        whitelist: "userpref"
    topic:
      creation:
        groups: "userpref" #custom topic groups, comma-separated; must match the property block below
        userpref:
          include: dbserver\\.userpref\\.*
          replicationFactor: 1
          partitions: 20
          cleanupPolicy: delete
          retentionMs: 7776000000
          compressionType: producer
7. Business output
package com.paratera.console.datasync.listener;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.paratera.console.datasync.facade.BizClient;
import com.paratera.console.datasync.model.UserInfo;
import com.paratera.console.datasync.model.UserPref;
import com.paratera.console.datasync.service.UserInfoService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * Kafka consumer listener class
 */
@Component
public class UserprefConsumer {
private Logger logger = LoggerFactory.getLogger(UserprefConsumer.class);
@Autowired
private UserInfoService userInfoService;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private BizClient bizClient;
// consumer listener
@KafkaListener(topics = {"dbserver.userpref.userpref"},groupId = "userpref",properties = {"print.key=true"})
public void onMessage(List<ConsumerRecord> consumerRecords) throws IOException {
ObjectMapper mapper = new ObjectMapper();
Iterator<ConsumerRecord> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
ConsumerRecord record = iterator.next();
String value = (String) record.value();
String key = (String) record.key();
if (value != null) {
Map jsonObject = mapper.readValue(value, Map.class);
Map payload = (Map) jsonObject.get("payload");
String op = (String) payload.get("op");
switch (op) {
//insert
case "c":
//extract the changed document
Map after = mapper.readValue((String) payload.get("after"),Map.class);
if(after != null){
logger.debug("Insert detected; userinfo data to update: " + after.toString());
String userId = (String) after.get("userId");
Map<String, String> up = (Map) after.get("userPref");
settingUserInfo(userId, up);
};
break;
case "u":
//extract the changed document
Map patch = mapper.readValue((String) payload.get("patch"),Map.class);
Map filter = mapper.readValue((String) payload.get("filter"),Map.class);
if(patch != null){
logger.debug("Update detected; data to update: " + patch.toString());
logger.debug("Update detected; filter condition: " + filter.toString());
//for updates and deletes Debezium does not sync the actual query condition, only the document id, so the userId has to be looked up here
UserPref userPref = mongoTemplate.findById(new ObjectId((String) filter.get("_id")), UserPref.class,"userpref");
String userId = userPref.getUserId();
Map<String, String> up = userPref.getUserPref();
settingUserInfo(userId, up);
};
break;
case "d":
if(key != null){
logger.debug("Delete detected; filter of the deleted data: " + payload.get("filter"));
}
break;
default:
logger.debug("Unknown operation");
}
}
}
}
private void settingUserInfo(String userId, Map<String, String> up) {
/**
 * Set the user's preference:
 * 1. if billingShowMoney is 0 and billingShowCoreTime is 1, set 2 (usage)
 * 2. otherwise, set 1 (amount)
 */
UserInfo userInfo = new UserInfo();
userInfo.setId(userId);
if(Double.valueOf(String.valueOf(up.get("billingShowMoney"))) == 0.0
&& Double.valueOf(String.valueOf(up.get("billingShowCoreTime"))) == 1.0){
userInfo.setUserPref(2);
}else {
userInfo.setUserPref(1);
}
userInfoService.updateByPrimaryKeySelective(userInfo);
//also clear the user's cache
bizClient.clearCacheOfUserInfo(userId);
}
}
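For reference, here is a trimmed-down sketch of the kind of MongoDB insert event this listener branches on for the "c" case. Note that for the MongoDB connector, fields such as after, patch, and filter arrive as JSON strings inside the envelope, which is why the code calls readValue on them a second time. All values below are made up, and real events encode ids in MongoDB extended JSON:

```json
{
  "payload": {
    "op": "c",
    "after": "{\"userId\": \"u-1001\", \"userPref\": {\"billingShowMoney\": \"0\", \"billingShowCoreTime\": \"1\"}}",
    "ts_ms": 1624000000123
  }
}
```

With these preference values, settingUserInfo would set userPref to 2 (usage) for user u-1001 and then clear that user's cache.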