Canal Server發送binlog消息到Kafka消息隊列中
- 一、背景
- 二、需要修改的地方
- 1、canal.properties 配置檔案修改
- 1、修改canal.serverMode的值
- 2、修改kafka配置
- 2、修改 instance.propertios 配置檔案
- 3、canal發消息到mq性能優化
- 三、kafka接收消息
- 1、canal 發送過來的消息
- 2、監聽消息
- 3、擷取消息
- 四、MQ配置相關的參數
- 五、MQ接收binlog代碼
- 六、參考文章
一、背景
在上一篇文章中,我們使用 Canal Admin 搭建了Canal Server 叢集,在這篇文章中,我們使用上篇文章的基礎,将消息發送到kafka消息隊列中。
二、需要修改的地方
以下 配置檔案的修改,都是在 Canal Admin 上修改的。
1、canal.properties 配置檔案修改
1、修改canal.serverMode的值
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5yN5UjNyEzN4UDZ5EDM1gTYyYzXxQDOwcTM0EzLcZDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
2、修改kafka配置
2、修改 instance.propertios 配置檔案
3、canal發消息到mq性能優化
影響性能的幾個參數:
-
(表示是否需要提前做序列化,非flatMessage場景需要設定為true)canal.instance.memory.rawEntry = true
-
(false代表二進制協定,true代表使用json格式,二進制協定有更好的性能)canal.mq.flatMessage = false
-
(動态topic配置定義,可以針對不同表設定不同的topic,在flatMessage模式下可以提升并行效率)canal.mq.dynamicTopic
-
(分區配置,對寫入性能有反作用,不過可以提升消費端的吞吐)canal.mq.partitionsNum/canal.mq.partitionHash
參考連結:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance
三、kafka接收消息
1、canal 發送過來的消息
/**
* canal 發送過來的消息
*
* @author huan.fu 2021/9/2 - 下午4:06
*/
@Getter
@Setter
@ToString
public class CanalMessage {
/**
* 測試得出 同一個事物下産生多個修改,這個id的值是一樣的。
*/
private Integer id;
/**
* 資料庫或schema
*/
private String database;
/**
* 表名
*/
private String table;
/**
* 主鍵字段名
*/
private List<String> pkNames;
/**
* 是否是ddl語句
*/
private Boolean isDdl;
/**
* 類型:INSERT/UPDATE/DELETE
*/
private String type;
/**
* binlog executeTime, 執行耗時
*/
private Long es;
/**
* dml build timeStamp, 同步時間
*/
private Long ts;
/**
* 執行的sql,dml sql為空
*/
private String sql;
/**
* 資料清單
*/
private List<Map<String, Object>> data;
/**
* 舊資料清單,用于update,size和data的size一一對應
*/
private List<Map<String, Object>> old;
}
2、監聽消息
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "customer", groupId = "canal-kafka-springboot-001", concurrency = "5")
public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
log.info(Thread.currentThread().getName() + ":" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接收到kafka消息,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());
CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class);
log.info("\r=================================");
log.info("接收到的原始 canal message為: {}", record.value());
log.info("轉換成Java對象後轉換成Json為 : {}", JSON.toJSONString(canalMessage));
ack.acknowledge();
}
}
3、擷取消息
四、MQ配置相關的參數
參數名 | 參數說明 | 預設值 |
canal.mq.servers | kafka為bootstrap.servers rocketMQ中為nameserver清單 | 127.0.0.1:6667 |
canal.mq.retries | 發送失敗重試次數 | |
canal.mq.batchSize | kafka為 rocketMQ無意義 | 16384 |
canal.mq.maxRequestSize | kafka為 rocketMQ無意義 | 1048576 |
canal.mq.lingerMs | kafka為 , 如果是flatMessage格式建議将該值調大, 如: 200 rocketMQ無意義 | 1 |
canal.mq.bufferMemory | kafka為 rocketMQ無意義 | 33554432 |
canal.mq.acks | kafka為 rocketMQ無意義 | all |
canal.mq.kafka.kerberos.enable | kafka為 rocketMQ無意義 | false |
canal.mq.kafka.kerberos.krb5FilePath | kafka kerberos認證 rocketMQ無意義 | …/conf/kerberos/krb5.conf |
canal.mq.kafka.kerberos.jaasFilePath | kafka kerberos認證 rocketMQ無意義 | …/conf/kerberos/jaas.conf |
canal.mq.producerGroup | kafka無意義 rocketMQ為ProducerGroup名 | Canal-Producer |
canal.mq.accessChannel | kafka無意義 rocketMQ為channel模式,如果為aliyun則配置為cloud | local |
— | — | — |
canal.mq.vhost= | rabbitMQ配置 | 無 |
canal.mq.exchange= | rabbitMQ配置 | 無 |
canal.mq.username= | rabbitMQ配置 | 無 |
canal.mq.password= | rabbitMQ配置 | 無 |
canal.mq.aliyunuid= | rabbitMQ配置 | 無 |
— | — | — |
canal.mq.canalBatchSize | 擷取canal資料的批次大小 | 50 |
canal.mq.canalGetTimeout | 擷取canal資料的逾時時間 | 100 |
canal.mq.parallelThreadSize | mq資料轉換并行處理的并發度 | 8 |
canal.mq.flatMessage | 是否為json格式 如果設定為false,對應MQ收到的消息為protobuf格式 需要通過CanalMessageDeserializer進行解碼 | false |
— | — | — |
canal.mq.topic | mq裡的topic名 | 無 |
canal.mq.dynamicTopic | mq裡的動态topic規則, 1.1.3版本支援 | 無 |
canal.mq.partition | 單隊列模式的分區下标, | 1 |
canal.mq.partitionsNum | 散列模式的分區數 | 無 |
canal.mq.partitionHash | 散列規則定義 庫名.表名 : 唯一主鍵,比如mytest.person: id 1.1.3版本支援新文法,見下文 |
參考文檔:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart