天天看點

oracle 去重_如何用gpss實作Oracle到Greenplum的增量同步

oracle 去重_如何用gpss實作Oracle到Greenplum的增量同步

之前我們在《如何用gpss實作MySQL到Greenplum的增量同步》中詳細介紹了MySQL到Greenplum增量同步的實作步驟。今天将給大家講一講Oracle到Greenplum又是如何實作的。

Oracle資料庫雖然在OLTP領域仍有着毋庸置疑的優勢地位,但在OLAP領域與Greenplum則是差距顯著。如今已經有越來越多的分析型業務從Oracle遷移到Greenplum,在《如何從Oracle遷移到Greenplum》系列文章中,詳細介紹了業務的遷移的最佳實踐;而資料遷移中最核心的就是如何實作資料的實時增量同步。 對增量同步而言,gpss作為一個流計算架構,與源端是解耦的,是以隻要Kafka topic中的消息,包含足夠的資訊,gpss都可以提取變化的資料并重放到gp中。 之前介紹了如何利用gpss同步來自Maxwell和Mysql的增量資料,這裡再以Oracle Golden Gate為例,介紹如何實時同步來自Oracle的增量資料。

1

測試環境

  • Oracle
  • Oracle Golden Gate
  • Kafka 2.2.2
  • Greenplum 6.4.0
  • GPSS 1.3.6

我們要完成的工作是:

  • 通過GoldenGate将Oracle中的增量資料以json格式發送到Kafka (略)
  • 利用gpss解析kafka中的json消息
  • 将變化的資料更新到Greenplum的目标表中

2

測試資料簡介

測試使用的表在Oracle中定義如下:

CREATE TABLE SIEBEL_TEST.TEST_POC(   ID numeric,   NAME varchar2 (50),   BIRTHDAY date)
           

其中 ID 列為鍵,用來唯一辨別一條記錄, NAME 和 BIRTHDAY 為更新字段 在源端分别對這個表進行了insert,update和delete操作。 Insert語句為:

insert into test_poc values (1, 'Igor', '01-JAN-2000');
           

Update語句為:

update test_poc set birthday=add_months(birthday,1) where id <3;
           

Delete語句為:

delete from test_poc where id=3;
           

3

Kafka的消息格式

接下來我們對Golden Gate的這三種類型的消息進行簡單的分析。 Insert時生成的消息示例如下:

{  "table": "SIEBEL_TEST.TEST_POC",  "op_type": "I",  "op_ts": "2019-11-21 10:05:34.000000",  "current_ts": "2019-11-21T11:05:37.823000",  "pos": "00000000250000058833",  "tokens": {    "TK_OPTYPE": "INSERT",    "SCN": ""  },  "after": {    "ID": 1,    "NAME": "Igor",    "BIRTHDAY": "2000-01-01 00:00:00"  }}
           

Table表示源表的表名,current_ts表示操作發生的時間,這裡我們用它做排序;op_type和after表示執行的操作及對應的資料。 Delete生成的消息如下,op_type為"D",同時before中包含了完整的内容。

{  "table": "SIEBEL_TEST.TEST_POC",  "op_type": "D",  "op_ts": "2019-11-21 10:13:19.000000",  "current_ts": "2019-11-21T11:13:23.060002",  "pos": "00000000250000059999",  "tokens": {    "TK_OPTYPE": "DELETE",    "SCN": ""  },  "before": {    "ID": 3,    "NAME": "Gianluca",    "BIRTHDAY": "2002-01-01 00:00:00"  }}
           

Update除了包含新資料(after)外,還包含了更新之前的資料(before), op_type類型為'U'。

{  "table": "SIEBEL_TEST.TEST_POC",  "op_type": "U",  "op_ts": "2019-11-21 10:13:19.000000",  "current_ts": "2019-11-21T11:13:23.060000",  "pos": "00000000250000059561",  "tokens": {    "TK_OPTYPE": "SQL COMPUPDATE",    "SCN": ""  },  "before": {    "ID": 1,    "NAME": "Igor",    "BIRTHDAY": "2000-01-01 00:00:00"  },  "after": {    "ID": 1,    "NAME": "Igor",    "BIRTHDAY": "2000-02-01 00:00:00"  }}
           

根據生成的消息,我們需要執行如下操作:

  • 根據id對消息去重
  • 根據ts對消息排序
  • 對op_type為D的列執行删除操作
  • 對其它type類型執行Merge(upsert)操作

4

執行gpss的Kafka JOB

Greenplum中的定義包含了用于排序的字段ts,用來區分消息更新的先後順序,定義如下:

CREATE TABLE test_poc(   id numeric,   name varchar (50),   birthday date,   ts timestamp);
           

根據資料同步的需求,gpss需要的yaml配置檔案如下:

DATABASE: testUSER: gpadminHOST: mdwPORT: 5432VERSION: 2KAFKA:   INPUT:      SOURCE:        BROKERS: kafkahost:9092        TOPIC: oggpoc      VALUE:        COLUMNS:          - NAME: c1            TYPE: json        FORMAT: json      ERROR_LIMIT: 100   OUTPUT:      MODE: MERGE      MATCH_COLUMNS:        - id      UPDATE_COLUMNS:        - name        - birthday      ORDER_COLUMNS:        - ts      DELETE_CONDITION: c1->>'op_type' = 'D'      TABLE: test_poc      MAPPING:         - NAME: id           EXPRESSION: |              CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'ID')::integer              ELSE (c1->'before'->>'ID')::integer end         - NAME: name           EXPRESSION: |              CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'NAME')::text              ELSE null end         - NAME: birthday           EXPRESSION: |              CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'BIRTHDAY')::date              ELSE null end         - NAME: ts           EXPRESSION: (c1->>'current_ts')::timestamp   COMMIT:      MINIMAL_INTERVAL: 2000
           

相關字段的含義和gpss實際執行的操作可參見參考文獻[2],這裡着重介紹下有差別的的地方,也就是由于insert和update操作的實際内容包含在after中,而delete的内容包含在before中,每個字段的内容需要額外的判斷邏輯:有after時讀取after中的内容,否則讀取before中的内容。 此外需要注意的是,當新消息的ORDER_COLUMNS的内容有重複時,gpss會把所有的包含重複内容的行,都記錄到目标表中;這樣的主要目的是為了避免資料丢失。是以在實際使用時一定要確定排序地段的唯一性。 配置檔案準備好後,我們通過gpkafka來執行加載:

gpkafka load ogg.yaml
           

gpkafka便會從kafka中拉取對應的消息,按照設定的操作将Kafka中的增量資料同步到目标表中。

5

小結

這裡簡單介紹了如何用gpss從Kafka消費GoldenGate生成的Oracle增量資料進行同步,其它資料庫及CDC工具(例如Informatica,StreamSet,NIFI等)也都可以利用類似的方案實作同步。今後會做更多的相關介紹,歡迎大家試用,回報,指導。

6

參考文獻

  1. https://github.com/pdeemea/kafka-gpss-replication
  2. 如何用gpss實作MySQL到Greenplum的增量同步
  3. https://gpdb.docs.pivotal.io/streaming-server
oracle 去重_如何用gpss實作Oracle到Greenplum的增量同步

我知道你

在看

oracle 去重_如何用gpss實作Oracle到Greenplum的增量同步

繼續閱讀