Oracle GoldenGate 大数据相关技术 oracle ogg kafka 数据库 ogg for bigdata
在前面曾写过几篇关于OGG同步Oracle等库数据到kafka的文章:
OGG实时同步Oracle数据到Kafka实施文档(供flink流式计算)
OGG For Bigdata 12按操作类型同步Oracle数据到kafka不同topic
但是那都是做测试,没有说实际工作情况下如何将Oracle等库表的历史数据初始化到kafka的方案,我这里用过两个方案,第一个比较笨的方案那就是写shell脚本将数据从Oracle导出成json格式的数据然后再写到kafka,另一种就是现在要介绍的通过OGG本身的初始化进程来做历史数据初始化,本篇文章环境完全根据前面文章搭建的环境来做的。
先再来看下当前环境的大致配置情况:
由于本文做的一系列Ogg forBigdata投递j’son消息到kafka操作是为了提供flink消费做实时计算用,为了极大的降低flink代码解析json的成本,提高消费速度,本人文章对insert,delete,update/pkupdate的映射大致逻辑是这样映射的:
1、对于insert操作,由于ogg for bigdata生成的json消息是下面这种情况:
1
也就是有效数据存储在after的部分,这里不做变化;
2、对于delete 操作,由于ogg for bigdata生成的json消息是下面这种情况:
也就是有效数据存储在before的部分,由于insert,delete,update我这里不再像前面文章映射到不同topic,这里都映射到一个topic中,这里flink解析就有问题了,因为json结构不同,insert的有效数据在after而delete的在before,这里为了flink解析json方便,将delete的操作对应的json的有效数据也放到after中,怎么实现?就是将delete转成insert,转置后的结果json如下:
但是转置完后,标识操作类型的op_type也变成了I,那后面flink计算时候怎么知道这条记录实际做的是delete?,这就是为什么我上篇文章在源端抽取进程加了TKN-OP-TYPE属性来标识这条记录做的是什么操作,这样就算replicat做了转置,op_type会变,但是TKN-OP-TYPE是从源端带来的属性值,这个不会变。
3、对于普通update操作,由于ogg for bigdata生成的json消息是下面这种情况:
2
这里的json只会带有加了附加日志的主键及被修改的字段值, 我们首先需要做的是,把update after的数据单独拿出来做一个json:
为什么不取before的数据,因为before的数据对我们没用,不需要取这些数据,其次,由于flink要计算的字段涉及empno,ename,job,sal,deptno这些字段,就算只是改了ename字段,其他字段没有变化,我们也要将这些没有变动的字段及其现在的值拿出来写到kafka,保证json消息的完整性,让flink在处理的时候更方便。
4、对于pkupdate操作,无论是主键+其他字段的修改还是仅主键单独的变更,原本的pkupdate消息如下:
这里我们要把pkupdate before的数据拆分成一个单独的json拿出来,并且让除了主键以外的其他需要计算的指标ename,job,sal,deptno也要在这个json中并且这些除主键外的字段值均要为null值,如下:
而after的也要单独拆分,要保证主键和所有字段的值都是现在最新的状态值:
之所以这么做一是因为前面说的保证j’son消息的完整性,其次是主键变更后,变更前的主键对应的j’son数据还在kafka中,而新的主键(包括变更主键和其他字段)对应的相关值除了变更主键时被变更的字段外其他的字段值都与旧主键值一致,这样flink计算的时候就会重复计算,为了避免重复计算,在主键变更后生成了新的主键+其他加了附加日志的字段j’son后,还要写一个旧的主键对应的j’son消息,让旧的主键最新的其他字段值都为null,这样flink在计算的时候,根据主键取最新状态值的时候就不会出现重复计算的问题了。
下面是上面逻辑的大致流程图:
下面看具体实验:
–下面所有源端表都是在scott用户下操作。
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
因为现在只是对主键加了附加日志,未来DML操作,insert,delete向kafka投递消息时,规定所有的数据都在after中便于j’son解析注册,没问题,但是update以json格式投递到kafka然后flink消费时字段值只有主键和被修改的字段存在值,但是未来SCEMP表可能empno,ename,job,sal,deptno这几个字段都会用到,dept表所有字段都会用到,并且要求无论对哪些字段做update操作,投递到kafka的所有json数据必须都要有上面几个字段及相关值。所以额外给emp表的empno,ename,job,sal,deptno组合添加附加日志,dept表给整个表添加附加日志来支持后续flink计算:
数据初始化,指的是从源端Oracle 数据库将已存在的需要的数据同步至目标端,配置初始化进程:
GoldenGate 提供了 DEFGEN 工具,用于生成数据定义,当源表和目标表中 的定义不同时,GoldenGate 进程将引用该专用工具。在运行 DEFGEN 之前,需要 为其创建一个参数文件:
52
53
54
将生成的定义文件传送到目标端, 目标端的replicate进程会使用这个文件。
因为环境中已经存在一个向195.168.1.66作用的抽取进程和投递进程 e_zt,d_zt:
,并且195.168.1.66的kafka应用进程已经存在并停止了:
现在只需要把上面两张表的配置加入到e_zt,现在抽取进程配置如下:
将上面两张表加进来
因为之前已经配置了rkafka进程,现在只需要在这个进程里面加那两张表的配置就行。
这里有一个问题,虽然update之后的数据能够让flink正常算,但是对于pkupdate之前的主键对应的记录值我们还是会做计算,所以这里flink计算会出现问题,会让同一条记录(只变了主键其他值不变,在kafka中是两条消息)计算两次了,而且我们前面规定了为了flink计算方便,所有数据都从json的after部分取数,所以这里我把对于pkupdate操作来说,在插入kafka一条update之后的数据后,再插入一条update前的数据,并且这个update前的数据除了主键是原来的值外,其余要计算的指标值都设置成null,这样相当于原来变更前的主键其他指标最新的值都是null了,flink在对当前主键最新值计算的时候就会把这些值当成空值来计算从而减去update前的值,只计算update后的值,就不会出现重复计算了,而且前面的配置太冗余,看最新的应用进程配置:
55
56
57
58
启动进程:
59
去目标端查看生成的trail文件:
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
数据过来了
先看下当前kafka中topic信息:
因为kafka已经配置了当没有相关topic时会自动创建相关topic,但是为了规范,这里手动创建topic:
单独开两个会话消费上面两个topic数据:
开始初始化数据:
查看日志:
查看两个topic消费情况:
SCDEP ,SCEMP表已经 初始化数据过来了。
接下来启动应用进程增量同步数据:
去kafka看结果:
可以看到insert都正常同步过来了。
从第一条update结果看,所有添加了附加日志的列及最新值都过来了,第二条结果发现SCEMP表在做了update mgr字段时候,除了其余所有加了附加日志的字段值都跟着过来了,mgr最新值也过来了,现在的json内容是:
主键+附加日志字段+被修改字段,能够满足flink极为方便的获取每个需要计算指标的最新值。
去看kafka消息:
从上面结果看到,现在pkupdate操作被分成了两个json,旧的主键对应的j’son中需要计算的指标值都是空,而新的主键对应的json中需要计算的指标都是各指标最新的值,能够满足flink在发生pkupdate时候计算不会出错。