1、Bireme簡介。
Bireme 是一個 Greenplum / HashData 資料倉庫的增量同步工具。目前支援 MySQL、PostgreSQL 和 MongoDB 資料源
文檔官方:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
Bireme工作原理
Bireme 從資料源讀取資料 (Record),将其轉化為内部格式 (Row) 并緩存,當緩存資料達到一定量,将這些資料合并為一個任務 (Task),每個任務包含兩個集合,delete 集合與insert 集合,最後把這些資料更新到目标資料庫。
每個資料源可以有多個 pipeline,對于 maxwell,每個 Kafka partition 對應一個 pipeline;對于 debezium,每個 Kafka topic 對應一個 pipeline。
1、資料流
Bireme 采用 DELETE + COPY 的方式,将資料源的修改記錄同步到 Greenplum / HashData ,相較于INSERT + UPDATE + DELETE的方式,COPY 方式速度更快,性能更優
2、資料源
2.1、Maxwell + Kafka 是 bireme 目前支援的一種資料源類型,架構如下圖:
Maxwell 是一個 MySQL binlog 的讀取工具,它可以實時讀取 MySQL 的 binlog,并生成 JSON 格式的消息,作為生産者發送給 Kafka
2.2、Debezium + Kafka 是 bireme 支援的另外一種資料源類型,架構如下圖:
Debezium 是一個CDC工具,可以将資料庫的增删改轉換為事件流,并把這些修改發送給 Kafka
3、工作原理
Bireme 從資料源讀取資料 (Record),将其轉化為内部格式 (Row) 并緩存,當緩存資料達到一定量,将這些資料合并為一個任務 (Task),每個任務包含兩個集合,delete 集合與insert 集合,最後把這些資料更新到目标資料庫。
每個資料源可以有多個 pipeline,對于 maxwell,每個 Kafka partition 對應一個 pipeline;對于 debezium,每個 Kafka topic 對應一個 pipeline
4、本文搭建執行個體圖形
2、配置相關資料源、目标資料源和java環境。
1、mysql資料源
1、資料庫,create database syncdb1;
2、使用者權限,需要擁有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,此處使用root權限
3、同步的表(切換到syncdb1資料庫),create table tb1(a int, b char(10), primary key(a));
2、pgsql目的資料庫
1、使用者,create user syncdb with password 'syncdb';
2、資料庫,create database syncdb with owner 'syncdb';
3、同步的表(使用syncdb使用者切換到syncdb資料庫),create table tb1(a int, b char(10), primary key(a));
3、java環境的安裝
1、下載下傳二進制安裝包:jdk-8u101-linux-x64.tar.gz
2、解壓二進制包并做軟連結:tar xf jdk-8u101-linux-x64.tar.gz && ln -s /data/jdk1.8.0_101 /usr/java
3、配置路徑和java環境變量:vim /etc/profile.d/java.sh
export JAVA_HOME=/usr/java
export JRE_HOME=$JAVA_HOME/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
4、source生效:source /etc/profile.d/java.sh
5、安裝jsvc,yum install jsvc
3、kafka的安裝和啟動配置
1、下載下傳位址:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
2、kafka官方文檔:http://kafka.apache.org/
3、解壓縮:tar xf kafka_2.11-2.0.0.tgz && cd kafka_2.11-2.0.0
4、ZooKeeper
啟動,bin/zookeeper-server-start.sh config/zookeeper.properties
關閉,bin/zookeeper-server-stop.sh config/zookeeper.properties
5、Kafka server
啟動,bin/kafka-server-start.sh config/server.properties
啟動,bin/kafka-server-stop.sh config/server.properties
6、Topic(不是本實驗必須的,作為學習使用)
建立,bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic world
查詢,bin/kafka-topics.sh --list --zookeeper localhost:2181
删除,bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic world
7、Producer(不是本實驗必須的,作為學習使用)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>jiaming
>
8、Consumer(不是本實驗必須的,作為學習使用)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
jiaming
4、debezium的安裝和啟動配置
下載下傳debezium的mysql連接配接器
1、下載下傳位址:https://debezium.io/docs/install/
2、debezium官方文檔:https://debezium.io/docs/tutorial/
3、解壓縮:tar xf debezium-connector-mysql-0.8.1.Final-plugin.tar.gz
4、解壓出來的jar包全部拷貝到kafka libs目錄下,cp debezium-connector-mysql/.jar kafka2.11-2.0.0/libs/
5、添加配置檔案(用于連接配接mysql資料源,對應參數可參考官方介紹:https://debezium.io/docs/connectors/mysql/#example-configuration )
cd kafka_2.11-2.0.0 && vim mysql.properties
note:debezium的database.server.name一定要和bireme的data_source保持一緻
name=inventory-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=118.190.209.102
database.port=5700
database.user=root
database.password=123456
database.server.id=129129
database.server.name=debezium1 # debezium的database.server.name一定要和bireme的data_source保持一緻
database.whitelist=syncdb1 # 同步的資料庫清單
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.debezium1
include.schema.changes=true
6、以獨立模式啟動kafka connect,此時debezium會對資料庫中的每一個表建立一個topic,消費相應的topic,即可擷取binlog解析資訊
cd kafka_2.11-2.0.0
bin/connect-standalone.sh config/connect-standalone.properties mysql.properties
7、檢視topic清單
cd kafka_2.11-2.0.0
bin/kafka-topics.sh --list --zookeeper localhost:2181
debezium1.syncdb1.tb1,每個資料源同步表會生成一個topic
debezium1,記錄ddl操作
dbhistory.debezium1,記錄對應ddl操作和position位點資訊
5、bireme的安裝和啟動配置
1、下載下傳位址:https://github.com/HashDataInc/bireme/releases
2、bireme官方文檔:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
3、解壓縮:tar xf bireme-1.0.0.tar.gz && cd bireme-1.0.0
4、修改配置檔案,vim etc/config.properties
note:debezium的database.server.name一定要和bireme的data_source保持一緻
# target database where the data will sync into.
target.url = jdbc:postgresql://118.190.209.102:5432/syncdb
target.user = syncdb
target.passwd = syncdb
# data source name list, separated by comma.
data_source = debezium1 # debezium的database.server.name一定要和bireme的data_source保持一緻
# data source "debezium1"
debezium1.type = debezium
# kafka server which debezium write into.
debezium1.kafka.server = 127.0.0.1:9092
# kafka groupid used for consumer.
debezium1.kafka.groupid = bireme
debezium1.kafka.namespace = debezium1
# set the IP address for bireme state server.
state.server.addr = 0.0.0.0
# set the port for bireme state server.
state.server.port = 8080
5、修改配置檔案,vim etc/debezium1.properties(表映射配置)
note:debezium1.properties的debezium1一定要和bireme的data_source保持一緻
# source table full name = target table full name
syncdb1.tb1 = public.tb1
6、啟動bireme,bin/bireme start
7、監控,http://192.168.1.129:8080/pretty (state.server.addr:state.server.port)
6、測試
1、mysql資料源
insert into tb1 select 1,'a';
insert into tb1 select 2,'b';
2、pgsql目标資料庫
syncdb=# select * from tb1;
a | b
---+------------
1 | a
2 | b
(2 rows)
7、優勢和優點。
1、可以實作多個庫表的彙總功能,syncdb1.tb1/syncdb2.tb1 可以彙總到pgsql的一張表tb1中
2、中間使用kafka消息隊列,對于大資料量性能方面提升較好
3、不存在資料源庫***問題,位點資訊存放在kafka中的topic中
4、第一次啟動debezium,會生成一個資料源資料庫的snapshot,然後之後基于binlog的解析,這樣避免了第一次同步資料源資料庫到目标資料庫的一份全量資料