Real-Time Incremental Synchronization of Oracle Data to Kafka with OGG

Background

In the big data era, a large amount of business is driven by data, and data must flow and be integrated across different systems. Typically, core business data lives in an OLTP database, and other business systems need to obtain it from there. Traditional data warehouses pull data from the OLTP system periodically through batch synchronization, but as business requirements evolve, batch synchronization can no longer meet expectations, either in latency or in the extraction load it places on the online OLTP system. What is needed instead is to capture data changes from the OLTP system in real time and synchronize them to downstream systems as they happen.

Based on Oracle OGG (GoldenGate), this article describes a way to synchronize data from an Oracle database to a Kafka message queue in real time.

Kafka is a high-throughput message queue implementation. By subscribing to a Kafka topic, downstream systems can observe data changes in the online Oracle system in real time and build business applications on top of them.

Environment

Component versions

(The component version table appeared as an image in the original post.)

Overall architecture

(The architecture diagram appeared as an image in the original post.)

Terminology

1. OGG Manager

The Manager configures and manages the other OGG components: it configures data extraction, data pumping, and data replication, starts and stops the related components, and reports their runtime status.

2. Extract

An extract captures changes (DML, DDL) from the source database. Extracts fall into the following types:

Local extract

Captures incremental change data from the local database and writes it to local trail files.

Data pump

Reads data from local trail files and pushes it to the target.

Initial-load extract

Exports the full contents of database tables for the initial data load.

3. Data Pump

A data pump is a special type of extract that reads data from local trail files and sends it across the network to the OGG instance on the target.

4. Trail files

Transaction changes captured from the source database by an extract are written to trail files.

5. Collector

The collector runs on the target machine. It receives the trail data sent by the data pump and writes it to local trail files.

6. Replicat

A replicat runs on the target machine. It reads data changes from trail files and applies them to the target data store; in this article, the replicat pushes the data to a Kafka message queue.

7. Checkpoint

Checkpoints record how far each process has read and written in the transaction logs and trail files, so that after a failure or restart, processing can resume from the last confirmed position without losing or duplicating data.

Procedure

Source Oracle Configuration

1. Check archive logging

OGG requires archive logging to be enabled on the source database:

SQL> archive log list;

Database log mode Archive Mode

Automatic archival Enabled

Archive destination /u01/app/oracle/product/12.2.0/db_1/dbs/arch

Oldest online log sequence 2576

Next log sequence to archive 2577

Current log sequence 2577
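The listing above shows archiving already enabled. If the database is still in NOARCHIVELOG mode, it must be switched first; a sketch of the standard command sequence (note that this requires a brief outage):

SQL> shutdown immediate;

SQL> startup mount;

SQL> alter database archivelog;

SQL> alter database open;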

2. Check the database configuration

SQL> select force_logging, supplemental_log_data_min from v$database;

FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI

---------- ------------------------

YES YES

If force logging and minimal supplemental logging are not enabled, turn them on:

SQL> alter database force logging;

SQL> alter database add supplemental log data;

3. Enable the GoldenGate replication parameter

SQL> alter system set enable_goldengate_replication = true;

4. Create the source Oracle account

SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;

SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;

SQL> grant dba to ggsadmin;
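Granting DBA, as the author does here, is the quickest option for a test environment. A more restrictive alternative (a sketch; adjust the grants to your own security policy) is Oracle's dedicated GoldenGate grant procedure:

SQL> grant connect, resource to ggsadmin;

SQL> exec dbms_goldengate_auth.grant_admin_privilege('GGSADMIN');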

5. Create a test table

SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;

SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);

SQL> select count(*) from baiyang.ora_to_kfk;

COUNT(*)

----------

436

Source OGG Configuration

1. Check the source OGG environment

cd /oradata/oggorcl/ogg

./ggsci

GGSCI (dtproxy) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER STOPPED

2. Create the working directories

GGSCI (dtproxy) 2> create subdirs

Creating subdirectories under current directory /oradata/oggorcl/ogg

Parameter file /oradata/oggorcl/ogg/dirprm: created.

Report file /oradata/oggorcl/ogg/dirrpt: created.

Checkpoint file /oradata/oggorcl/ogg/dirchk: created.

Process status files /oradata/oggorcl/ogg/dirpcs: created.

SQL script files /oradata/oggorcl/ogg/dirsql: created.

Database definitions files /oradata/oggorcl/ogg/dirdef: created.

Extract data files /oradata/oggorcl/ogg/dirdat: created.

Temporary files /oradata/oggorcl/ogg/dirtmp: created.

Credential store files /oradata/oggorcl/ogg/dircrd: created.

Masterkey wallet files /oradata/oggorcl/ogg/dirwlt: created.

Dump files /oradata/oggorcl/ogg/dirdmp: created.

3. Configure the source Manager

GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle

Successfully logged into database.

GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals

-- Add the following:

oggschema ggsadmin

GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

-- Add the following:

PORT 7810 -- default listening port

DYNAMICPORTLIST 7811-7820 -- dynamic port range

AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3 -- restart a failed process every 3 minutes, up to 5 times

PURGEOLDEXTRACTS ./dirdat -- purge old trail files

LAGREPORTHOURS 1 -- check transfer lag every hour

LAGINFOMINUTES 30 -- report lag over 30 minutes as an informational message

LAGCRITICALMINUTES 45 -- report lag over 45 minutes as critical

PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- purge marker history, keeping 3 to 7 days

ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW -- allow connections from the 172 network

-- Register the table to be synchronized

GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk

-- Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.

GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk

-- Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239

Target OGG Configuration

1. Check the target environment

GGSCI (172-16-101-242) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER STOPPED

2. Create the working directories

GGSCI (172-16-101-242) 2> create subdirs

Creating subdirectories under current directory /app/ogg

Parameter file /app/ogg/dirprm: created.

Report file /app/ogg/dirrpt: created.

Checkpoint file /app/ogg/dirchk: created.

Process status files /app/ogg/dirpcs: created.

SQL script files /app/ogg/dirsql: created.

Database definitions files /app/ogg/dirdef: created.

Extract data files /app/ogg/dirdat: created.

Temporary files /app/ogg/dirtmp: created.

Credential store files /app/ogg/dircrd: created.

Masterkey wallet files /app/ogg/dirwlt: created.

Dump files /app/ogg/dirdmp: created.

3. Configure the target Manager

GGSCI (172-16-101-242) 3> edit params mgr

-- Add the following:

PORT 7810

DYNAMICPORTLIST 7811-7820

AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3

PURGEOLDEXTRACTS ./dirdat

4. Configure the Kafka producer properties

vi custom_kafka_producer.properties

-- Add the following:

bootstrap.servers=172.16.101.242:9092

acks=1

compression.type=gzip

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

batch.size=102400

linger.ms=10000
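The replicat configured later references ./dirprm/kafka.props, which is not shown in this excerpt. Based on the standard OGG for Big Data Kafka handler properties, a minimal sketch might look like the following; the topic name test_ogg matches the consumer command used later, while the gg.classpath value is an assumption about where the Kafka client libraries live:

vi kafka.props

gg.handlerlist=kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

# older OGG for Big Data releases use gg.handler.kafkahandler.topicName instead

gg.handler.kafkahandler.topicMappingTemplate=test_ogg

gg.handler.kafkahandler.format=json

gg.handler.kafkahandler.mode=op

gg.classpath=dirprm/:/opt/kafka/libs/*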

5. Run the initial full-data extract on the source
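The initkfk initial-load extract started in this step is not defined anywhere in this excerpt. Using the connection details that appear elsewhere in this article, it would typically be registered and configured roughly as follows; treat this as a sketch rather than the author's exact file, and note that the remote file name ekfk is illustrative:

GGSCI (dtproxy) > add extract initkfk, sourceistable

GGSCI (dtproxy) > edit param initkfk

EXTRACT initkfk

SETENV (NLS_LANG = "american_america.AL32UTF8")

USERID ggsadmin, PASSWORD oracle

RMTHOST 172.16.101.242, MGRPORT 7810

RMTFILE ./dirdat/ekfk, MAXFILES 999, MEGABYTES 500 -- illustrative remote file name

TABLE baiyang.ora_to_kfk;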

-- On the source

GGSCI (dtproxy) 20> start mgr

GGSCI (dtproxy) 21> start initkfk

6. Apply the full data load on the target

GGSCI (172-16-101-242) 13> start mgr

./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
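The parameter file ./dirprm/initkfk.prm run above is likewise not shown. Assuming the kafka.props and define_kfk.txt files described elsewhere in this article, and an initial-load file matching the source extract's RMTFILE, a minimal special-run replicat parameter file would look like this sketch:

SPECIALRUN

END RUNTIME

SETENV (NLS_LANG = "american_america.AL32UTF8")

TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props

SOURCEDEFS ./dirdef/define_kfk.txt

EXTFILE ./dirdat/ekfk -- must match the RMTFILE written by the initial-load extract

MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;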

7. Verify the data in Kafka

Use the Kafka console consumer to inspect the topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}

The full data set has now been synchronized to the target Kafka topic.

Incremental Data Synchronization

1. Configure the source extract process

GGSCI (dtproxy) 9> edit param extkfk

-- Add the following:

extract extkfk

dynamicresolution

SETENV (ORACLE_SID = "dtstack")

SETENV (NLS_LANG = "american_america.AL32UTF8")

userid ggsadmin,password oracle

exttrail ./dirdat/to

table baiyang.ora_to_kfk;

-- Register the extract process

GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now

-- Define the local trail and bind it to the extract process

GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

2. Configure the source data pump process

-- Configure the source pump process

GGSCI (dtproxy) 12> edit param pupkfk

-- Add the following:

extract pupkfk

passthru

dynamicresolution

userid ggsadmin,password oracle

rmthost 172.16.101.242 mgrport 7810

rmttrail ./dirdat/to

table baiyang.ora_to_kfk;

-- Register the pump (extract) process

GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to

-- Define the remote trail and bind it to the pump process

GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk

3. Configure the target replicat process

-- Configure the target replicat

edit param repkfk

-- Add the following:

REPLICAT repkfk

SOURCEDEFS ./dirdef/define_kfk.txt

targetdb libfile libggjava.so set property=./dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

-- Bind the trail file to the replicat process

add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
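The SOURCEDEFS file ./dirdef/define_kfk.txt referenced above is not generated anywhere in this excerpt. It is normally produced on the source with the defgen utility and copied to the target's dirdef directory; a sketch (the parameter file name define_kfk.prm is illustrative):

-- On the source, create dirprm/define_kfk.prm containing:

defsfile ./dirdef/define_kfk.txt

userid ggsadmin, password oracle

table baiyang.ora_to_kfk;

-- Then generate the definitions and copy them to the target:

./defgen paramfile dirprm/define_kfk.prm

scp dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/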

4. Start real-time data capture on the source

./ggsci

GGSCI (dtproxy) 5> start extkfk

Sending START request to MANAGER ...

EXTRACT EXTKFK starting

GGSCI (dtproxy) 6> start pupkfk

Sending START request to MANAGER ...

EXTRACT PUPKFK starting

GGSCI (dtproxy) 7> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

EXTRACT RUNNING EXTKFK 00:00:00 00:00:10

EXTRACT RUNNING PUPKFK 00:00:00 00:00:00

5. Start real-time apply on the target

./ggsci

GGSCI (172-16-101-242) 7> start replicat repkfk

Sending START request to MANAGER ...

REPLICAT REPKFK starting

GGSCI (172-16-101-242) 8> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

REPLICAT RUNNING REPKFK 00:00:00 00:00:00

6. Test incremental synchronization

Insert incremental data into Oracle:

SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;

SQL> commit;

SQL> select count(*) from baiyang.ora_to_kfk;

COUNT(*)

----------

905

Check the data consumed from the Kafka topic:

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}

Delete data from the source Oracle database:

SQL> delete from baiyang.ora_to_kfk ;

906 rows deleted.

SQL> commit;

Check the data consumed from the Kafka topic:

{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

Insert a row on the source:

SQL> insert into baiyang.ora_to_kfk values('漢字', 'y1', 'z1', 111000,2000,'x1');

1 row created.

SQL> commit;

Check the data consumed from the Kafka topic:

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"漢字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

Summary

With OGG, data changes in an Oracle database can be conveniently synchronized to a Kafka message queue in real time. By subscribing to the Kafka topic, downstream business systems can easily build all kinds of real-time data applications.