天天看點

Ogg For Bigdata 同步Oracle資料到KAFKA(包括初始化曆史資料)

Oracle GoldenGate 大資料相關技術 oracle ogg kafka 資料庫 ogg for bigdata

在前面曾寫過幾篇關于OGG同步Oracle等庫資料到kafka的文章:

OGG實時同步Oracle資料到Kafka實施文檔(供flink流式計算)

OGG For Bigdata 12按操作類型同步Oracle資料到kafka不同topic

但是那都是做測試,沒有說實際工作情況下如何将Oracle等庫表的曆史資料初始化到kafka的方案,我這裡用過兩個方案,第一個比較笨的方案那就是寫shell腳本将資料從Oracle導出成json格式的資料然後再寫到kafka,另一種就是現在要介紹的通過OGG本身的初始化程序來做曆史資料初始化,本篇文章環境完全根據前面文章搭建的環境來做的。

先再來看下目前環境的大緻配置情況:

Ogg For Bigdata 同步Oracle資料到KAFKA(包括初始化曆史資料)

由于本文做的一系列Ogg forBigdata投遞j’son消息到kafka操作是為了提供flink消費做實時計算用,為了極大的降低flink代碼解析json的成本,提高消費速度,本人文章對insert,delete,update/pkupdate的映射大緻邏輯是這樣映射的:

1、對于insert操作,由于ogg for bigdata生成的json消息是下面這種情況:

1

也就是有效資料存儲在after的部分,這裡不做變化;

2、對于delete 操作,由于ogg for bigdata生成的json消息是下面這種情況:

也就是有效資料存儲在before的部分,由于insert,delete,update我這裡不再像前面文章映射到不同topic,這裡都映射到一個topic中,這裡flink解析就有問題了,因為json結構不同,insert的有效資料在after而delete的在before,這裡為了flink解析json友善,将delete的操作對應的json的有效資料也放到after中,怎麼實作?就是将delete轉成insert,轉置後的結果json如下:

但是轉置完後,辨別操作類型的op_type也變成了I,那後面flink計算時候怎麼知道這條記錄實際做的是delete?,這就是為什麼我上篇文章在源端抽取程序加了TKN-OP-TYPE屬性來辨別這條記錄做的是什麼操作,這樣就算replicat做了轉置,op_type會變,但是TKN-OP-TYPE是從源端帶來的屬性值,這個不會變。

3、對于普通update操作,由于ogg for bigdata生成的json消息是下面這種情況:

2

這裡的json隻會帶有加了附加日志的主鍵及被修改的字段值, 我們首先需要做的是,把update after的資料單獨拿出來做一個json:

為什麼不取before的資料,因為before的資料對我們沒用,不需要取這些資料,其次,由于flink要計算的字段涉及empno,ename,job,sal,deptno這些字段,就算隻是改了ename字段,其他字段沒有變化,我們也要将這些沒有變動的字段及其現在的值拿出來寫到kafka,保證json消息的完整性,讓flink在處理的時候更友善。

4、對于pkupdate操作,無論是主鍵+其他字段的修改還是僅主鍵單獨的變更,原本的pkupdate消息如下:

這裡我們要把pkupdate before的資料拆分成一個單獨的json拿出來,并且讓除了主鍵以外的其他需要計算的名額ename,job,sal,deptno也要在這個json中并且這些除主鍵外的字段值均要為null值,如下:

而after的也要單獨拆分,要保證主鍵和所有字段的值都是現在最新的狀态值:

之是以這麼做一是因為前面說的保證j’son消息的完整性,其次是主鍵變更後,變更前的主鍵對應的j’son資料還在kafka中,而新的主鍵(包括變更主鍵和其他字段)對應的相關值除了變更主鍵時被變更的字段外其他的字段值都與舊主鍵值一緻,這樣flink計算的時候就會重複計算,為了避免重複計算,在主鍵變更後生成了新的主鍵+其他加了附加日志的字段j’son後,還要寫一個舊的主鍵對應的j’son消息,讓舊的主鍵最新的其他字段值都為null,這樣flink在計算的時候,根據主鍵取最新狀态值的時候就不會出現重複計算的問題了。

下面是上面邏輯的大緻流程圖:

Ogg For Bigdata 同步Oracle資料到KAFKA(包括初始化曆史資料)

下面看具體實驗:

–下面所有源端表都是在scott使用者下操作。

3

4

Ogg For Bigdata 同步Oracle資料到KAFKA(包括初始化曆史資料)
Ogg For Bigdata 同步Oracle資料到KAFKA(包括初始化曆史資料)

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

因為現在隻是對主鍵加了附加日志,未來DML操作,insert,delete向kafka投遞消息時,規定所有的資料都在after中便于j’son解析注冊,沒問題,但是update以json格式投遞到kafka然後flink消費時字段值隻有主鍵和被修改的字段存在值,但是未來SCEMP表可能empno,ename,job,sal,deptno這幾個字段都會用到,dept表所有字段都會用到,并且要求無論對哪些字段做update操作,投遞到kafka的所有json資料必須都要有上面幾個字段及相關值。是以額外給emp表的empno,ename,job,sal,deptno組合添加附加日志,dept表給整個表添加附加日志來支援後續flink計算:

資料初始化,指的是從源端Oracle 資料庫将已存在的需要的資料同步至目标端,配置初始化程序:

GoldenGate 提供了 DEFGEN 工具,用于生成資料定義,當源表和目标表中 的定義不同時,GoldenGate 程序将引用該專用工具。在運作 DEFGEN 之前,需要 為其建立一個參數檔案:

52

53

54

将生成的定義檔案傳送到目标端, 目标端的replicate程序會使用這個檔案。

因為環境中已經存在一個向195.168.1.66作用的抽取程序和投遞程序 e_zt,d_zt:

,并且195.168.1.66的kafka應用程序已經存在并停止了:

現在隻需要把上面兩張表的配置加入到e_zt,現在抽取程序配置如下:

将上面兩張表加進來

因為之前已經配置了rkafka程序,現在隻需要在這個程序裡面加那兩張表的配置就行。

這裡有一個問題,雖然update之後的資料能夠讓flink正常算,但是對于pkupdate之前的主鍵對應的記錄值我們還是會做計算,是以這裡flink計算會出現問題,會讓同一條記錄(隻變了主鍵其他值不變,在kafka中是兩條消息)計算兩次了,而且我們前面規定了為了flink計算友善,所有資料都從json的after部分取數,是以這裡我把對于pkupdate操作來說,在插入kafka一條update之後的資料後,再插入一條update前的資料,并且這個update前的資料除了主鍵是原來的值外,其餘要計算的名額值都設定成null,這樣相當于原來變更前的主鍵其他名額最新的值都是null了,flink在對目前主鍵最新值計算的時候就會把這些值當成空值來計算進而減去update前的值,隻計算update後的值,就不會出現重複計算了,而且前面的配置太備援,看最新的應用程序配置:

55

56

57

58

啟動程序:

59

去目标端檢視生成的trail檔案:

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

資料過來了

先看下目前kafka中topic資訊:

因為kafka已經配置了當沒有相關topic時會自動建立相關topic,但是為了規範,這裡手動建立topic:

單獨開兩個會話消費上面兩個topic資料:

開始初始化資料:

檢視日志:

檢視兩個topic消費情況:

SCDEP ,SCEMP表已經 初始化資料過來了。

接下來啟動應用程序增量同步資料:

去kafka看結果:

可以看到insert都正常同步過來了。

從第一條update結果看,所有添加了附加日志的列及最新值都過來了,第二條結果發現SCEMP表在做了update mgr字段時候,除了其餘所有加了附加日志的字段值都跟着過來了,mgr最新值也過來了,現在的json内容是:

主鍵+附加日志字段+被修改字段,能夠滿足flink極為友善的擷取每個需要計算名額的最新值。

去看kafka消息:

從上面結果看到,現在pkupdate操作被分成了兩個json,舊的主鍵對應的j’son中需要計算的名額值都是空,而新的主鍵對應的json中需要計算的名額都是各名額最新的值,能夠滿足flink在發生pkupdate時候計算不會出錯。