天天看點

利用ogg實作oracle到kafka的增量資料實時同步

ogg即Oracle GoldenGate是Oracle的同步工具,本文講如何配置ogg以實作Oracle資料庫增量資料實時同步到kafka中,其中同步消息格式為json。

下面是我的源端和目标端的一些配置資訊:

<col>

-

版本

OGG版本

ip

别名

源端

OracleRelease 11.2.0.1.0

Oracle GoldenGate 11.2.1.0.3 for Oracle on Linux x86-64

192.168.44.128

master

目标端

kafka_2.11-1.1.0

Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64

192.168.44.129

slave1

注意:源端和目标端的檔案不一樣,目标端需要下載下傳Oracle GoldenGate for Big Data,源端需要下載下傳Oracle GoldenGate for Oracle具體下載下傳方法見最後的附錄截圖。

注意:源端是安裝了oracle的機器,oracle環境變量之前都配置好了

先建立ogg目錄

解壓後得到一個tar包,再解壓這個tar

為了簡單友善起見,我在/etc/profile裡配置的,建議在生産中配置oracle的環境變量檔案/home/oracle/.bash_profile裡配置,為了怕出問題,我把OGG_HOME等環境變量在/etc/profile配置了一份,不知道這是否是必須的。

使之生效

測試一下ogg指令

如果指令成功即可進行下一步,不成功請檢查前面的步驟。

執行下面的指令檢視目前是否為歸檔模式

若為Disabled,手動打開即可

再執行一下

可以看到為Enabled,則成功打開歸檔模式。

OGG基于輔助日志等進行實時傳輸,故需要打開相關日志確定可擷取事務内容,通過下面的指令檢視該狀态

若為NO,則需要通過指令修改

再檢視一下為YES即可

首先root使用者建立相關檔案夾,并賦予權限

然後執行下面sql

建立一個使用者,在該使用者下建立測試表,使用者名、密碼、表名均為 test_ogg。

同樣測試一下ogg指令

先切換到oracle使用者下

然後和用vim編輯一樣添加

說明:PORT即mgr的預設監聽端口;DYNAMICPORTLIST動态端口清單,當指定的mgr端口不可用時,會在這個端口清單中選擇一個,最大指定範圍為256個;AUTORESTART重新開機參數設定表示重新開機所有EXTRACT程序,最多5次,每次間隔3分鐘;PURGEOLDEXTRACTS即TRAIL檔案的定期清理

說明:第一行指定extract程序名稱;dynamicresolution動态解析;SETENV設定環境變量,這裡分别設定了Oracle資料庫以及字元集;userid ggs,password ggs即OGG連接配接Oracle資料庫的帳号密碼,這裡使用2.5中特意建立的複制帳号;exttrail定義trail檔案的儲存位置以及檔案名,注意這裡檔案名隻能是2個字母,其餘部分OGG會補齊;table即複制表的表名,支援*通配,必須以;結尾

添加extract程序:

(注:若報錯

執行下面的指令再重新添加即可。

)

添加trail檔案的定義與extract程序綁定:

pump程序本質上來說也是一個extract,隻不過他的作用僅僅是把trail檔案傳遞到目标端,配置過程和extract程序類似,隻是邏輯上稱之為pump程序

說明:第一行指定extract程序名稱;passthru即禁止OGG與Oracle互動,我們這裡使用pump邏輯傳輸,故禁止即可;dynamicresolution動态解析;userid ogg,password ogg即OGG連接配接Oracle資料庫的帳号密碼rmthost和mgrhost即目标端(kafka)OGG的mgr服務的位址以及監聽端口;rmttrail即目标端trail檔案存儲位置以及名稱。

分别将本地trail檔案和目标端的trail檔案綁定到extract程序:

Oracle與MySQL,Hadoop叢集(HDFS,Hive,kafka等)等之間資料傳輸可以定義為異構資料類型的傳輸,故需要定義表之間的關系映射,在OGG指令行執行:

在OGG主目錄下執行(oracle使用者):

将生成的/opt/ogg/dirdef/test_ogg.test_ogg發送的目标端ogg目錄下的dirdef裡:

checkpoint即複制可追溯的一個偏移量記錄,在全局配置裡添加checkpoint表即可。

說明:REPLICATE rekafka定義rep程序名稱;sourcedefs即在4.6中在源伺服器上做的表映射檔案;TARGETDB LIBFILE即定義kafka一些适配性的庫檔案以及配置檔案,配置檔案位于OGG主目錄下的dirprm/kafka.props;REPORTCOUNT即複制任務的報告生成頻率;GROUPTRANSOPS為以事務傳輸時,事務合并的機關,減少IO操作;MAP即源端與目标端的映射關系

其中需要将後面的注釋去掉,ogg不識别注釋,如果不去掉會報錯

在源端和目标端的OGG指令行下使用start [程序名]的形式啟動所有程序。

啟動順序按照源mgr——目标mgr——源extract——源pump——目标replicate來完成。

全部需要在ogg目錄下執行ggsci目錄進入ogg指令行。

源端依次是

可以通過info all 或者info [程序名] 檢視狀态,所有的程序都為RUNNING才算成功

如果有不是RUNNING可通過檢視日志的方法檢查解決問題,具體通過下面兩種方法

或者ogg指令行,以rekafka程序為例

列舉其中我遇到的一個問題:

異常資訊

具體原因是網上的教程是舊版的,設定topicName的屬性為:

新版的這樣設定

大家可根據自己的版本進行設定,附上stackoverflow原答案

現在源端執行sql語句

檢視源端trail檔案狀态

檢視目标端trail檔案狀态

檢視kafka是否自動建立對應的主題

在清單中顯示有test_ogg則表示沒問題

通過消費者看是否有同步消息

顯然,Oracle的資料已準實時同步到Kafka,格式為json,其中op_type代表操作類型,這個可配置,我沒有配置則按預設的來,預設為

before代表操作之前的資料,after代表操作後的資料,現在已經可以從kafka擷取到同步的json資料了,後面可以用SparkStreaming和Storm等解析然後存到hadoop等大資料平台裡

下面附上消費成功的結果圖

利用ogg實作oracle到kafka的增量資料實時同步

在後面的使用過程中發現上面同步到kafka的json資料中少一些我們想要的一些,下面講一下我是如何解決的

首先建表:

為什麼不用之前建的表,主要是之前的字段太少,不容易看出問題,現在主要是增加幾個字段,然後id,idd是聯合主鍵。

看一下按照之前的配置,同步到kafka的資料(截取部分資料)

現在隻有insert的資料是全的,update更新非主鍵字段before是沒有資料的,更新主鍵before隻有主鍵的資料,delete隻有before的主鍵字段,也就是update和delete的資訊是不全的,且沒有主鍵資訊(程式裡是不能判斷哪一個是主鍵的),這樣對于程式自動解析同步資料是不利的(不同的需求可能不一樣),具體自己可以分析,就不啰嗦了,這裡主要解決,有需要before和after全部資訊和主鍵資訊的需求。

在源端extract裡添加下面幾行

重新開機 extkafka

然後測試

發現update之後before裡有資料即可,但是現在before和after的資料都不全(隻有部分字段)

網上有的說隻添加GETUPDATES即可,但我測試了沒有成功,關于每個配置項什麼含義可​

在kafka.props添加

重新開機 rekafka

測試:

發現有primary_keys,不錯~

如果字段補全應該是Oracle沒有開啟全列補充日志

通過以下指令開啟

測試一下

到現在json資訊裡的内容已經很全了,基本滿足了我想要的,附圖:

利用ogg實作oracle到kafka的增量資料實時同步

注:部落格上講到,開啟全列補充日志會導緻磁盤快速增長,LGWR程序繁忙,不建議使用。大家可根據自己的情況使用。

如果想通配整個庫的話,隻需要把上面的配置所有表名的改為,如test_ogg.test_ogg改為 test_ogg.,但是kafka的topic不能通配,是以需要把所有的表的資料放在一個topic即可,後面再用程式解析表名即可。
利用ogg實作oracle到kafka的增量資料實時同步
利用ogg實作oracle到kafka的增量資料實時同步
利用ogg實作oracle到kafka的增量資料實時同步
利用ogg實作oracle到kafka的增量資料實時同步
利用ogg實作oracle到kafka的增量資料實時同步