Install CentOS 7.9 on a Mac M1 in a VMware virtual machine, then set up Hadoop/Hive/Kafka/Flink/Iceberg locally for data lake testing.
Notes:
No free version of Parallels Desktop was found, so VMware was used instead; VMware can also be found online.
The official CentOS 7.9 image does not install successfully in the VM, so a community build was used: 在m1晶片的MacBook上安裝centos7 (installing CentOS 7 on an M1 MacBook).
The JDK is the arm64 (aarch64) OpenJDK 1.8.0.332 installed via yum (see the sketch after these notes).
MySQL is the arm64 build downloaded from the official site.
The big data components use the official binary packages.
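A minimal sketch of the JDK install on each node, assuming the stock CentOS 7 aarch64 repositories provide the OpenJDK 1.8.0.332 build (the exact release suffix may differ):
# install the aarch64 OpenJDK 8 JRE and JDK from the yum repos
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# confirm the installed version and architecture
java -version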
Cluster information

| Hostname | Internal IP |
| --- | --- |
| datalake-01 | 10.0.0.10 |
| datalake-02 | 10.0.0.11 |
| datalake-03 | 10.0.0.12 |

Node configuration

| CPU | Memory | OS |
| --- | --- | --- |
| 4 cores | 8 GB | CentOS 7.9 aarch64 |
Component versions

| Component | Version |
| --- | --- |
| Java | 1.8.332 (aarch64) |
| Scala | 2.12.15 |
| Hadoop | 3.2.3 |
| Zookeeper | 3.5.9 |
| Hive | 3.1.3 |
| Kafka | 3.1.1 |
| Flink | 1.14.4 |
| Iceberg | 0.13.1 |
| MySQL | 8.0.15 (aarch64) |
Component layout

| Component | Service | Nodes |
| --- | --- | --- |
| Zookeeper | | all 3 nodes |
| Hadoop HA | NameNode | nodes 01, 02 |
| | DataNode | all 3 nodes |
| YARN | ResourceManager | nodes 01, 02 |
| | NodeManager | all 3 nodes |
| Hive | Metastore | nodes 01, 02 |
| | HiveServer2 | nodes 01, 02 |
| Kafka | Broker | all 3 nodes |
| Flink | JobManager | nodes 01, 02 |
| | TaskManager | all 3 nodes |
| MySQL | | node 01 |
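A sketch of how this layout maps to start commands, assuming the stock scripts shipped with each component and run on the nodes listed above:
# Zookeeper, on all 3 nodes
zkServer.sh start
# HDFS and YARN HA daemons, from node 01 (driven by the workers and *-site.xml configs)
start-dfs.sh
start-yarn.sh
# Hive services, on nodes 01 and 02
nohup hive --service metastore > metastore.log 2>&1 &
nohup hive --service hiveserver2 > hiveserver2.log 2>&1 &
# Kafka broker, on all 3 nodes
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# Flink standalone cluster, from node 01
start-cluster.sh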
Install CentOS 7 in the VM
Connect to the virtual servers remotely with Tabby
Connect to MySQL with Navicat
Environment variables
Both CentOS 7 and the Mac are configured here; on the Mac only the JDK and Maven are set.
~/.bash_profile
# java
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.aarch64/jre
# scala
export SCALA_HOME=/opt/apps/scala-2.12.15
# zookeeper
export ZK_HOME=/opt/apps/zookeeper-3.5.9
# hadoop
export HADOOP_HOME=/opt/apps/hadoop-3.2.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
# hive
export HIVE_HOME=/opt/apps/hive-3.1.3
export HIVE_CONF_DIR=$HIVE_HOME/conf
# kafka
export KAFKA_HOME=/opt/apps/kafka-3.1.1
# maven
export M2_HOME=/opt/apps/maven-3.6.3
# flink
export FLINK_HOME=/opt/apps/flink-1.14.4
export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$ZK_HOME/bin:$HIVE_HOME/bin:$KAFKA_HOME/bin:$M2_HOME/bin:$FLINK_HOME/bin
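After editing the profile, reloading it and spot-checking a few of the tools is a quick sanity check (a suggestion, not part of the original setup):
source ~/.bash_profile
java -version
hadoop version
scala -version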
sql-client test
Flink configuration
The following checkpointing settings are used only in this test environment.
execution.checkpointing.interval: 10s
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION #[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
state.checkpoints.num-retained: 20
execution.checkpointing.mode: EXACTLY_ONCE #[EXACTLY_ONCE, AT_LEAST_ONCE]
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.unaligned: false
Start the Flink standalone cluster
start-cluster.sh
# stop
stop-cluster.sh
Web UI
http://datalake-01:8081/
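A quick check that the standalone cluster is up: in Flink 1.14 session mode the JobManager runs as StandaloneSessionClusterEntrypoint and each TaskManager as TaskManagerRunner, so jps on the nodes should show them:
# run on each node; expect StandaloneSessionClusterEntrypoint on 01/02 and TaskManagerRunner on all 3 nodes
jps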
SQL file
sql-client-conf.sql
create catalog hive_catalog with (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://datalake-01:9083',
'clients'='5',
'property-version'='2',
'warehouse'='/user/hive/warehouse/'
);
create catalog hadoop_catalog with (
'type' = 'iceberg',
'catalog-type' = 'hadoop',
'property-version' = '2',
'warehouse' = '/user/hive/warehouse/'
);
Start sql-client
sql-client.sh -i ../sql-client-conf.sql
show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| hadoop_catalog |
| hive_catalog |
+-----------------+
Tests
# cdc: Iceberg 0.13 supports CDC writes, but not CDC streaming reads, i.e. streaming reads only see appended data, not updated data
drop table if exists default_catalog.default_database.cdc_source_table;
create table if not exists default_catalog.default_database.cdc_source_table (
id int,
data string,
dt string,
primary key (id) not enforced
) with (
'connector' = 'mysql-cdc',
'hostname' = '10.0.0.10',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'server-time-zone' = 'Asia/Shanghai'
);
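The mysql-cdc source above assumes binlog is enabled in ROW format (the default in MySQL 8) and that test_db.test_tb already exists; a sketch of preparing the MySQL side, with the column types being an assumption matched to the Flink schema:
# verify the binlog format is ROW
mysql -uroot -p123456 -e "show variables like 'binlog_format';"
# create the source database and table (column types assumed)
mysql -uroot -p123456 -e "create database if not exists test_db;"
mysql -uroot -p123456 -e "create table if not exists test_db.test_tb (id int primary key, data varchar(64), dt varchar(16));"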
set execution.type = streaming;
set execution.result-mode = tableau;
# view the cdc table in streaming mode
select * from default_catalog.default_database.cdc_source_table;
# hive catalog
create database hive_catalog.iceberg_db;
drop table hive_catalog.iceberg_db.iceberg_table;
create table if not exists hive_catalog.iceberg_db.iceberg_table(
id bigint comment 'unique id',
data string,
dt string,
primary key (id) not enforced
) comment 'iceberg test table'
partitioned by (dt)
with(
'format-version' = '2',
'write.distribution-mode'='hash',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='10'
);
# cdc to iceberg
insert into hive_catalog.iceberg_db.iceberg_table select * from default_catalog.default_database.cdc_source_table;
# read the iceberg table in batch mode
select * from hive_catalog.iceberg_db.iceberg_table;
# read the iceberg table in streaming mode
set table.dynamic-table-options.enabled = true;
# newly appended data can be read as a stream, but streaming reads of update/delete data are not supported yet and will raise an error
select * from hive_catalog.iceberg_db.iceberg_table /*+ options('streaming'='true')*/;
insert into hive_catalog.iceberg_db.iceberg_table values(4, 'e', '2022-05-22');
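Since the hive_catalog warehouse is /user/hive/warehouse/, the table's Iceberg metadata and data files can be inspected directly on HDFS; the iceberg_db.db path segment below assumes the usual Hive database-directory naming:
hdfs dfs -ls /user/hive/warehouse/iceberg_db.db/iceberg_table/metadata
hdfs dfs -ls /user/hive/warehouse/iceberg_db.db/iceberg_table/data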
CDC to Upsert-Kafka
# cdc
create table if not exists cdc_source_table (
id int,
data string,
dt string,
primary key (id) not enforced
) with (
'connector' = 'mysql-cdc',
'hostname' = '10.0.0.10',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test_tb',
'server-time-zone' = 'Asia/Shanghai'
);
# upsert-kafka
create table upsert_kafka_table (
id int,
data string,
dt string,
primary key (id) not enforced
) with (
'connector' = 'upsert-kafka',
'topic' = 'test2',
'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
'properties.group.id' = 'testGroup',
'key.format' = 'json',
'value.format' = 'json'
);
# cdc to upsert-kafka: changes to rows in MySQL show up as primary-key changes in real time
insert into upsert_kafka_table select * from cdc_source_table;
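To drive the pipeline, change rows in the MySQL source table; a sketch using the table and columns assumed earlier:
mysql -uroot -p123456 -e "insert into test_db.test_tb values (1, 'a', '2022-05-22');"
mysql -uroot -p123456 -e "update test_db.test_tb set data = 'aa' where id = 1;"
mysql -uroot -p123456 -e "delete from test_db.test_tb where id = 1;"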
Kafka to Upsert-Kafka
# kafka to upsert-kafka
# the kafka source table has no primary key
create table kafka_source_table (
id int,
data string,
dt string
) with (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
# reuse the upsert-kafka table defined above
insert into upsert_kafka_table select * from kafka_source_table;
# check the results in the upsert-kafka table
select * from upsert_kafka_table;
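If automatic topic creation is disabled on the brokers, the test and test2 topics have to exist before producing; a sketch using the Kafka CLI:
kafka-topics.sh --create --bootstrap-server datalake-01:9092 --topic test --partitions 3 --replication-factor 3
kafka-topics.sh --create --bootstrap-server datalake-01:9092 --topic test2 --partitions 3 --replication-factor 3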
# write data to Kafka and watch the changes appear
kafka-console-producer.sh --broker-list datalake-01:9092 --topic test
{"id":1, "data":"a", "dt":"2022-05-22"}
{"id":2, "data":"b", "dt":"2022-05-22"}
{"id":3, "data":"c", "dt":"2022-05-22"}
{"id":4, "data":"d", "dt":"2022-05-22"}
{"id":5, "data":"e", "dt":"2022-05-22"}
{"id":1, "data":"aa", "dt":"2022-05-22"}
{"id":1, "data":"DD", "dt":"2022-05-22"}
+----+-------------+--------------------------------+--------------------------------+
| op | id | data | dt |
+----+-------------+--------------------------------+--------------------------------+
| +I | 1 | a | 2022-05-22 |
| +I | 2 | b | 2022-05-22 |
| -U | 1 | a | 2022-05-22 |
| +U | 1 | aa | 2022-05-22 |
| -U | 1 | aa | 2022-05-22 |
| +U | 1 | DD | 2022-05-22 |
Issues
1. Flink needs these extra jars to integrate with Hadoop 3:
flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
antlr-runtime-3.5.2.jar
commons-cli-1.5.0.jar
2. Related connector and runtime jars loaded by Flink (see the copy sketch after this list):
flink-sql-connector-hive-3.1.2_2.12-1.14.4.jar
flink-sql-connector-kafka_2.12-1.14.4.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
flink-statebackend-rocksdb_2.12-1.14.4.jar
iceberg-flink-runtime-1.14-0.13.1.jar
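All of these jars end up in $FLINK_HOME/lib on every node; a sketch, where ~/downloads is a hypothetical directory holding the downloaded jars, followed by a cluster restart so they get picked up:
cp ~/downloads/*.jar $FLINK_HOME/lib/
scp $FLINK_HOME/lib/*.jar datalake-02:$FLINK_HOME/lib/
scp $FLINK_HOME/lib/*.jar datalake-03:$FLINK_HOME/lib/
stop-cluster.sh && start-cluster.sh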
3. Hadoop 3.2.3 ships a newer guava than Hive's; copy it into Hive's lib directory:
cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $HIVE_HOME/lib/
mv $HIVE_HOME/lib/guava-19.0.jar $HIVE_HOME/lib/guava-19.0.jar.bak
4. HADOOP_CLASSPATH issue
Export it only after HADOOP_HOME has been exported and added to PATH:
export HADOOP_CLASSPATH=`hadoop classpath`
Local testing

| Tool | Version |
| --- | --- |
| IDEA | 2022.1.0 |
| Maven | 3.6.3 |
| Java | 1.8.332 |
| Scala | 2.12.15 |
| Flink | 1.14.4 |

Copy the cluster configuration files into the project.
log4j configuration file
log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p - %m%n
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-10c %x - %m%n
Test code
package com.jt.test
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
object FlinkCdcTest {
private var logger: org.slf4j.Logger = _
def main(args: Array[String]): Unit = {
logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
Logger.getLogger("org.apache").setLevel(Level.INFO)
Logger.getLogger("hive.metastore").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tableEnv = TableEnvironment.create(settings)
// hive catalog
val catalogDDL =
"""
|create catalog hive_catalog with (
| 'type' = 'iceberg',
| 'catalog-type' = 'hive',
| 'uri' = 'thrift://datalake-01:9083',
| 'clients' = '5',
| 'property-version' = '1',
| 'warehouse' = 'hdfs://nameservice1/user/hive/warehouse/'
|)
|""".stripMargin
tableEnv.executeSql(catalogDDL)
// iceberg sink table
val databaseDDL = "create database if not exists iceberg_db"
tableEnv.executeSql(databaseDDL)
// val tableDDL =
// """
// |drop table hive_catalog.iceberg_db.iceberg_table;
// |create table if not exists hive_catalog.iceberg_db.iceberg_table(
// | id bigint comment 'unique id',
// | data string,
// | dt string,
// | primary key (id) not enforced
// |) comment 'iceberg test table'
// | partitioned by (dt)
// | with(
// | 'format-version' = '2',
// | 'write.distribution-mode'='hash',
// | 'write.metadata.delete-after-commit.enabled'='true',
// | 'write.metadata.previous-versions-max'='10'
// |);
// """.stripMargin
// tableEnv.executeSql(tableDDL)
//
//
// // cdc source
// val cdcSourceDDL1 = "drop table if exists default_catalog.default_database.cdc_source_table"
// tableEnv.executeSql(cdcSourceDDL1)
//
// val cdcSourceDDL =
// """
// |create table if not exists default_catalog.default_database.cdc_source_table (
// | id int,
// | data string,
// | dt string,
// | primary key (id) not enforced
// |) with (
// | 'connector' = 'mysql-cdc',
// | 'hostname' = '10.0.0.10',
// | 'port' = '3306',
// | 'username' = 'root',
// | 'password' = '123456',
// | 'database-name' = 'test_db',
// | 'table-name' = 'test_tb',
// | 'server-time-zone' = 'Asia/Shanghai'
// |)
// """.stripMargin
// tableEnv.executeSql(cdcSourceDDL)
//
// // cdc to iceberg
// val cdcToIcebergDML =
// """
// |insert into hive_catalog.iceberg_db.iceberg_table
// |select * from default_catalog.default_database.cdc_source_table
// |""".stripMargin
// tableEnv.executeSql(cdcToIcebergDML)
// batch read iceberg
// val showIceberg = "select * from hive_catalog.iceberg_db.iceberg_table"
// tableEnv.executeSql(showIceberg).print()
// kafka source
// val kafkaSourceDDL =
// """
// |create table kafka_source_table (
// | id int,
// | data string,
// | dt string
// |) with (
// | 'connector' = 'kafka',
// | 'topic' = 'test',
// | 'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
// | 'properties.group.id' = 'testGroup',
// | 'format' = 'json',
// | 'scan.startup.mode' = 'earliest-offset',
// | 'json.fail-on-missing-field' = 'false',
// | 'json.ignore-parse-errors' = 'true'
// |)
// """.stripMargin
// tableEnv.executeSql(kafkaSourceDDL)
// upsert kafka
val upsertKafkaSourceDDL =
"""
|create table upsert_kafka_table (
| id int,
| data string,
| dt string,
| primary key (id) not enforced
|) with (
| 'connector' = 'upsert-kafka',
| 'topic' = 'test2',
| 'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
| 'properties.group.id' = 'testGroup',
| 'properties.scan.startup.mode' = 'earliest-offset',
| 'key.format' = 'json',
| 'key.json.ignore-parse-errors' = 'true',
| 'key.json.fail-on-missing-field' = 'false',
| 'value.format' = 'json',
| 'value.json.fail-on-missing-field' = 'false'
|)
""".stripMargin
tableEnv.executeSql(upsertKafkaSourceDDL)
tableEnv.executeSql("select * from upsert_kafka_table").print()
}
}
pom dependencies
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <!-- the next three property names are inferred from the plugin versions; only the values appear in the original listing -->
  <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
  <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
  <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
  <java.version>1.8</java.version>
  <scala.version>2.12.15</scala.version>
  <scala.binary.version>2.12</scala.binary.version>
  <hadoop.version>3.2.3</hadoop.version>
  <flink.version>1.14.4</flink.version>
  <flink.cdc.version>2.2.1</flink.cdc.version>
  <iceberg.version>0.13.1</iceberg.version>
  <iceberg.flink.version>1.14</iceberg.flink.version>
  <hive.version>3.1.3</hive.version>
  <scope.type>compile</scope.type>
</properties>

<dependencies>
  <!-- Flink runtime, APIs, planner and formats -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <!-- connectors -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
      </exclusion>
    </exclusions>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>${flink.cdc.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-${iceberg.flink.version}</artifactId>
    <version>${iceberg.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.5.0</version>
  </dependency>
  <!-- Hadoop and Hive -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-math3</artifactId>
      </exclusion>
    </exclusions>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>${scope.type}</scope>
    <exclusions>
      <exclusion>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
      </exclusion>
      <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.antlr</groupId>
    <artifactId>antlr-runtime</artifactId>
    <version>3.5.2</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>${scala.maven.plugin.version}</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>${maven.assembly.plugin.version}</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
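A sketch of building the fat jar and submitting it to the standalone cluster; the artifact name follows the jar-with-dependencies convention and is an assumption, since the pom's artifactId is not shown:
mvn clean package -DskipTests
# submit to the JobManager on node 01; jar name hypothetical
flink run -m datalake-01:8081 -c com.jt.test.FlinkCdcTest target/flink-cdc-test-1.0-jar-with-dependencies.jar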