天天看點

Bireme實時同步mysql資料,debezium+kafka+bireme,同步到Greenplum,MongoDB,PostgreSQL1、Bireme簡介。2、配置相關資料源、目标資料源和java環境。3、kafka的安裝和啟動配置4、debezium的安裝和啟動配置5、bireme的安裝和啟動配置6、測試7、優勢和優點。

1、Bireme簡介。

Bireme 是一個 Greenplum / HashData 資料倉庫的增量同步工具。目前支援 MySQL、PostgreSQL 和 MongoDB 資料源

文檔官方:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

 Bireme工作原理

Bireme 從資料源讀取資料 (Record),将其轉化為内部格式 (Row) 并緩存,當緩存資料達到一定量,将這些資料合并為一個任務 (Task),每個任務包含兩個集合,delete 集合與insert 集合,最後把這些資料更新到目标資料庫。

每個資料源可以有多個 pipeline,對于 maxwell,每個 Kafka partition 對應一個 pipeline;對于 debezium,每個 Kafka topic 對應一個 pipeline。

1、資料流

Bireme實時同步mysql資料,debezium+kafka+bireme,同步到Greenplum,MongoDB,PostgreSQL1、Bireme簡介。2、配置相關資料源、目标資料源和java環境。3、kafka的安裝和啟動配置4、debezium的安裝和啟動配置5、bireme的安裝和啟動配置6、測試7、優勢和優點。

Bireme 采用 DELETE + COPY 的方式,将資料源的修改記錄同步到 Greenplum / HashData ,相較于INSERT + UPDATE + DELETE的方式,COPY 方式速度更快,性能更優

2、資料源

2.1、Maxwell + Kafka 是 bireme 目前支援的一種資料源類型,架構如下圖:

Bireme實時同步mysql資料,debezium+kafka+bireme,同步到Greenplum,MongoDB,PostgreSQL1、Bireme簡介。2、配置相關資料源、目标資料源和java環境。3、kafka的安裝和啟動配置4、debezium的安裝和啟動配置5、bireme的安裝和啟動配置6、測試7、優勢和優點。

Maxwell 是一個 MySQL binlog 的讀取工具,它可以實時讀取 MySQL 的 binlog,并生成 JSON 格式的消息,作為生産者發送給 Kafka

2.2、Debezium + Kafka 是 bireme 支援的另外一種資料源類型,架構如下圖:

Bireme實時同步mysql資料,debezium+kafka+bireme,同步到Greenplum,MongoDB,PostgreSQL1、Bireme簡介。2、配置相關資料源、目标資料源和java環境。3、kafka的安裝和啟動配置4、debezium的安裝和啟動配置5、bireme的安裝和啟動配置6、測試7、優勢和優點。

Debezium 是一個CDC工具,可以将資料庫的增删改轉換為事件流,并把這些修改發送給 Kafka

3、工作原理

Bireme 從資料源讀取資料 (Record),将其轉化為内部格式 (Row) 并緩存,當緩存資料達到一定量,将這些資料合并為一個任務 (Task),每個任務包含兩個集合,delete 集合與insert 集合,最後把這些資料更新到目标資料庫。

每個資料源可以有多個 pipeline,對于 maxwell,每個 Kafka partition 對應一個 pipeline;對于 debezium,每個 Kafka topic 對應一個 pipeline

4、本文搭建執行個體圖形

Bireme實時同步mysql資料,debezium+kafka+bireme,同步到Greenplum,MongoDB,PostgreSQL1、Bireme簡介。2、配置相關資料源、目标資料源和java環境。3、kafka的安裝和啟動配置4、debezium的安裝和啟動配置5、bireme的安裝和啟動配置6、測試7、優勢和優點。

2、配置相關資料源、目标資料源和java環境。

1、mysql資料源

1、資料庫,create database syncdb1;
2、使用者權限,需要擁有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,此處使用root權限
3、同步的表(切換到syncdb1資料庫),create table tb1(a int, b char(10), primary key(a));
           

2、pgsql目的資料庫

1、使用者,create user syncdb with password 'syncdb';
2、資料庫,create database syncdb with owner 'syncdb';
3、同步的表(使用syncdb使用者切換到syncdb資料庫),create table tb1(a int, b char(10), primary key(a));
           

3、java環境的安裝

1、下載下傳二進制安裝包:jdk-8u101-linux-x64.tar.gz
2、解壓二進制包并做軟連結:tar xf jdk-8u101-linux-x64.tar.gz && ln -s /data/jdk1.8.0_101 /usr/java
3、配置路徑和java環境變量:vim /etc/profile.d/java.sh
export JAVA_HOME=/usr/java
export JRE_HOME=$JAVA_HOME/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
4、source生效:source  /etc/profile.d/java.sh
5、安裝jsvc,yum install jsvc
           

3、kafka的安裝和啟動配置

1、下載下傳位址:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/

2、kafka官方文檔:http://kafka.apache.org/

3、解壓縮:tar xf kafka_2.11-2.0.0.tgz && cd kafka_2.11-2.0.0

4、ZooKeeper

啟動,bin/zookeeper-server-start.sh config/zookeeper.properties
關閉,bin/zookeeper-server-stop.sh config/zookeeper.properties
           

5、Kafka server

啟動,bin/kafka-server-start.sh config/server.properties
啟動,bin/kafka-server-stop.sh config/server.properties
           

6、Topic(不是本實驗必須的,作為學習使用)

建立,bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic world
查詢,bin/kafka-topics.sh --list --zookeeper localhost:2181
删除,bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic world
           

7、Producer(不是本實驗必須的,作為學習使用)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>jiaming
>
           

8、Consumer(不是本實驗必須的,作為學習使用)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
jiaming
           

4、debezium的安裝和啟動配置

下載下傳debezium的mysql連接配接器

1、下載下傳位址:https://debezium.io/docs/install/

2、debezium官方文檔:https://debezium.io/docs/tutorial/

3、解壓縮:tar xf debezium-connector-mysql-0.8.1.Final-plugin.tar.gz

4、解壓出來的jar包全部拷貝到kafka libs目錄下,cp debezium-connector-mysql/.jar kafka2.11-2.0.0/libs/

5、添加配置檔案(用于連接配接mysql資料源,對應參數可參考官方介紹:https://debezium.io/docs/connectors/mysql/#example-configuration )

cd kafka_2.11-2.0.0 && vim mysql.properties

note:debezium的database.server.name一定要和bireme的data_source保持一緻

name=inventory-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=118.190.209.102
database.port=5700
database.user=root
database.password=123456
database.server.id=129129
database.server.name=debezium1  # debezium的database.server.name一定要和bireme的data_source保持一緻
database.whitelist=syncdb1  # 同步的資料庫清單
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.debezium1
include.schema.changes=true
           

6、以獨立模式啟動kafka connect,此時debezium會對資料庫中的每一個表建立一個topic,消費相應的topic,即可擷取binlog解析資訊

cd kafka_2.11-2.0.0
bin/connect-standalone.sh config/connect-standalone.properties mysql.properties
           

7、檢視topic清單

cd kafka_2.11-2.0.0
bin/kafka-topics.sh --list --zookeeper localhost:2181
           

debezium1.syncdb1.tb1,每個資料源同步表會生成一個topic

debezium1,記錄ddl操作

dbhistory.debezium1,記錄對應ddl操作和position位點資訊

Bireme實時同步mysql資料,debezium+kafka+bireme,同步到Greenplum,MongoDB,PostgreSQL1、Bireme簡介。2、配置相關資料源、目标資料源和java環境。3、kafka的安裝和啟動配置4、debezium的安裝和啟動配置5、bireme的安裝和啟動配置6、測試7、優勢和優點。

5、bireme的安裝和啟動配置

1、下載下傳位址:https://github.com/HashDataInc/bireme/releases

2、bireme官方文檔:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

3、解壓縮:tar xf bireme-1.0.0.tar.gz && cd bireme-1.0.0

4、修改配置檔案,vim etc/config.properties

note:debezium的database.server.name一定要和bireme的data_source保持一緻

# target database where the data will sync into.
target.url = jdbc:postgresql://118.190.209.102:5432/syncdb
target.user = syncdb
target.passwd = syncdb

# data source name list, separated by comma.
data_source = debezium1  # debezium的database.server.name一定要和bireme的data_source保持一緻

# data source "debezium1"
debezium1.type = debezium
# kafka server which debezium write into.
debezium1.kafka.server = 127.0.0.1:9092 
# kafka groupid used for consumer.
debezium1.kafka.groupid = bireme
debezium1.kafka.namespace = debezium1

# set the IP address for bireme state server.
state.server.addr = 0.0.0.0
# set the port for bireme state server.
state.server.port = 8080
           

5、修改配置檔案,vim etc/debezium1.properties(表映射配置)

note:debezium1.properties的debezium1一定要和bireme的data_source保持一緻

# source table full name = target table full name
syncdb1.tb1 = public.tb1
           

6、啟動bireme,bin/bireme start

7、監控,http://192.168.1.129:8080/pretty (state.server.addr:state.server.port)

6、測試

1、mysql資料源

insert into tb1 select 1,'a';
insert into tb1 select 2,'b';
           

2、pgsql目标資料庫

syncdb=# select * from tb1;
 a |     b      
---+------------
 1 | a         
 2 | b         
(2 rows)
           

7、優勢和優點。

1、可以實作多個庫表的彙總功能,syncdb1.tb1/syncdb2.tb1 可以彙總到pgsql的一張表tb1中
2、中間使用kafka消息隊列,對于大資料量性能方面提升較好
3、不存在資料源庫***問題,位點資訊存放在kafka中的topic中
4、第一次啟動debezium,會生成一個資料源資料庫的snapshot,然後之後基于binlog的解析,這樣避免了第一次同步資料源資料庫到目标資料庫的一份全量資料
           

繼續閱讀