文章目錄
- 1, oracle(11g) cdc 配置
- 2, 配置 oraclecdc-source
  - 2.1, redo log 對應的 topic: 包含執行的 SQL 語句
  - 2.2, record event 對應的 topic: 一條表資料
- 3, oraclecdc --> hbase
1, oracle(11g) cdc 配置
oraclecdc-source 配置文檔:https://docs.confluent.io/kafka-connect-oracle-cdc/current/overview.html
-- 1. Enable ARCHIVELOG mode and supplemental logging in Oracle (run as SYSDBA).
-- ALTER DATABASE ARCHIVELOG is only allowed while the database is MOUNTED but
-- not OPEN, so after the shutdown the instance must be restarted in MOUNT state.
SQL> SHUTDOWN IMMEDIATE;
SQL> STARTUP MOUNT;
SQL> ALTER DATABASE ARCHIVELOG;
SQL> ALTER DATABASE OPEN;
-- Database-wide option: write all column values of changed rows to the redo log.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Verify: LOG_MODE should now report ARCHIVELOG.
SQL> select name,log_mode from v$database;
NAME LOG_MODE
--------- ------------
HELOWIN ARCHIVELOG
-- Alternative to the database-wide ALL COLUMNS statement above: enable only
-- minimal supplemental logging for the database, then full (ALL COLUMNS)
-- supplemental logging for the specific tables to be captured.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER SESSION SET CONTAINER=<pdb-name>; -- ONLY FOR PDB (12c+ multitenant; not applicable to 11g)
ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 2. Create a test table in the SCOTT schema.
SQL> alter user scott account unlock;
SQL> alter user scott identified by scott;
SQL> conn scott/scott
-- ID gets an explicit precision: with "numeric.mapping":"best_fit" the connector
-- picks an integer type from the column's precision. A bare INT has no declared
-- precision and is presumably why ID shows up as Avro bytes ("ID": {"bytes": ...})
-- in the record-event sample of section 2.2.
-- NOTE: the connector config in section 2 captures SCOTT.TEST2; create that table
-- with the same columns, or point table.inclusion.regex at TEST instead.
SQL> create table test ( id number(10) , name varchar2(10), mark varchar2(10));
Table created.
-- 3. Create the CDC role and user. Run as SYSDBA: DBMS_LOGMNR and the V_$ views
--    are owned by SYS (see the OWNER column in step 4), as are SYS.OBJ$ and SYS.ENC$.
CREATE ROLE CDC_PRIVS;
GRANT CREATE SESSION TO CDC_PRIVS;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO CDC_PRIVS;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO CDC_PRIVS;
GRANT SELECT ON V_$DATABASE TO CDC_PRIVS;
GRANT SELECT ON V_$THREAD TO CDC_PRIVS;
GRANT SELECT ON V_$PARAMETER TO CDC_PRIVS;
GRANT SELECT ON V_$NLS_PARAMETERS TO CDC_PRIVS;
GRANT SELECT ON V_$TIMEZONE_NAMES TO CDC_PRIVS;
GRANT SELECT ON ALL_INDEXES TO CDC_PRIVS;
GRANT SELECT ON ALL_OBJECTS TO CDC_PRIVS;
GRANT SELECT ON ALL_USERS TO CDC_PRIVS;
GRANT SELECT ON ALL_CATALOG TO CDC_PRIVS;
GRANT SELECT ON ALL_CONSTRAINTS TO CDC_PRIVS;
GRANT SELECT ON ALL_CONS_COLUMNS TO CDC_PRIVS;
GRANT SELECT ON ALL_TAB_COLS TO CDC_PRIVS;
GRANT SELECT ON ALL_IND_COLUMNS TO CDC_PRIVS;
GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO CDC_PRIVS;
GRANT SELECT ON ALL_LOG_GROUPS TO CDC_PRIVS;
GRANT SELECT ON ALL_TAB_PARTITIONS TO CDC_PRIVS;
GRANT SELECT ON SYS.DBA_REGISTRY TO CDC_PRIVS;
GRANT SELECT ON SYS.OBJ$ TO CDC_PRIVS;
GRANT SELECT ON DBA_TABLESPACES TO CDC_PRIVS;
GRANT SELECT ON DBA_OBJECTS TO CDC_PRIVS;
GRANT SELECT ON SYS.ENC$ TO CDC_PRIVS;
GRANT SELECT ANY TRANSACTION TO CDC_PRIVS;
-- SELECT on every table the connector captures; the connector config in
-- section 2 captures SCOTT.TEST2, so that table needs a grant as well.
GRANT SELECT ON scott.test TO CDC_PRIVS;
GRANT SELECT ON scott.test2 TO CDC_PRIVS;
CREATE USER CDC_USER IDENTIFIED BY CDC_USER DEFAULT TABLESPACE USERS;
ALTER USER CDC_USER QUOTA UNLIMITED ON USERS;
GRANT CDC_PRIVS to CDC_USER;
-- NOTE(review): presumably required for the initial snapshot ("start.from":"snapshot");
-- confirm against the connector documentation.
GRANT FLASHBACK ANY TABLE TO CDC_USER;
-- The connector config in section 2 connects as SYSTEM; set oracle.username and
-- oracle.password to CDC_USER to actually use this least-privilege account.
-- 4. Verify that the role(s) granted to CDC_USER carry the LogMiner privileges.
SQL> set pages 42
SQL> set lines 256
SQL> SELECT GRANTEE, OWNER, TABLE_NAME
FROM DBA_TAB_PRIVS
WHERE TABLE_NAME IN ('DBMS_LOGMNR', 'V_$LOGMNR_CONTENTS')
AND GRANTEE IN (SELECT GRANTED_ROLE FROM DBA_ROLE_PRIVS WHERE GRANTEE = 'CDC_USER');
GRANTEE OWNER TABLE_NAME
------------------------------ ------------------------------ ------------------------------
CDC_PRIVS SYS V_$LOGMNR_CONTENTS
CDC_PRIVS SYS DBMS_LOGMNR
2, 配置 oraclecdc-source
注意:
- table.inclusion.regex: 要擷取的表的完整名稱(正規表示式),格式為 執行個體名(SID).使用者名.表名,例如 ORCL.SCOTT.TEST;本文設定為 helowin.SCOTT.TEST2
- key.template: 預設為 ${primaryKeyStructOrValue},即使用主鍵 (primary key) 作為 key;表沒有主鍵時,可以自訂作為 key 的欄位
# Register the Oracle CDC source connector through the Kafka Connect REST API.
# - table.inclusion.regex matches the fully qualified table name; here helowin.SCOTT.TEST2.
# - Change events of each table go to the topic built from table.topic.name.template
#   (helowin24.SCOTT24.TEST224 for SCOTT.TEST2); raw redo rows go to redo.log.topic.name.
# - oracle.username/oracle.password use SYSTEM here; to use the CDC_USER account from
#   step 3, set them to CDC_USER and make sure it has SELECT on every captured table.
# - schemas.enable is intentionally not set: it is a JsonConverter option and has no
#   effect on AvroConverter.
# - NOTE(review): key.template is documented as a template using ${...} variables
#   (default ${primaryKeyStructOrValue}). Confirm the literal "ID,NAME" yields per-row
#   keys; if it is used verbatim, every record gets the same key, and the compacted
#   table topic (topic.creation.default.cleanup.policy=compact) would keep only one row.
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '
{
"name": "SimpleOracleCDC_24",
"config":{
"connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
"name": "SimpleOracleCDC_24",
"tasks.max":3,
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.template":"ID,NAME",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"confluent.topic.bootstrap.servers":"localhost:9092",
"oracle.server": "192.168.56.117",
"oracle.port": "1522",
"oracle.sid": "helowin",
"oracle.username": "system",
"oracle.password": "system",
"start.from":"snapshot",
"redo.log.topic.name": "SimpleOracleCDC_24",
"redo.log.consumer.bootstrap.servers":"localhost:9092",
"table.inclusion.regex":"helowin.SCOTT.TEST2",
"numeric.mapping":"best_fit",
"_table.topic.name.template_":"Using template vars to set change event topic for each table",
"table.topic.name.template": "${databaseName}24.${schemaName}24.${tableName}24",
"connection.pool.max.size": 20,
"confluent.topic.replication.factor":1,
"topic.creation.groups": "redo",
"topic.creation.redo.include": "SimpleOracleCDC_24",
"topic.creation.redo.replication.factor": 1,
"topic.creation.redo.partitions": 1,
"topic.creation.redo.cleanup.policy": "delete",
"topic.creation.redo.retention.ms": 1209600000,
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact"
}
}'

2.1, redo log對應的topic: 包含執行的sql語句
"OPERATION": {
"string": "INSERT"
},
"OPERATION_CODE": {
"int": 1
},
"SQL_REDO": {
"string": "insert into \"SCOTT\".\"TEST2\"(\"ID\",\"NAME\",\"MARK\") values ('3','aa','ee');"
},
2.2, record event 對應的topic: 一條表資料
"ID": {
"bytes": "\u0003"
},
"NAME": {
"string": "aa"
},
"MARK": {
"string": "ee"
},
3, oraclecdc --> hbase
- 在 oracle 插入資料 (3, 'aa', 'ee')
Confluent Platform: ksqlDB 實時資料 ETL2 (Oracle-cdc Source) — 1, oracle(11g) cdc 配置 / 2, 配置 oraclecdc-source / 3, oraclecdc --> hbase
-- Sink the change-event topic of SCOTT.TEST2 into the HBase table 'www'
-- (column family 'f'), using the ID column as the HBase row key.
ksql> CREATE SINK CONNECTOR `www-hbase-sink2` WITH (
"connector.class"='io.svectors.hbase.sink.HBaseSinkConnector',
"topics"='helowin24.SCOTT24.TEST224',
"event.parser.class"='io.svectors.hbase.parser.AvroEventParser',
"zookeeper.quorum"='192.168.56.161:2181',
"hbase.table.name"='www',
"hbase.www.family"='f',
"hbase.www.rowkey.columns"='ID'
);
# View the data written to HBase.
# NOTE(review): the row key and f:ID print as java.nio.HeapByteBuffer[pos=0 lim=1 cap=1].
# ID arrived as Avro bytes (see section 2.2), and the sink appears to have stored the
# buffer's toString() rather than its contents. That string carries no ID value, so rows
# whose IDs encode to the same byte length would likely collapse onto one row key.
# Verify this before relying on the mapping, e.g. by declaring ID with an explicit NUMBER
# precision so that numeric.mapping=best_fit emits an integer.
hbase(main):037:0> scan 'www'
ROW COLUMN+CELL
java.nio.HeapByteBuffer[pos=0 lim=1 cap column=f:ID, timestamp=1626144115526, value=java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]
=1]
java.nio.HeapByteBuffer[pos=0 lim=1 cap column=f:MARK, timestamp=1626144115526, value=ee
=1]
java.nio.HeapByteBuffer[pos=0 lim=1 cap column=f:NAME, timestamp=1626144115526, value=aa
=1]
java.nio.HeapByteBuffer[pos=0 lim=1 cap column=f:current_ts, timestamp=1626144115526, value=1626144115950
=1]
...
1 row(s) in 0.0060 seconds