利用ogg實作oracle到kafka的增量資料實時同步 2018-05-23
前言 ogg即Oracle GoldenGate是Oracle的同步工具,本文講如何配置ogg以實作Oracle資料庫增量資料實時同步到kafka中,其中同步消息格式為json。
下面是我的源端和目标端的一些配置資訊:
- 版本 OGG版本 ip 别名 源端 OracleRelease 11.2.0.1.0 Oracle GoldenGate 11.2.1.0.3 for Oracle on Linux x86-64 192.168.44.128 master 目标端 kafka_2.11-1.1.0 Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 192.168.44.129 slave1
1、下載下傳 可在這裡或舊版本查詢下載下傳
注意:源端和目标端的檔案不一樣,目标端需要下載下傳Oracle GoldenGate for Big Data,源端需要下載下傳Oracle GoldenGate for Oracle具體下載下傳方法見最後的附錄截圖。
2、源端(Oracle)配置 注意:源端是安裝了oracle的機器,oracle環境變量之前都配置好了
2.1 解壓 先建立ogg目錄
1
2
mkdir -p /opt/ogg
unzip V34339-01.zip
解壓後得到一個tar包,再解壓這個tar
1
2
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
chown -R oracle:oinstall /opt/ogg (使oracle使用者有ogg的權限,後面有些需要在oracle使用者下執行才能成功)
2.2 配置ogg環境變量 為了簡單友善起見,我在/etc/profile裡配置的,建議在生産中配置oracle的環境變量檔案/home/oracle/.bash_profile裡配置,為了怕出問題,我把OGG_HOME等環境變量在/etc/profile配置了一份,不知道這是否是必須的。
1
2
3
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH
使之生效
測試一下ogg指令
如果指令成功即可進行下一步,不成功請檢查前面的步驟。
2.3 oracle打開歸檔模式 1
2
su - oracle
sqlplus / as sysdba
執行下面的指令檢視目前是否為歸檔模式
1
2
3
4
5
6
SQL> archive log list
Database log mode No Archive Mode
Automatic archival Disabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 12
Current log sequence 14
若為Disabled,手動打開即可
1
2
3
4
5
6
conn / as sysdba (以DBA身份連接配接資料庫)
shutdown immediate (立即關閉資料庫)
startup mount (啟動執行個體并加載資料庫,但不打開)
alter database archivelog; (更改資料庫為歸檔模式)
alter database open; (打開資料庫)
alter system archive log start; (啟用自動歸檔)
再執行一下
1
2
3
4
5
6
Database log mode Archive Mode
Automatic archival Enabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 12
Next log sequence to archive 14
Current log sequence 14
可以看到為Enabled,則成功打開歸檔模式。
2.4 Oracle打開日志相關 OGG基于輔助日志等進行實時傳輸,故需要打開相關日志確定可擷取事務内容,通過下面的指令檢視該狀态
1
select force_logging, supplemental_log_data_min from v$database;
1
2
3
FORCE_ SUPPLEMENTAL_LOG
------ ----------------
NO NO
若為NO,則需要通過指令修改
1
2
alter database force logging;
alter database add supplemental log data;
再檢視一下為YES即可
1
2
3
4
5
SQL> select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
------ ----------------
YES YES
2.5 oracle建立複制使用者 首先root使用者建立相關檔案夾,并賦予權限
1
2
mkdir -p /u01/app/oracle/oggdata/orcl
chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl
然後執行下面sql
1
2
3
4
5
6
7
8
9
10
11
SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;
Tablespace created.
SQL> create user ogg identified by ogg default tablespace oggtbs;
User created.
SQL> grant dba to ogg;
Grant succeeded.
2.6 OGG初始化 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (ambari.master.com) 1> create subdirs
Creating subdirectories under current directory /root
Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created
GGSCI (ambari.master.com) 2>
2.7 Oracle建立測試表 建立一個使用者,在該使用者下建立測試表,使用者名、密碼、表名均為 test_ogg。
1
2
3
4
create user test_ogg identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));
3 目标端(kafka)配置 1
2
3
mkdir -p /opt/ogg
unzip 123111_ggs_Adapters_Linux_x64.zip
tar xf ggs_Adapters_Linux_x64.tar -C /opt/ogg/
3.2 環境變量 1
2
3
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
同樣測試一下ogg指令
3.3 初始化目錄 4、OGG源端配置 4.1 配置OGG的全局變量 先切換到oracle使用者下
1
2
3
su oracle
cd /opt/ogg
ggsci
1
2
3
4
GGSCI (ambari.master.com) 1> dblogin userid ogg password ogg
Successfully logged into database.
GGSCI (ambari.master.com) 2> edit param ./globals
然後和用vim編輯一樣添加
4.2 配置管理器mgr 1
2
3
4
5
GGSCI (ambari.master.com) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
說明:PORT即mgr的預設監聽端口;DYNAMICPORTLIST動态端口清單,當指定的mgr端口不可用時,會在這個端口清單中選擇一個,最大指定範圍為256個;AUTORESTART重新開機參數設定表示重新開機所有EXTRACT程序,最多5次,每次間隔3分鐘;PURGEOLDEXTRACTS即TRAIL檔案的定期清理
4.3 添加複制表 1
2
3
4
5
6
7
8
9
GGSCI (ambari.master.com) 4> add trandata test_ogg.test_ogg
Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG.
GGSCI (ambari.master.com) 5> info trandata test_ogg.test_ogg
Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG.
Columns supplementally logged for table TEST_OGG.TEST_OGG: ID
4.4 配置extract程序 1
2
3
4
5
6
7
8
GGSCI (ambari.master.com) 6> edit param extkafka
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;
說明:第一行指定extract程序名稱;dynamicresolution動态解析;SETENV設定環境變量,這裡分别設定了Oracle資料庫以及字元集;userid ggs,password ggs即OGG連接配接Oracle資料庫的帳号密碼,這裡使用2.5中特意建立的複制帳号;exttrail定義trail檔案的儲存位置以及檔案名,注意這裡檔案名隻能是2個字母,其餘部分OGG會補齊;table即複制表的表名,支援*通配,必須以;結尾
添加extract程序:
1
2
GGSCI (ambari.master.com) 16> add extract extkafka,tranlog,begin now
EXTRACT added.
(注:若報錯
1
ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).
執行下面的指令再重新添加即可。
)
添加trail檔案的定義與extract程序綁定:
1
2
GGSCI (ambari.master.com) 17> add exttrail /opt/ogg/dirdat/to,extract extkafka
EXTTRAIL added.
4.5 配置pump程序 pump程序本質上來說也是一個extract,隻不過他的作用僅僅是把trail檔案傳遞到目标端,配置過程和extract程序類似,隻是邏輯上稱之為pump程序
1
2
3
4
5
6
7
8
GGSCI (ambari.master.com) 18> edit param pukafka
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 192.168.44.129 mgrport 7809
rmttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;
說明:第一行指定extract程序名稱;passthru即禁止OGG與Oracle互動,我們這裡使用pump邏輯傳輸,故禁止即可;dynamicresolution動态解析;userid ogg,password ogg即OGG連接配接Oracle資料庫的帳号密碼rmthost和mgrhost即目标端(kafka)OGG的mgr服務的位址以及監聽端口;rmttrail即目标端trail檔案存儲位置以及名稱。
分别将本地trail檔案和目标端的trail檔案綁定到extract程序:
1
2
3
4
GGSCI (ambari.master.com) 1> add extract pukafka,exttrailsource /opt/ogg/dirdat/to
EXTRACT added.
GGSCI (ambari.master.com) 2> add rmttrail /opt/ogg/dirdat/to,extract pukafka
RMTTRAIL added.
4.6 配置define檔案 Oracle與MySQL,Hadoop叢集(HDFS,Hive,kafka等)等之間資料傳輸可以定義為異構資料類型的傳輸,故需要定義表之間的關系映射,在OGG指令行執行:
1
2
3
4
GGSCI (ambari.master.com) 3> edit param test_ogg
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;
在OGG主目錄下執行(oracle使用者):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
./defgen paramfile dirprm/test_ogg.prm
***********************************************************************
Oracle GoldenGate Table Definition Generator for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
Starting at 2018-05-23 05:03:04
***********************************************************************
Operating System Version:
Linux
Version #1 SMP Wed Apr 12 15:04:24 UTC 2017, Release 3.10.0-514.16.1.el7.x86_64
Node: ambari.master.com
Machine: x86_64
soft limit hard limit
Address Space Size : unlimited unlimited
Heap Size : unlimited unlimited
File Size : unlimited unlimited
CPU Time : unlimited unlimited
Process id: 13126
***********************************************************************
** Running with the following parameters **
***********************************************************************
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ***
table test_ogg.test_ogg;
Retrieving definition for TEST_OGG.TEST_OGG
Definitions generated for 1 table in /opt/ogg/dirdef/test_ogg.test_ogg
将生成的/opt/ogg/dirdef/test_ogg.test_ogg發送的目标端ogg目錄下的dirdef裡:
1
scp -r /opt/ogg/dirdef/test_ogg.test_ogg root@slave1:/opt/ogg/dirdef/
5、OGG目标端配置 5.1 開啟kafka服務 1
2
3
cd /opt/kafka_2.11-1.1.0/
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
5.2 配置管理器mgr 1
2
3
4
5
GGSCI (ambari.slave1.com) 1> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
5.3 配置checkpoint checkpoint即複制可追溯的一個偏移量記錄,在全局配置裡添加checkpoint表即可。
1
2
edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
5.4 配置replicate程序 1
2
3
4
5
6
7
GGSCI (ambari.slave1.com) 4> edit param rekafka
REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
說明:REPLICATE rekafka定義rep程序名稱;sourcedefs即在4.6中在源伺服器上做的表映射檔案;TARGETDB LIBFILE即定義kafka一些适配性的庫檔案以及配置檔案,配置檔案位于OGG主目錄下的dirprm/kafka.props;REPORTCOUNT即複制任務的報告生成頻率;GROUPTRANSOPS為以事務傳輸時,事務合并的機關,減少IO操作;MAP即源端與目标端的映射關系
5.5 配置kafka.props 1
2
cd /opt/ogg/dirprm/
vim kafka.props
1
2
3
4
5
6
7
gg.handlerlist=kafkahandler //handler類型
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關配置
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名稱,無需手動建立
gg.handler.kafkahandler.format=json //傳輸檔案的格式,支援json,xml等
gg.handler.kafkahandler.mode=op //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務傳輸一次
gg.classpath=dirprm/:/opt/kafka_2.11-1.1.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
1
vim custom_kafka_producer.properties
1
2
3
4
5
6
7
8
bootstrap.servers=192.168.44.129:9092 //kafkabroker的位址
acks=1
compression.type=gzip //壓縮類型
reconnect.backoff.ms=1000 //重連延時
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
其中需要将後面的注釋去掉,ogg不識别注釋,如果不去掉會報錯
5.6 添加trail檔案到replicate程序 1
2
GGSCI (ambari.slave1.com) 2> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
REPLICAT added.
6、測試 6.1 啟動所有程序 在源端和目标端的OGG指令行下使用start [程序名]的形式啟動所有程序。
啟動順序按照源mgr——目标mgr——源extract——源pump——目标replicate來完成。
全部需要在ogg目錄下執行ggsci目錄進入ogg指令行。
源端依次是
1
2
3
start mgr
start extkafka
start pukafka
1
2
start mgr
start rekafka
可以通過info all 或者info [程序名] 檢視狀态,所有的程序都為RUNNING才算成功
1
2
3
4
5
6
7
GGSCI (ambari.master.com) 5> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKAFKA 04:50:21 00:00:03
EXTRACT RUNNING PUKAFKA 00:00:00 00:00:03
1
2
3
4
5
6
GGSCI (ambari.slave1.com) 3> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REKAFKA 00:00:00 00:00:01
6.2 異常解決 如果有不是RUNNING可通過檢視日志的方法檢查解決問題,具體通過下面兩種方法
或者ogg指令行,以rekafka程序為例
1
GGSCI (ambari.slave1.com) 2> view report rekafka
列舉其中我遇到的一個問題:
異常資訊
1
2
3
SEVERE: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
oracle.goldengate.util.ConfigException: Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
at ......
具體原因是網上的教程是舊版的,設定topicName的屬性為:
1
gg.handler.kafkahandler.topicName=test_ogg
新版的這樣設定
1
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
大家可根據自己的版本進行設定,附上stackoverflow原答案
1
2
3
4
5
6
7
8
I tried to move data from Oracle Database to Kafka using Golden gate adapter Version 12.3.0.1.0
In new version there is no topicname
The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=test
In previous version we have gg.handler.kafkahandler.topicName=test
6.3 測試同步更新效果 現在源端執行sql語句
1
2
3
4
5
6
7
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;
檢視源端trail檔案狀态
1
2
ls -l /opt/ogg/dirdat/to*
-rw-rw-rw- 1 oracle oinstall 1464 May 23 10:31 /opt/ogg/dirdat/to000000
檢視目标端trail檔案狀态
1
2
ls -l /opt/ogg/dirdat/to*
-rw-r----- 1 root root 1504 May 23 10:31 /opt/ogg/dirdat/to000000
檢視kafka是否自動建立對應的主題
1
bin/kafka-topics.sh --list --zookeeper localhost:2181
在清單中顯示有test_ogg則表示沒問題
通過消費者看是否有同步消息
1
2
3
4
bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test_ogg --from-beginning
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2018-05-23 10:31:28.000078","current_ts":"2018-05-23T10:36:48.525000","pos":"00000000000000001093","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2018-05-23 10:31:36.000073","current_ts":"2018-05-23T10:36:48.874000","pos":"00000000000000001233","before":{},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2018-05-23 10:31:43.000107","current_ts":"2018-05-23T10:36:48.875000","pos":"00000000000000001376","before":{"ID":1}}
顯然,Oracle的資料已準實時同步到Kafka,格式為json,其中op_type代表操作類型,這個可配置,我沒有配置則按預設的來,預設為
1
2
3
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
before代表操作之前的資料,after代表操作後的資料,現在已經可以從kafka擷取到同步的json資料了,後面可以用SparkStreaming和Storm等解析然後存到hadoop等大資料平台裡
6.4 SparkStreaming測試消費同步消息 具體代碼可參考Spark Streaming連接配接Kafka入門教程
下面附上消費成功的結果圖
利用ogg實作oracle到kafka的增量資料實時同步 7、更新:後續遇到的問題 在後面的使用過程中發現上面同步到kafka的json資料中少一些我們想要的一些,下面講一下我是如何解決的
首先建表:
1
2
3
4
5
6
7
8
9
CREATE TABLE "TCLOUD"."T_OGG2"
( "ID" NUMBER(*,0),
"TEXT_NAME" VARCHAR2(20),
"AGE" NUMBER(*,0),
"ADD" VARCHAR2(100),
"IDD" VARCHAR2(100),
CONSTRAINT "T_OGG2_PK" PRIMARY KEY ("ID", "IDD")
)
為什麼不用之前建的表,主要是之前的字段太少,不容易看出問題,現在主要是增加幾個字段,然後id,idd是聯合主鍵。
看一下按照之前的配置,同步到kafka的資料(截取部分資料)
1
2
3
4
5
{"table":"TCLOUD.T_OGG2","op_type":"I","op_ts":"2018-05-31 11:46:09.512672","current_ts":"2018-05-31T11:46:15.292000","pos":"00000000000000001903","after":{"ID":4,"TEXT_NAME":null,"AGE":0,"ADD":null,"IDD":"8"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:10.514549","current_ts":"2018-05-31T11:49:16.450000","pos":"00000000000000002227","before":{},"after":{"ID":4,"TEXT_NAME":"lisi","IDD":"7"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:48.514869","current_ts":"2018-05-31T11:49:54.481000","pos":"00000000000000002373","before":{"ID":4,"IDD":"7"},"after":{"ID":1,"IDD":"7"}}
{"table":"TCLOUD.T_OGG2","op_type":"D","op_ts":"2018-05-31 11:52:38.516877","current_ts":"2018-05-31T11:52:45.633000","pos":"00000000000000003161","before":{"ID":1,"IDD":"7"}}
現在隻有insert的資料是全的,update更新非主鍵字段before是沒有資料的,更新主鍵before隻有主鍵的資料,delete隻有before的主鍵字段,也就是update和delete的資訊是不全的,且沒有主鍵資訊(程式裡是不能判斷哪一個是主鍵的),這樣對于程式自動解析同步資料是不利的(不同的需求可能不一樣),具體自己可以分析,就不啰嗦了,這裡主要解決,有需要before和after全部資訊和主鍵資訊的需求。
7.1 添加before 在源端extract裡添加下面幾行
1
2
3
4
GGSCI (ambari.master.com) 33> edit param extkafka
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
重新開機 extkafka
1
2
stop extkafka
start extkafka
然後測試
1
2
3
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.709000","pos":"00000000000000003770","before":{"ID":1,"AGE":20,"IDD":"1"},"after":{"ID":1,"AGE":1,"IDD":"1"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.714000","pos":"00000000000000004009","before":{"ID":1,"AGE":20,"IDD":"2"},"after":{"ID":1,"AGE":1,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.715000","pos":"00000000000000004248","before":{"ID":1,"AGE":20,"IDD":"8"},"after":{"ID":1,"AGE":1,"IDD":"8"}}
發現update之後before裡有資料即可,但是現在before和after的資料都不全(隻有部分字段)
網上有的說隻添加GETUPDATES即可,但我測試了沒有成功,關于每個配置項什麼含義可以參考https://blog.csdn.net/linucle/article/details/13505939(有些配置的含義裡面也沒有給出)
參考:http://www.itpub.net/thread-2083473-1-1.html
7.2 添加主鍵 在kafka.props添加
1
gg.handler.kafkahandler.format.includePrimaryKeys=true
重新開機 rekafka
1
2
stop rekafka
start rekafka
測試:
1
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:58:57.637035","current_ts":"2018-05-31T14:59:03.401000","pos":"00000000000000004510","primary_keys":["ID","IDD"],"before":{"ID":1,"AGE":1,"IDD":"1"},"after":{"ID":1,"AGE":20,"IDD":"1"}}
發現有primary_keys,不錯~
參考:http://blog.51cto.com/lyzbg/2088409
7.3 補全全部字段 如果字段補全應該是Oracle沒有開啟全列補充日志
1
2
3
4
5
SQL> select supplemental_log_data_all from v$database;
SUPPLE
------
NO
通過以下指令開啟
1
2
3
4
5
6
7
8
9
10
11
SQL> alter database add supplemental log data(all) columns;
Database altered.
SQL> select supplemental_log_data_all from v$database;
SUPPLE
------
YES
SQL>
測試一下
1
2
3
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.891000","pos":"00000000000000006070","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"1"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"1"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.893000","pos":"00000000000000006341","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"2"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.895000","pos":"00000000000000006612","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"8"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"8"}}
到現在json資訊裡的内容已經很全了,基本滿足了我想要的,附圖:
利用ogg實作oracle到kafka的增量資料實時同步 啟發我發現和Oracle全列補充日志沒有開啟有關的部落格:https://blog.csdn.net/huoshuyinhua/article/details/79013387
開啟指令參考:https://blog.csdn.net/aaron8219/article/details/16825963
注:部落格上講到,開啟全列補充日志會導緻磁盤快速增長,LGWR程序繁忙,不建議使用。大家可根據自己的情況使用。
8、關于通配 如果想通配整個庫的話,隻需要把上面的配置所有表名的改為,如test_ogg.test_ogg改為 test_ogg.,但是kafka的topic不能通配,是以需要把所有的表的資料放在一個topic即可,後面再用程式解析表名即可。 9、附錄 目标端在這裡,下載下傳下來後檔案名123111_ggs_Adapters_Linux_x64.zip
利用ogg實作oracle到kafka的增量資料實時同步 源端在舊版本查詢下載下傳,下載下傳後檔案名為V34339-01.zip
利用ogg實作oracle到kafka的增量資料實時同步 利用ogg實作oracle到kafka的增量資料實時同步 利用ogg實作oracle到kafka的增量資料實時同步 利用ogg實作oracle到kafka的增量資料實時同步 參考資料 基于OGG的Oracle與Hadoop叢集準實時同步介紹