
之前我們在《如何用gpss實作MySQL到Greenplum的增量同步》中詳細介紹了MySQL到Greenplum增量同步的實作步驟。今天将給大家講一講Oracle到Greenplum又是如何實作的。
Oracle資料庫雖然在OLTP領域仍有着毋庸置疑的優勢地位,但在OLAP領域與Greenplum則是差距顯著。如今已經有越來越多的分析型業務從Oracle遷移到Greenplum,在《如何從Oracle遷移到Greenplum》系列文章中,詳細介紹了業務的遷移的最佳實踐;而資料遷移中最核心的就是如何實作資料的實時增量同步。 對增量同步而言,gpss作為一個流計算架構,與源端是解耦的,是以隻要Kafka topic中的消息,包含足夠的資訊,gpss都可以提取變化的資料并重放到gp中。 之前介紹了如何利用gpss同步來自Maxwell和Mysql的增量資料,這裡再以Oracle Golden Gate為例,介紹如何實時同步來自Oracle的增量資料。
1
測試環境
- Oracle
- Oracle Golden Gate
- Kafka 2.2.2
- Greenplum 6.4.0
- GPSS 1.3.6
我們要完成的工作是:
- 通過GoldenGate将Oracle中的增量資料以json格式發送到Kafka (略)
- 利用gpss解析kafka中的json消息
- 将變化的資料更新到Greenplum的目标表中
2
測試資料簡介
測試使用的表在Oracle中定義如下:
CREATE TABLE SIEBEL_TEST.TEST_POC( ID numeric, NAME varchar2 (50), BIRTHDAY date)
其中 ID 列為鍵,用來唯一辨別一條記錄, NAME 和 BIRTHDAY 為更新字段 在源端分别對這個表進行了insert,update和delete操作。 Insert語句為:
insert into test_poc values (1, 'Igor', '01-JAN-2000');
Update語句為:
update test_poc set birthday=add_months(birthday,1) where id <3;
Delete語句為:
delete from test_poc where id=3;
3
Kafka的消息格式
接下來我們對Golden Gate的這三種類型的消息進行簡單的分析。 Insert時生成的消息示例如下:
{ "table": "SIEBEL_TEST.TEST_POC", "op_type": "I", "op_ts": "2019-11-21 10:05:34.000000", "current_ts": "2019-11-21T11:05:37.823000", "pos": "00000000250000058833", "tokens": { "TK_OPTYPE": "INSERT", "SCN": "" }, "after": { "ID": 1, "NAME": "Igor", "BIRTHDAY": "2000-01-01 00:00:00" }}
Table表示源表的表名,current_ts表示操作發生的時間,這裡我們用它做排序;op_type和after表示執行的操作及對應的資料。 Delete生成的消息如下,op_type為"D",同時before中包含了完整的内容。
{ "table": "SIEBEL_TEST.TEST_POC", "op_type": "D", "op_ts": "2019-11-21 10:13:19.000000", "current_ts": "2019-11-21T11:13:23.060002", "pos": "00000000250000059999", "tokens": { "TK_OPTYPE": "DELETE", "SCN": "" }, "before": { "ID": 3, "NAME": "Gianluca", "BIRTHDAY": "2002-01-01 00:00:00" }}
Update除了包含新資料(after)外,還包含了更新之前的資料(before), op_type類型為'U'。
{ "table": "SIEBEL_TEST.TEST_POC", "op_type": "U", "op_ts": "2019-11-21 10:13:19.000000", "current_ts": "2019-11-21T11:13:23.060000", "pos": "00000000250000059561", "tokens": { "TK_OPTYPE": "SQL COMPUPDATE", "SCN": "" }, "before": { "ID": 1, "NAME": "Igor", "BIRTHDAY": "2000-01-01 00:00:00" }, "after": { "ID": 1, "NAME": "Igor", "BIRTHDAY": "2000-02-01 00:00:00" }}
根據生成的消息,我們需要執行如下操作:
- 根據id對消息去重
- 根據ts對消息排序
- 對op_type為D的列執行删除操作
- 對其它type類型執行Merge(upsert)操作
4
執行gpss的Kafka JOB
Greenplum中的定義包含了用于排序的字段ts,用來區分消息更新的先後順序,定義如下:
CREATE TABLE test_poc( id numeric, name varchar (50), birthday date, ts timestamp);
根據資料同步的需求,gpss需要的yaml配置檔案如下:
DATABASE: testUSER: gpadminHOST: mdwPORT: 5432VERSION: 2KAFKA: INPUT: SOURCE: BROKERS: kafkahost:9092 TOPIC: oggpoc VALUE: COLUMNS: - NAME: c1 TYPE: json FORMAT: json ERROR_LIMIT: 100 OUTPUT: MODE: MERGE MATCH_COLUMNS: - id UPDATE_COLUMNS: - name - birthday ORDER_COLUMNS: - ts DELETE_CONDITION: c1->>'op_type' = 'D' TABLE: test_poc MAPPING: - NAME: id EXPRESSION: | CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'ID')::integer ELSE (c1->'before'->>'ID')::integer end - NAME: name EXPRESSION: | CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'NAME')::text ELSE null end - NAME: birthday EXPRESSION: | CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'BIRTHDAY')::date ELSE null end - NAME: ts EXPRESSION: (c1->>'current_ts')::timestamp COMMIT: MINIMAL_INTERVAL: 2000
相關字段的含義和gpss實際執行的操作可參見參考文獻[2],這裡着重介紹下有差別的的地方,也就是由于insert和update操作的實際内容包含在after中,而delete的内容包含在before中,每個字段的内容需要額外的判斷邏輯:有after時讀取after中的内容,否則讀取before中的内容。 此外需要注意的是,當新消息的ORDER_COLUMNS的内容有重複時,gpss會把所有的包含重複内容的行,都記錄到目标表中;這樣的主要目的是為了避免資料丢失。是以在實際使用時一定要確定排序地段的唯一性。 配置檔案準備好後,我們通過gpkafka來執行加載:
gpkafka load ogg.yaml
gpkafka便會從kafka中拉取對應的消息,按照設定的操作将Kafka中的增量資料同步到目标表中。
5
小結
這裡簡單介紹了如何用gpss從Kafka消費GoldenGate生成的Oracle增量資料進行同步,其它資料庫及CDC工具(例如Informatica,StreamSet,NIFI等)也都可以利用類似的方案實作同步。今後會做更多的相關介紹,歡迎大家試用,回報,指導。
6
參考文獻
- https://github.com/pdeemea/kafka-gpss-replication
- 如何用gpss實作MySQL到Greenplum的增量同步
- https://gpdb.docs.pivotal.io/streaming-server
我知道你
在看
哦