
Mac M1 VM CentOS 7 Data Lake Test

Install CentOS 7.9 on a Mac M1 via VMware, then set up Hadoop/Hive/Kafka/Flink/Iceberg locally for data lake testing.

Notes:

No free version of Parallels Desktop was found, so VMware was used instead; VMware can also be found online.

The official CentOS 7.9 image would not install in VMware, so a build compiled by someone else was used: "在m1晶片的MacBook上安裝centos7" (Installing CentOS 7 on an M1 MacBook).

The JDK was installed via yum: the arm64 (aarch64) build of 1.8.0_332.

MySQL is the arm64 build downloaded from the official site.

The big data components use the official binary packages.

Cluster information

Hostname      Internal IP
datalake-01   10.0.0.10
datalake-02   10.0.0.11
datalake-03   10.0.0.12
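
The examples below refer to the nodes by hostname, so matching /etc/hosts entries are assumed on all three VMs (and on the Mac, for reaching the web UIs); a minimal sketch:

# /etc/hosts (assumed on every node and on the Mac host)
10.0.0.10   datalake-01
10.0.0.11   datalake-02
10.0.0.12   datalake-03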

VM configuration

CPU   Memory   OS
4     8 GB     CentOS 7.9 aarch64

Component versions

Component   Version
Java 1.8.332.aarch64
Scala 2.12.15
Hadoop 3.2.3
Zookeeper 3.5.9
Hive 3.1.3
Kafka 3.1.1
Flink 1.14.4
Iceberg 0.13.1
MySQL 8.0.15.aarch64

Service layout

Component   Service
Zookeeper   3 nodes
Hadoop HA   NameNode on nodes 01, 02
            DataNode on all 3 nodes
YARN        ResourceManager on nodes 01, 02
            NodeManager on all 3 nodes
Hive        Metastore on nodes 01, 02
            HiveServer2 on nodes 01, 02
Kafka       Broker on all 3 nodes
Flink       JobManager on nodes 01, 02
            TaskManager on all 3 nodes
MySQL       node 01
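
Once everything is started, this layout can be spot-checked by running jps on each node; a sketch assuming passwordless SSH between the nodes and that jps (shipped with the JDK) is on each node's PATH:

for h in datalake-01 datalake-02 datalake-03; do
  echo "== $h =="
  ssh "$h" jps
done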

Install CentOS 7 in a VMware virtual machine

Connect to the virtual servers remotely with Tabby

Connect to MySQL with Navicat

Environment variables

Both CentOS 7 and the Mac are configured here; on the Mac only the JDK and Maven are configured.

~/.bash_profile
      
# java
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.aarch64/jre
# scala
export SCALA_HOME=/opt/apps/scala-2.12.15
# zookeeper
export ZK_HOME=/opt/apps/zookeeper-3.5.9
# hadoop
export HADOOP_HOME=/opt/apps/hadoop-3.2.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
# hive
export HIVE_HOME=/opt/apps/hive-3.1.3
export HIVE_CONF_DIR=$HIVE_HOME/conf
# kafka
export KAFKA_HOME=/opt/apps/kafka-3.1.1
# maven
export M2_HOME=/opt/apps/maven-3.6.3
# flink
export FLINK_HOME=/opt/apps/flink-1.14.4

export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$ZK_HOME/bin:$HIVE_HOME/bin:$KAFKA_HOME/bin:$M2_HOME/bin:$FLINK_HOME/bin      
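
After editing the file, reload it and spot-check a few of the tools (standard version flags of the components listed above):

source ~/.bash_profile
java -version
scala -version
hadoop version
hive --version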

sql-client test

Flink configuration

For the test environment only; these settings can go into $FLINK_HOME/conf/flink-conf.yaml.

execution.checkpointing.interval: 10s
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION #[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
state.checkpoints.num-retained: 20
execution.checkpointing.mode: EXACTLY_ONCE  #[EXACTLY_ONCE, AT_LEAST_ONCE]
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.unaligned: false      

Start the Flink standalone cluster

start-cluster.sh

# stop
stop-cluster.sh
      

Web UI

http://datalake-01:8081/

SQL init file

sql-client-conf.sql

create catalog hive_catalog with (
          'type'='iceberg',
          'catalog-type'='hive',
          'uri'='thrift://datalake-01:9083',
          'clients'='5',
          'property-version'='2',
          'warehouse'='/user/hive/warehouse/'
);

create catalog hadoop_catalog with (
        'type' = 'iceberg',
        'catalog-type' = 'hadoop',
        'property-version' = '2',
        'warehouse' = '/user/hive/warehouse/'
);      

Start sql-client

sql-client.sh -i ../sql-client-conf.sql      
show catalogs;

+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|  hadoop_catalog |
|    hive_catalog |
+-----------------+      

Test
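
The CDC examples below read from a MySQL table test_db.test_tb with columns id, data and dt; a sketch to create it on node 01 is shown here (column types are assumptions, and MySQL 8 has binlog enabled with ROW format by default, which mysql-cdc requires):

mysql -uroot -p123456 <<'EOF'
create database if not exists test_db;
create table if not exists test_db.test_tb (
  id   int primary key,
  data varchar(64),
  dt   varchar(16)
);
insert into test_db.test_tb values (1, 'a', '2022-05-22'), (2, 'b', '2022-05-22');
EOF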

# cdc: Iceberg 0.13 supports CDC writes, but not streaming reads of CDC changes,
# i.e. streaming reads only return appended rows, not updated rows
drop table if exists default_catalog.default_database.cdc_source_table;
create table if not exists default_catalog.default_database.cdc_source_table (
  id int,
  data string,
  dt string,
  primary key (id) not enforced
) with (
 'connector' = 'mysql-cdc',
 'hostname' = '10.0.0.10',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'database-name' = 'test_db',
 'table-name' = 'test_tb',
 'server-time-zone' = 'Asia/Shanghai'
);

set execution.type = streaming;
set execution.result-mode = tableau;

# query the cdc table in streaming mode
select * from default_catalog.default_database.cdc_source_table;

# hive catalog
create database hive_catalog.iceberg_db;

drop table hive_catalog.iceberg_db.iceberg_table;
create table if not exists hive_catalog.iceberg_db.iceberg_table(
    id bigint comment 'unique id',
    data string,
    dt string,
    primary key (id) not enforced
) comment 'iceberg test table'
 partitioned by (dt)
 with(
 'format-version' = '2',
 'write.distribution-mode'='hash',
 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='10'
);

# cdc to iceberg
insert into hive_catalog.iceberg_db.iceberg_table select * from default_catalog.default_database.cdc_source_table;

# query iceberg in batch mode
select * from hive_catalog.iceberg_db.iceberg_table;

# query iceberg in streaming mode
set table.dynamic-table-options.enabled = true;

# newly appended data can be read by the streaming query, but streaming reads of update/delete data are not yet supported and will throw an error
select * from hive_catalog.iceberg_db.iceberg_table /*+ options('streaming'='true')*/;

insert into hive_catalog.iceberg_db.iceberg_table values(4, 'e', '2022-05-22');      
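
To confirm that the insert jobs are actually committing, the table's data and metadata files can be listed on HDFS; a sketch assuming the Hive catalog's default <db>.db/<table> layout under the warehouse directory:

hdfs dfs -ls -R /user/hive/warehouse/iceberg_db.db/iceberg_table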

CDC to Upsert-Kafka
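
The topics test2 (used below) and test (used in the next section) are assumed to exist; a sketch to create them with the Kafka 3.x tools (partition and replication counts are arbitrary test choices):

kafka-topics.sh --bootstrap-server datalake-01:9092 --create --topic test --partitions 3 --replication-factor 3
kafka-topics.sh --bootstrap-server datalake-01:9092 --create --topic test2 --partitions 3 --replication-factor 3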

# cdc
create table if not exists cdc_source_table (
  id int,
  data string,
  dt string,
  primary key (id) not enforced
) with (
 'connector' = 'mysql-cdc',
 'hostname' = '10.0.0.10',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'database-name' = 'test_db',
 'table-name' = 'test_tb',
 'server-time-zone' = 'Asia/Shanghai'
);

# upsert-kafka
create table upsert_kafka_table (
  id int,
  data string,
  dt string,
  primary key (id) not enforced
) with (
 'connector' = 'upsert-kafka',
 'topic' = 'test2',
 'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
 'properties.group.id' = 'testGroup',
 'key.format' = 'json',
 'value.format' = 'json'
);

# cdc to upsert-kafka; after modifying data in MySQL, changes to rows by primary key are visible in real time
insert into upsert_kafka_table select * from cdc_source_table;      
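
The changelog that upsert-kafka writes can also be watched directly on the topic (deletes show up as tombstones, i.e. records with a null value); a sketch:

kafka-console-consumer.sh --bootstrap-server datalake-01:9092 --topic test2 \
  --from-beginning --property print.key=true --property key.separator=' => '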

Kafka to Upsert-Kafka

# kafka to upsert-kafka
# the kafka source table has no primary key
create table kafka_source_table (
  id int,
  data string,
  dt string
) with (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);

# reuse the upsert-kafka table defined above
insert into upsert_kafka_table select * from kafka_source_table;

# query the upsert-kafka table
select * from upsert_kafka_table;

# write data into Kafka and watch the changes appear
kafka-console-producer.sh --broker-list datalake-01:9092 --topic test

{"id":1, "data":"a", "dt":"2022-05-22"}
{"id":2, "data":"b", "dt":"2022-05-22"}
{"id":3, "data":"c", "dt":"2022-05-22"}
{"id":4, "data":"d", "dt":"2022-05-22"}
{"id":5, "data":"e", "dt":"2022-05-22"}

{"id":1, "data":"aa", "dt":"2022-05-22"}

{"id":1, "data":"DD", "dt":"2022-05-22"}

+----+-------------+--------------------------------+--------------------------------+
| op |          id |                           data |                             dt |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                              a |                     2022-05-22 |
| +I |           2 |                              b |                     2022-05-22 |
| -U |           1 |                              a |                     2022-05-22 |
| +U |           1 |                             aa |                     2022-05-22 |
| -U |           1 |                             aa |                     2022-05-22 |
| +U |           1 |                             DD |                     2022-05-22 |      

Issues

1. Flink with Hadoop 3 requires the following dependency jars

flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar
antlr-runtime-3.5.2.jar
commons-cli-1.5.0.jar      

2. Additional connector jars for Flink (see the sketch after this list for installing both sets)

flink-sql-connector-hive-3.1.2_2.12-1.14.4.jar
flink-sql-connector-kafka_2.12-1.14.4.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
flink-statebackend-rocksdb_2.12-1.14.4.jar
iceberg-flink-runtime-1.14-0.13.1.jar      
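
The jars from 1 and 2 are assumed to go into Flink's lib directory on every node, after which the standalone cluster is restarted; a sketch, assuming the jars have already been downloaded into the current directory:

cp flink-shaded-hadoop-3-3.1.1.7.2.9.0-173-9.0.jar antlr-runtime-3.5.2.jar commons-cli-1.5.0.jar \
   flink-sql-connector-hive-3.1.2_2.12-1.14.4.jar flink-sql-connector-kafka_2.12-1.14.4.jar \
   flink-sql-connector-mysql-cdc-2.2.1.jar flink-statebackend-rocksdb_2.12-1.14.4.jar \
   iceberg-flink-runtime-1.14-0.13.1.jar $FLINK_HOME/lib/
stop-cluster.sh && start-cluster.sh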

3. Hadoop 3.2.3 ships a newer guava than Hive; copy it into Hive's lib directory

cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $HIVE_HOME/lib/
mv $HIVE_HOME/lib/guava-19.0.jar $HIVE_HOME/lib/guava-19.0.jar.bak      

4. HADOOP_CLASSPATH issue

Export it only after HADOOP_HOME has been set and added to PATH:

export HADOOP_CLASSPATH=`hadoop classpath`

Local test

Type    Version
IDEA 2022.1.0
Maven 3.6.3
Java 1.8.332
Scala 2.12.15
Flink 1.14.4

Copy configuration files
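
Exactly which files are copied is not listed here; one assumed setup for running against the cluster from the IDE is to pull the Hadoop and Hive client configs into src/main/resources (paths taken from the cluster layout above):

scp datalake-01:/opt/apps/hadoop-3.2.3/etc/hadoop/core-site.xml src/main/resources/
scp datalake-01:/opt/apps/hadoop-3.2.3/etc/hadoop/hdfs-site.xml src/main/resources/
scp datalake-01:/opt/apps/hive-3.1.3/conf/hive-site.xml src/main/resources/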

log4j configuration file

log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p - %m%n
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-10c %x - %m%n      

Test code

package com.jt.test

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory

object FlinkCdcTest {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val tableEnv = TableEnvironment.create(settings)

    // hive catalog
    val catalogDDL =
      """
        |create catalog hive_catalog with (
        | 'type' = 'iceberg',
        | 'catalog-type' = 'hive',
        | 'uri' = 'thrift://datalake-01:9083',
        | 'clients' = '5',
        | 'property-version' = '1',
        | 'warehouse' = 'hdfs://nameservice1/user/hive/warehouse/'
        |)
        |""".stripMargin
    tableEnv.executeSql(catalogDDL)

    // iceberg sink table
    val databaseDDL = "create database if not exists iceberg_db"
    tableEnv.executeSql(databaseDDL)

//    val tableDDL =
//      """
//        |drop table hive_catalog.iceberg_db.iceberg_table;
//        |create table if not exists hive_catalog.iceberg_db.iceberg_table(
//        |    id bigint comment 'unique id',
//        |    data string,
//        |    dt string,
//        |    primary key (id) not enforced
//        |) comment 'iceberg test table'
//        | partitioned by (dt)
//        | with(
//        | 'format-version' = '2',
//        | 'write.distribution-mode'='hash',
//        | 'write.metadata.delete-after-commit.enabled'='true',
//        | 'write.metadata.previous-versions-max'='10'
//        |);
//       """.stripMargin
//    tableEnv.executeSql(tableDDL)
//
//
//    // cdc source
//    val cdcSourceDDL1 = "drop table if exists default_catalog.default_database.cdc_source_table"
//    tableEnv.executeSql(cdcSourceDDL1)
//
//    val cdcSourceDDL =
//      """
//        |create table if not exists default_catalog.default_database.cdc_source_table (
//        |  id int,
//        |  data string,
//        |  dt string,
//        |  primary key (id) not enforced
//        |) with (
//        | 'connector' = 'mysql-cdc',
//        | 'hostname' = '10.0.0.10',
//        | 'port' = '3306',
//        | 'username' = 'root',
//        | 'password' = '123456',
//        | 'database-name' = 'test_db',
//        | 'table-name' = 'test_tb',
//        | 'server-time-zone' = 'Asia/Shanghai'
//        |)
//      """.stripMargin
//    tableEnv.executeSql(cdcSourceDDL)
//
//    // cdc to iceberg
//    val cdcToIcebergDML =
//      """
//        |insert into hive_catalog.iceberg_db.iceberg_table
//        |select * from default_catalog.default_database.cdc_source_table
//        |""".stripMargin
//    tableEnv.executeSql(cdcToIcebergDML)

    // batch read iceberg
//    val showIceberg = "select * from hive_catalog.iceberg_db.iceberg_table"
//    tableEnv.executeSql(showIceberg).print()


    // kafka source
//    val kafkaSourceDDL =
//      """
//        |create table kafka_source_table (
//        | id int,
//        | data string,
//        | dt string
//        |) with (
//        |  'connector' = 'kafka',
//        |  'topic' = 'test',
//        |  'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
//        |  'properties.group.id' = 'testGroup',
//        |  'format' = 'json',
//        |  'scan.startup.mode' = 'earliest-offset',
//        |  'json.fail-on-missing-field' = 'false',
//        |  'json.ignore-parse-errors' = 'true'
//        |)
//      """.stripMargin
//    tableEnv.executeSql(kafkaSourceDDL)

    // upsert kafka
    val upsertKafkaSourceDDL =
      """
        |create table upsert_kafka_table (
        | id int,
        | data string,
        | dt string,
        | primary key (id) not enforced
        |) with (
        |  'connector' = 'upsert-kafka',
        |  'topic' = 'test2',
        |  'properties.bootstrap.servers' = 'datalake-01:9092,datalake-02:9092,datalake-03:9092',
        |  'properties.group.id' = 'testGroup',
        |  'properties.scan.startup.mode' = 'earliest-offset',
        |  'key.format' = 'json',
        |  'key.json.ignore-parse-errors' = 'true',
        |  'key.json.fail-on-missing-field' = 'false',
        |  'value.format' = 'json',
        |  'value.json.fail-on-missing-field' = 'false'
        |)
      """.stripMargin
    tableEnv.executeSql(upsertKafkaSourceDDL)

    tableEnv.executeSql("select * from upsert_kafka_table").print()
  }
}      
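
The class can be run directly from IDEA with the compile-scoped dependencies in the pom below; to submit it to the standalone cluster instead, one assumed workflow is to switch scope.type to provided, build the fat jar, and use flink run (the jar name depends on your own artifactId and version):

mvn clean package -DskipTests
flink run -c com.jt.test.FlinkCdcTest target/<artifactId>-<version>-jar-with-dependencies.jar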

pom dependencies (fragment; property names follow the ${...} placeholders used in the dependencies)

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

    <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
    <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>

    <java.version>1.8</java.version>
    <scala.version>2.12.15</scala.version>
    <scala.binary.version>2.12</scala.binary.version>

    <hadoop.version>3.2.3</hadoop.version>
    <flink.version>1.14.4</flink.version>
    <flink.cdc.version>2.2.1</flink.cdc.version>
    <iceberg.version>0.13.1</iceberg.version>
    <iceberg.flink.version>1.14</iceberg.flink.version>
    <hive.version>3.1.3</hive.version>

    <scope.type>compile</scope.type>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime-${iceberg.flink.version}</artifactId>
        <version>${iceberg.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math3</artifactId>
            </exclusion>
        </exclusions>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${scope.type}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr-runtime</artifactId>
        <version>3.5.2</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>${scala.maven.plugin.version}</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>${maven.assembly.plugin.version}</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
