
Confluent Platform: ksqlDB Real-Time Data ETL, Part 2 (Oracle CDC Source)

Table of contents

  • 1. Oracle (11g) CDC configuration
  • 2. Configure the oraclecdc-source connector
    • 2.1. Redo log topic: the executed SQL statements
    • 2.2. Change event topic: one row of table data
  • 3. Oracle CDC --> HBase

1. Oracle (11g) CDC configuration

oraclecdc-source configuration reference: https://docs.confluent.io/kafka-connect-oracle-cdc/current/overview.html

#1, Enable Oracle archive/supplemental logging
SQL> SHUTDOWN IMMEDIATE;
SQL> STARTUP MOUNT;

SQL> ALTER DATABASE ARCHIVELOG;
SQL> ALTER DATABASE OPEN;
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

SQL> select name,log_mode from v$database;
NAME      LOG_MODE
--------- ------------
HELOWIN   ARCHIVELOG
# To enable full supplemental logging for specific tables:
	ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
	ALTER SESSION SET CONTAINER=<pdb-name>; -- only for multitenant (12c+) PDBs, not applicable to 11g
	ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
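
To confirm both settings took effect, query v$database (a quick sanity check, not captured in the original run):

SQL> SELECT supplemental_log_data_min, supplemental_log_data_all FROM v$database;
-- both columns should report YES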

#2, Create a test table
SQL> alter user scott account unlock;
SQL> alter user scott identified by scott;
SQL> conn scott/scott
SQL> create table test ( id int , name varchar2(10), mark varchar2(10));
Table created.
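
Because the connector below is created with "start.from": "snapshot", it helps to seed the table before the connector starts so the snapshot has something to capture. A hypothetical seed row (use TEST2 instead if that is the table being captured):

SQL> insert into test values (1, 'n1', 'm1');
SQL> commit;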

#3, Create the CDC user and grant privileges
CREATE ROLE CDC_PRIVS;
GRANT CREATE SESSION TO CDC_PRIVS;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO CDC_PRIVS;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO CDC_PRIVS;
GRANT SELECT ON V_$DATABASE TO CDC_PRIVS;
GRANT SELECT ON V_$THREAD TO CDC_PRIVS;
GRANT SELECT ON V_$PARAMETER TO CDC_PRIVS;
GRANT SELECT ON V_$NLS_PARAMETERS TO CDC_PRIVS;
GRANT SELECT ON V_$TIMEZONE_NAMES TO CDC_PRIVS;
GRANT SELECT ON ALL_INDEXES TO CDC_PRIVS;
GRANT SELECT ON ALL_OBJECTS TO CDC_PRIVS;
GRANT SELECT ON ALL_USERS TO CDC_PRIVS;
GRANT SELECT ON ALL_CATALOG TO CDC_PRIVS;
GRANT SELECT ON ALL_CONSTRAINTS TO CDC_PRIVS;
GRANT SELECT ON ALL_CONS_COLUMNS TO CDC_PRIVS;
GRANT SELECT ON ALL_TAB_COLS TO CDC_PRIVS;
GRANT SELECT ON ALL_IND_COLUMNS TO CDC_PRIVS;
GRANT SELECT ON ALL_ENCRYPTED_COLUMNS TO CDC_PRIVS;
GRANT SELECT ON ALL_LOG_GROUPS TO CDC_PRIVS;
GRANT SELECT ON ALL_TAB_PARTITIONS TO CDC_PRIVS;
GRANT SELECT ON SYS.DBA_REGISTRY TO CDC_PRIVS;
GRANT SELECT ON SYS.OBJ$ TO CDC_PRIVS;
GRANT SELECT ON DBA_TABLESPACES TO CDC_PRIVS;
GRANT SELECT ON DBA_OBJECTS TO CDC_PRIVS;
GRANT SELECT ON SYS.ENC$ TO CDC_PRIVS;
GRANT SELECT ANY TRANSACTION TO CDC_PRIVS;
GRANT SELECT ON scott.test TO CDC_PRIVS;

CREATE USER CDC_USER IDENTIFIED BY CDC_USER DEFAULT TABLESPACE USERS;
ALTER USER CDC_USER QUOTA UNLIMITED ON USERS;
GRANT CDC_PRIVS to CDC_USER;
GRANT FLASHBACK ANY TABLE TO CDC_USER;

#4, Verify the user's privileges
SQL> set lines 256
SQL> set pages 42
SQL> SELECT GRANTEE, OWNER, TABLE_NAME FROM DBA_TAB_PRIVS
WHERE GRANTEE IN (SELECT granted_role
                  FROM DBA_ROLE_PRIVS
                  WHERE GRANTEE = 'CDC_USER')
AND (TABLE_NAME='DBMS_LOGMNR' OR TABLE_NAME='V_$LOGMNR_CONTENTS');  

GRANTEE                        OWNER                          TABLE_NAME
------------------------------ ------------------------------ ------------------------------
CDC_PRIVS                      SYS                            V_$LOGMNR_CONTENTS
CDC_PRIVS                      SYS                            DBMS_LOGMNR
           

2. Configure the oraclecdc-source connector

Notes:

  • table.inclusion.regex: ORCL.SCOTT.TEST, i.e. <instance name>.<user name>.<table name>; this example uses helowin.SCOTT.TEST2
  • key.template: defaults to ${primaryKeyStructOrValue}, i.e. the record key is the primary key; for a table without one (such as the test table above) you can name the key columns yourself, as this config does with "ID,NAME"

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '
{
  "name": "SimpleOracleCDC_24",
  "config": {
    "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
    "name": "SimpleOracleCDC_24",
    "tasks.max": 3,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.template": "ID,NAME",

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": "false",
    "confluent.topic.bootstrap.servers": "localhost:9092",

    "oracle.server": "192.168.56.117",
    "oracle.port": "1522",
    "oracle.sid": "helowin",
    "oracle.username": "system",
    "oracle.password": "system",
    "start.from": "snapshot",

    "redo.log.topic.name": "SimpleOracleCDC_24",
    "redo.log.consumer.bootstrap.servers": "localhost:9092",
    "table.inclusion.regex": "helowin.SCOTT.TEST2",
    "numeric.mapping": "best_fit",
    "_table.topic.name.template_": "Using template vars to set change event topic for each table",
    "table.topic.name.template": "${databaseName}24.${schemaName}24.${tableName}24",
    "connection.pool.max.size": 20,
    "confluent.topic.replication.factor": 1,
    "topic.creation.groups": "redo",
    "topic.creation.redo.include": "SimpleOracleCDC_24",
    "topic.creation.redo.replication.factor": 1,
    "topic.creation.redo.partitions": 1,
    "topic.creation.redo.cleanup.policy": "delete",
    "topic.creation.redo.retention.ms": 1209600000,
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "topic.creation.default.cleanup.policy": "compact"
  }
}'
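
Once the POST succeeds, the standard Kafka Connect REST API can confirm the connector and its tasks came up (not part of the original transcript):

curl -s http://localhost:8083/connectors/SimpleOracleCDC_24/status
# expect "state": "RUNNING" for the connector and each task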
           

2.1. Redo log topic: the executed SQL statements

Each record in the redo log topic carries the operation type and the original SQL, for example:

"OPERATION": {
    "string": "INSERT"
  },
  "OPERATION_CODE": {
    "int": 1
  },
 "SQL_REDO": {
    "string": "insert into \"SCOTT\".\"TEST2\"(\"ID\",\"NAME\",\"MARK\") values ('3','aa','ee');"
  },
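
One way to peek at these records is Confluent's Avro console consumer (assuming a stock Confluent Platform install with Schema Registry on port 8081):

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic SimpleOracleCDC_24 \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081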
           

2.2. Change event topic: one row of table data

Note that ID arrives as Avro bytes rather than an int: Oracle INT is NUMBER(38), too wide for numeric.mapping=best_fit to place in an integer type, so it stays a decimal logical type. "\u0003" below is the raw byte 0x03, i.e. the value 3; the same bytes show up as HeapByteBuffer row keys in the HBase scan in section 3.

"ID": {
    "bytes": "\u0003"
  },
  "NAME": {
    "string": "aa"
  },
  "MARK": {
    "string": "ee"
  },
           

3. Oracle CDC --> HBase

  • Insert a row into Oracle, (3, 'aa', 'ee') (see the sketch below), then sink the change events into HBase with a ksqlDB connector:
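
The insert on the Oracle side might look like this (matching the values above; the connector config captures SCOTT.TEST2):

SQL> conn scott/scott
SQL> insert into test2 values (3, 'aa', 'ee');
SQL> commit;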
ksql> CREATE SINK CONNECTOR `www-hbase-sink2` WITH(
    "connector.class"='io.svectors.hbase.sink.HBaseSinkConnector',
    "zookeeper.quorum"='192.168.56.161:2181',
    "event.parser.class"='io.svectors.hbase.parser.AvroEventParser',
    "hbase.table.name"='www',
    "hbase.www.rowkey.columns"='ID',
    "hbase.www.family"='f',
    "topics"='helowin24.SCOTT24.TEST224'
    );
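
This is the community kafka-connect-hbase sink (io.svectors); as far as I know it writes into an existing table, so 'www' with column family 'f' (matching hbase.www.family above) would be created up front:

hbase(main):001:0> create 'www', 'f'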

# Inspect the data in HBase
hbase(main):037:0> scan 'www'
ROW                                          COLUMN+CELL
 java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]  column=f:ID, timestamp=1626144115526, value=java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]
 java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]  column=f:MARK, timestamp=1626144115526, value=ee
 java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]  column=f:NAME, timestamp=1626144115526, value=aa
 java.nio.HeapByteBuffer[pos=0 lim=1 cap=1]  column=f:current_ts, timestamp=1626144115526, value=1626144115950
...
1 row(s) in 0.0060 seconds