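Preface
FlinkCDC is a data synchronization tool based on Change Data Capture (CDC). It can stream data from relational databases into Flink in real time for computation and analysis. The figure below is taken from the official introduction.
Figure 1 below compares FlinkCDC with other common open-source CDC solutions:
As the comparison shows, FlinkCDC goes further than the other open-source products in several ways:
- It supports incremental synchronization, and also full and full + incremental synchronization.
- It supports failure recovery (built on Flink's checkpoint mechanism), so synchronization can quickly resume from where it left off.
- It supports a wide range of data sources[2]. As of version 2.3 these include MongoDB, MySQL, OceanBase, Oracle, PostgreSQL, SQL Server, TiDB and Db2.
This article covers data synchronization and failure recovery with FlinkCDC, using MySQL and Oracle as examples. The complete code has been uploaded to GitHub.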
Demo
MySQL
Oracle (latency is slightly higher than with MySQL)
Database configuration
MySQL (5.7)
Edit the my.cnf configuration file (my.ini on Windows) and add the following:
[mysqld]
# Enable the binlog
log-bin=mysql-bin
# Use ROW format
binlog-format=ROW
# In a MySQL cluster, every node must have a different server_id
server_id=1
# Binlog expiration, in days
expire_logs_days=30
Tip: restart the MySQL service after making these changes.
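After the restart, it is worth checking that the server actually picked up the new settings. The sketch below is minimal. It assumes the rows of `SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')` have already been read into a map (for example over JDBC). `BinlogConfigCheck` is a hypothetical helper, not part of Flink CDC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks binlog-related server variables before starting a CDC job. */
final class BinlogConfigCheck {

    private BinlogConfigCheck() {
    }

    /** Returns human-readable problems; an empty list means the settings look usable. */
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        // log-bin in my.cnf shows up as log_bin=ON once the server has restarted
        if (!"ON".equalsIgnoreCase(vars.getOrDefault("log_bin", ""))) {
            problems.add("log_bin must be ON");
        }
        // Row-level change events are only written when the binlog uses ROW format
        if (!"ROW".equalsIgnoreCase(vars.getOrDefault("binlog_format", ""))) {
            problems.add("binlog_format must be ROW");
        }
        // server_id must be set to a non-zero value (and be unique within a cluster)
        if ("0".equals(vars.getOrDefault("server_id", "0").trim())) {
            problems.add("server_id must be set to a non-zero value");
        }
        return problems;
    }
}
```

An empty list means the three settings from the my.cnf snippet are in effect. Otherwise, each entry names a setting to fix before starting the job.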
Create the database and table:
# Create the database
create database flink;
# 建表
create table flink.`user` (
`id` bigint(20) not null,
`username` varchar(20) default null,
`password` varchar(63) default null,
`status` int(2) default null,
`create_time` datetime default null,
primary key (`id`)
) ENGINE = InnoDB default CHARSET = utf8mb4;
Create a user and grant privileges:
# Create the user flink
CREATE USER flink IDENTIFIED BY 'flink';
# Grant the global privileges needed to read the binlog
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
# Grant all privileges on the flink database to the flink user
GRANT ALL PRIVILEGES ON flink.* TO 'flink'@'%';
# Reload the privilege tables
FLUSH PRIVILEGES;
Oracle (11g)
Connect as DBA:
# Set the SID to match your environment, e.g. XE.
export ORACLE_SID=SID
sqlplus /nolog
CONNECT sys/manager AS SYSDBA
Enable archive log mode:
alter system set db_recovery_file_dest_size = 20G;
-- Choose the recovery area location to suit your environment
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
确认æ¯å¦é ç½®æåï¼
archive log list;
Create a user and grant privileges:
CREATE USER flink IDENTIFIED BY flink;
GRANT CREATE SESSION TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT CREATE TABLE TO flink;
Create the table and enable supplemental logging:
-- Create the table
CREATE TABLE flink."user" (
id NUMBER NOT NULL,
username VARCHAR2(20),
password VARCHAR2(63),
status INTEGER,
create_time TIMESTAMP,
PRIMARY KEY(id)
);
-- Enable supplemental logging for all columns of the table
ALTER TABLE flink."user" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
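On Oracle, every monitored table needs this supplemental-logging statement (the usage notes later repeat this). Generating the DDL from the same table list used in the YAML configuration makes it harder to forget one. This is a minimal sketch. `SupplementalLogDdl` is a hypothetical helper, and the quoting assumes lower-case table names created with double quotes, as in the `flink."user"` example above.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: builds supplemental-logging DDL for monitored Oracle tables. */
final class SupplementalLogDdl {

    private SupplementalLogDdl() {
    }

    /** One ALTER TABLE statement per table, mirroring the manual statement above. */
    static List<String> statements(String schema, List<String> tables) {
        return tables.stream()
                .map(table -> {
                    // Oracle identifiers may not contain double quotes, quoted or not
                    if (table.contains("\"")) {
                        throw new IllegalArgumentException("Invalid table name: " + table);
                    }
                    // Quoting keeps the lower-case name and allows keywords such as user
                    return "ALTER TABLE " + schema + ".\"" + table
                            + "\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS";
                })
                .collect(Collectors.toList());
    }
}
```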
Code configuration
Runtime environment
| Dependency | Version |
| --- | --- |
| Java | 17 |
| flink-connector | 2.1.0 |
| flink | 1.13.0 |
| maven | 3.6.2 |
Connection configuration
flinkcdc:
  data-source:
    # The default type is MySQL
    addr: localhost:3306
    database: flink
    username: flink
    password: flink
    table-list:
      - user
Tip: for the complete set of data source connection properties, see DataSourceProperties.java; for the checkpoint settings, see CheckPointProperties.java.
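The YAML lists bare table names, but Flink CDC's MySQL source expects database-qualified entries in `tableList` (for example `flink.user`). As an illustration only, here is a hypothetical, simplified record that mirrors the `flinkcdc.data-source` block. It is not the repository's DataSourceProperties.java, and it assumes `addr` always has the form `host:port`.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical, simplified mirror of the flinkcdc.data-source YAML block. */
record DataSourceConfig(String addr, String database, String username,
                        String password, List<String> tableList) {

    /** Host part of addr, e.g. "localhost" for "localhost:3306". */
    String host() {
        return addr.substring(0, addr.lastIndexOf(':'));
    }

    /** Port part of addr, e.g. 3306 for "localhost:3306". */
    int port() {
        return Integer.parseInt(addr.substring(addr.lastIndexOf(':') + 1));
    }

    /** The MySQL source matches "database.table" names, so prefix each bare table name. */
    List<String> qualifiedTables() {
        return tableList.stream()
                .map(table -> database + "." + table)
                .collect(Collectors.toList());
    }
}
```

These values could then be passed to `MySqlSource.<String>builder()` through its `hostname(...)`, `port(...)`, `databaseList(...)` and `tableList(...)` methods.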
Savepoint configuration
Failure recovery means picking up the inserts, updates and deletes made to the database while the application was stopped. To support it, configure the restore point in code:
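import java.io.File;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Get the configured restore-point directory; create it on the first run if it does not exist
var saveDir = checkPointProperties.getSaveDir();
var folder = new File(saveDir);
if (!folder.isDirectory() && !folder.mkdirs()) {
    throw new IllegalStateException("Failed to create directory: " + saveDir);
}
// Keep one subdirectory per data source type, e.g. .../mysql or .../oracle
var dataSourceType = dataSourceProperties.getType().name().toLowerCase();
var dataSourceSaveDir = saveDir + File.separator + dataSourceType;
// Path to restore from, or null if nothing has been saved yet
var savepointDir = SavepointUtils.getSavepointRestore(dataSourceSaveDir);
var configuration = new Configuration();
if (savepointDir != null) {
    // Restore the job from that path when it starts
    var savepointRestoreSettings = SavepointRestoreSettings.forPath(savepointDir);
    SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration);
}
// Enable exactly-once checkpointing and store checkpoints under the data source directory
var env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(checkPointProperties.getInterval(), CheckpointingMode.EXACTLY_ONCE);
var checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(checkPointProperties.getStorageType().getPrefix() + dataSourceSaveDir);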
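The snippet relies on `SavepointUtils.getSavepointRestore`, which lives in the author's repository and is not shown here. As a rough idea of what such a helper has to do, the sketch below assumes Flink's filesystem checkpoint layout (`<checkpoint-dir>/<job-id>/chk-<n>/_metadata`). It returns the newest completed checkpoint, or `null` if there is none. `LatestCheckpoint` is a hypothetical name, and this is not the repository's implementation.

Note that Flink normally deletes checkpoints when a job is cancelled. You may also need to retain them, for example with `checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)` in Flink 1.13.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper: finds the newest completed checkpoint below a checkpoint directory. */
final class LatestCheckpoint {

    private LatestCheckpoint() {
    }

    /** Returns the chk-<n> directory whose _metadata was modified last, or null if none exists. */
    static String find(String baseDir) {
        Path base = Path.of(baseDir);
        if (!Files.isDirectory(base)) {
            return null;
        }
        // Depth 3 reaches job-id dir -> chk-<n> dir -> _metadata file
        try (Stream<Path> paths = Files.walk(base, 3)) {
            return paths
                    .filter(LatestCheckpoint::isCheckpointMetadata)
                    .max(Comparator.comparing(LatestCheckpoint::modifiedTime))
                    .map(metadata -> metadata.getParent().toString())
                    .orElse(null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A chk-<n> directory only counts as complete once its _metadata file exists. */
    private static boolean isCheckpointMetadata(Path p) {
        Path parent = p.getParent();
        return Files.isRegularFile(p)
                && "_metadata".equals(String.valueOf(p.getFileName()))
                && parent != null
                && String.valueOf(parent.getFileName()).startsWith("chk-");
    }

    private static FileTime modifiedTime(Path p) {
        try {
            return Files.getLastModifiedTime(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Picking by modification time rather than by the `<n>` suffix is a deliberate choice. Each run writes under a new job ID, so checkpoint numbers from different runs are not directly comparable.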
General notes
To keep numeric values from appearing as strings of encoded characters, add the following configuration:
// See https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#%E9%80%9A%E7%94%A8-faq Q5
prop.setProperty("bigint.unsigned.handling.mode","long");
prop.setProperty("decimal.handling.mode","double");
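Why strings? With Debezium's default `decimal.handling.mode=precise`, a DECIMAL value is emitted as the big-endian bytes of its unscaled integer, and the scale is kept in the schema. JSON serialization then renders those bytes as Base64. The sketch below reproduces that encoding with plain JDK classes to show what the `double` setting avoids. `DecimalEncoding` is a hypothetical name used for illustration.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Hypothetical illustration of a DECIMAL in Debezium JSON under "precise" vs "double" mode. */
final class DecimalEncoding {

    private DecimalEncoding() {
    }

    /** "precise": unscaled value as two's-complement big-endian bytes, shown as Base64 in JSON. */
    static String precise(BigDecimal value) {
        byte[] unscaled = value.unscaledValue().toByteArray();
        return Base64.getEncoder().encodeToString(unscaled);
    }

    /** Reverses precise(); the scale must come from the field's schema, not from the value. */
    static BigDecimal decode(String base64, int scale) {
        return new BigDecimal(new BigInteger(Base64.getDecoder().decode(base64)), scale);
    }

    /** "double": the value is emitted as an ordinary JSON number. */
    static double asDouble(BigDecimal value) {
        return value.doubleValue();
    }
}
```

For example, `12.34` becomes the string `BNI=` in precise mode. The trade-off of `double` is that very large or high-precision decimals can lose precision; Debezium also offers a `string` mode for that case.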
Oracle configuration notes
To keep the logs from growing too quickly and log reading from being slow, add the following configuration:
// See https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q1
prop.setProperty("log.mining.strategy", "online_catalog");
prop.setProperty("log.mining.continuous.mine", "true");
For Oracle 11g, the connection configuration also needs:
// See https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q2
prop.setProperty("database.tablename.case.insensitive", "false");
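To keep the general and Oracle-specific options in one place, a helper along these lines could build the `Properties` passed to the source, for example through the source builder's `debeziumProperties(...)` method. The class name, the `DbType` enum and the `oracle11g` flag are hypothetical. The property keys and values are exactly the ones listed above.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the Debezium properties discussed above. */
final class DebeziumProps {

    enum DbType { MYSQL, ORACLE }

    private DebeziumProps() {
    }

    static Properties build(DbType type, boolean oracle11g) {
        Properties prop = new Properties();
        // General: keep numeric columns readable (general FAQ Q5)
        prop.setProperty("bigint.unsigned.handling.mode", "long");
        prop.setProperty("decimal.handling.mode", "double");
        if (type == DbType.ORACLE) {
            // Oracle: limit log growth and speed up log reading (Oracle FAQ Q1)
            prop.setProperty("log.mining.strategy", "online_catalog");
            prop.setProperty("log.mining.continuous.mine", "true");
            if (oracle11g) {
                // Oracle 11g only (Oracle FAQ Q2)
                prop.setProperty("database.tablename.case.insensitive", "false");
            }
        }
        return prop;
    }
}
```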
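Running and using the project
Downloading the code
I keep all the sample code for my blog posts in a single repository. If you do not want to clone the whole repository, I recommend the GitZip for github browser extension, which lets you download only selected files (select the files, then click the download button in the bottom-right corner).
Usage
For each table you want to monitor:
- Create a matching entity class.
- Create a class that extends AbstractMessageListener, overriding its create, delete, update, read and other methods to handle the corresponding events.
- Pass the table name as the argument of the FlickCdcMessageListener annotation to subscribe to that table's change events.
- Add the table to table-list in the YAML file. For an Oracle database, also enable supplemental logging on the table.
For example: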
import cn.butterfly.flinkcdc.annotation.FlickCdcMessageListener;
import cn.butterfly.flinkcdc.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* Message listener for the user table
*
* @author zjw
* @date 2023-03-14
*/
@Slf4j
@Component
@FlickCdcMessageListener("user")
public class UserMessageListener extends AbstractMessageListener<User> {
@Override
public void create(User user) {
log.info("New user: {}", user);
}
}
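The repository's `AbstractMessageListener` is not reproduced in this article. To show the general idea, here is a hypothetical, self-contained sketch of how a listener base class could route Debezium change events by their `op` field: `c` for create, `u` for update, `d` for delete and `r` for a snapshot read. The class names, method signatures and the two-argument `update` are assumptions, not the repository's actual API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: routes a Debezium-style change event to a typed callback. */
abstract class CdcChangeListener<T> {

    public void create(T after) { }

    public void update(T before, T after) { }

    public void delete(T before) { }

    /** Rows from the initial full snapshot arrive with op "r". */
    public void read(T row) { }

    /** op is the Debezium "op" field; before/after are the deserialized row images. */
    public final void dispatch(String op, T before, T after) {
        switch (op) {
            case "c" -> create(after);
            case "u" -> update(before, after);
            case "d" -> delete(before);
            case "r" -> read(after);
            default -> throw new IllegalArgumentException("Unsupported op: " + op);
        }
    }
}

/** Example subclass that records what it saw, similar in spirit to UserMessageListener. */
class RecordingListener extends CdcChangeListener<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void create(String after) {
        events.add("create " + after);
    }

    @Override
    public void delete(String before) {
        events.add("delete " + before);
    }
}
```

Because the base class supplies empty defaults, a subclass overrides only the events it cares about, as `UserMessageListener` does with `create`.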
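Other notes
- By default, FlinkCDC takes a full snapshot of each table on the first run and then switches to reading incremental changes. When a table is large, an overridden AbstractMessageListener#read must therefore be able to handle a large volume of rows.
- Flink CDC captures changes from the database's transaction log and resumes from the last completed checkpoint. Any changes that occurred after that checkpoint are read again after recovery, even if they were already processed before the job stopped, so listeners must tolerate duplicate events.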
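Both notes above point to the same design: process snapshot rows in batches, and make writes idempotent so that a replayed event has no extra effect. Here is a minimal in-memory sketch, assuming rows are keyed by their primary key (`id`, as in the `user` table). `UserRow` and `IdempotentBatchSink` are hypothetical names. A real sink would flush to a database with an upsert, such as MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`, instead of a map, and would also need to handle deletes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical row type: only the primary key and one column, for brevity. */
record UserRow(long id, String username) { }

/** Hypothetical sink: buffers rows and flushes them in batches as key-based upserts. */
final class IdempotentBatchSink {
    private final int batchSize;
    private final List<UserRow> buffer = new ArrayList<>();
    // Stands in for the target table; keyed by primary key so replays overwrite instead of duplicating
    final Map<Long, UserRow> target = new LinkedHashMap<>();
    int flushes = 0;

    IdempotentBatchSink(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Called for each snapshot or change row; flushes once batchSize rows are buffered. */
    void accept(UserRow row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Upserts the buffered rows; applying the same row twice leaves the same state. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        for (UserRow row : buffer) {
            target.put(row.id(), row);
        }
        buffer.clear();
        flushes++;
    }
}
```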
Summary
This article gave a brief overview of data synchronization and failure recovery with FlinkCDC, leaving out the underlying background knowledge.