天天看點

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

背景:一些客戶回報,增量同步資料到MaxCompute按照全天的資料做增量資料同步資料量太大,且不使用按天的增量同步資料,進行在MaxCompute上進行資料處理得出增量資料對于delete的相關資料不能做到很好的支援,在次給大家一個對增量資料同步的方案使用DTS做增量同步資料到MaxCompute,資料源為ECS上自建的mysql5.7。

一、為自建MySQL建立賬号并設定

1.1登陸自建Mysql資料庫

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

1.2建立mysql資料庫中用于資料遷移/同步的賬号

CREATE USER 'dtsmigration'@'%' IDENTIFIED BY 'Dts123456';           

說明:

  • username:待建立的賬号。
  • host:允許該賬号登入的主機,如果允許該賬号從任意主機登入資料庫,可以使用百分号(%)。
  • password:賬号的密碼。

1.3對賬号進行授權操作

GRANT privileges ON databasename.tablename TO 'username'@'host' WITH GRANT OPTION;           
  • privileges:授予該賬号的操作權限,如SELECT、INSERT、UPDATE等,如果要授予該賬号所有權限,則使用ALL。
  • databasename:資料庫名。如果要授予該賬号具備所有資料庫的操作權限,則使用星号(*)。
  • tablename:表名。如果要授予該賬号具備所有表的操作權限,則使用星号(*)。
  • username:待授權的賬号。
  • host:允許該賬号登入的主機,如果允許該賬号從任意主機登入,則使用百分号(%)。
  • WITH GRANT OPTION:授予該賬号使用GRANT指令的權限,該參數為可選。

如果要給賬戶賦予所有資料庫和表的權限,并容許從任意主機登陸資料庫

GRANT ALL ON *.* TO 'dtsmigration'@'%';           

1.4開啟并設定自建Mysql資料庫binlog

到指定目錄下找到該檔案

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

a.使用vim指令,修改配置檔案my.cnf中的如下參數

log_bin=mysql_bin
binlog_format=row
server_id=2 //設定大于1的整數
binlog_row_image=full //當自建MySQL的版本大于5.6時,則必須設定該項。           
Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

b.修改完成後,重新開機Mysql程序。

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料
service mysqld restart           
Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

二、同步過程介紹

2.1結構初始化

DTS将源庫中待同步表的結構定義資訊同步至MaxCompute中,初始化時DTS會為表名增加_base字尾。例如源表為customer,那麼MaxCompute中的表即為customer_base。

2.2全量資料初始化

DTS将源庫中待同步表的存量資料,全部同步至MaxCompute中的目标表名_base表中(例如從源庫的customer表同步至MaxCompute的customer_base表),作為後續增量同步資料的基線資料。

2.3增量資料同步

DTS在MaxCompute中建立一個增量日志表,表名為同步的目标表名_log,例如customer_log,然後将源庫産生的增量資料實時同步到該表中。

三、增量同步實踐

3.1購買DTS同步

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.2檢視購買的DTS同步,點選配置同步鍊路

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.3配置對應的資料源和相應的MaxCompute項目

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.4點選授予權限的同步賬号操作

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.5選擇對應的增量同步資料的同步實踐,并選擇需要同步的表

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.6同步配置預檢查

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.7查詢同步的全量資料

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.8檢視同步成功的增量資料分區user_log

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

3.9檢視增量資料同步的資料

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

中繼資料的字段介紹

字段 說明
record_id

增量日志的記錄id,為該日志唯一辨別。

- id的值唯一且遞增。

- 如果增量日志的操作類型為UPDATE,那麼增量更新會被拆分成兩條記錄,且record_id的值相同。

operation_flag

操作類型,取值:

- I:INSERT操作。

- D:DELETE操作。

- U:UPDATE操作。

utc_timestamp 操作時間戳,即binlog的時間戳(UTC 時間)。
before_flag 所有列的值是否為更新前的值,取值:Y或N。
after_flag 所有列的值是否為更新後的值,取值:Y或N。

四、根據時間點位,整合該時間點位之前的全量資料

4.1建立全量資料表

CREATE TABLE IF NOT EXISTS maxcomputeone_dev.user_all(uid BIGINT,uname STRING,deptno BIGINT,gender STRING,optime DATETIME,record_id BIGINT,operation_flag STRING,utc_timestamp BIGINT,before_flag STRING,after_flag STRING);           

4.2檢視增量資料最後同步的點位,最後整合全量資料到user_all

Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

合并語句

set odps.sql.allow.fullscan=true;
insert overwrite table user_all
select uid,
       uname,
       deptno,
       gender,
       optime
  from(
select row_number() over(partition by t.uid
 order by record_id desc, after_flag desc) as record_num, record_id, operation_flag, after_flag, uid, uname, deptno,gender,optime
  from(
select incr.record_id, incr.operation_flag, incr.after_flag, incr.uid, incr.uname,incr.deptno,incr.gender,incr.optime
  from user_log incr
 where utc_timestamp <= 1585107804
 union all
select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.uid, base.uname,base.deptno,base.gender,base.optime
  from user_base base) t) gt
where record_num=1 
  and after_flag='Y';
           

歡迎加入“MaxCompute開發者社群2群”,點選連結申請加入或掃描二維碼

https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
Mysql5.7使用DTS增量同步資料到MaxCompute一、為自建MySQL建立賬号并設定二、同步過程介紹三、增量同步實踐四、根據時間點位,整合該時間點位之前的全量資料

繼續閱讀