
Flink + Hive + Iceberg + Zeppelin practice on a CDH cluster — 01: hands-on notes and code

Hands-on notes

Versions:

Flink 1.11.2

Iceberg: latest release at the time of writing (0.11.1, as used in the code below)

Zeppelin

Hive 2.1, as shipped with CDH 6.3.2

Note: according to the official Iceberg docs, only Hive 2.x and above is supported.

I. Make sure Flink on Zeppelin can query Hive

1) Zeppelin Flink interpreter configuration:

(screenshot: Zeppelin Flink interpreter settings)
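The exact settings in the screenshot are not preserved; the following is a plausible sketch for Zeppelin's Flink interpreter (the property names are standard Zeppelin Flink interpreter settings, but the FLINK_HOME path and the values here are assumptions for this cluster):

   FLINK_HOME                   /wyyt/software/flink-1.11.2   (assumed install path)
   HIVE_CONF_DIR                /etc/hive/conf
   zeppelin.flink.enableHive    true
   zeppelin.flink.hive.version  2.1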
• Flink SQL reading and writing Hive

1) Put these jars into Flink's lib directory:

   flink-connector-hive_2.11-1.11.2.jar

   hive-exec-2.1.0.jar

2) Hadoop-related jars may also be needed:

   flink-hadoop-compatibility_2.11-1.11.2.jar

   flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar (this one you have to build yourself)

   flink-shaded-hadoop-2-uber-2.7.5-9.0.jar (the one actually used in testing)

3) Configure environment variables

(screenshot: environment variable settings)

The actual configuration used:

export HADOOP_CLASSPATH=`hadoop classpath`

export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs

export HADOOP_HOME=/wyyt/software/cloudera/parcels/CDH/lib/hadoop

export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

export HIVE_CONF_DIR=/etc/hive/conf

export HIVE_HOME=/wyyt/software/cloudera/parcels/CDH/lib/hive

export PATH=$HIVE_HOME/bin:$HIVE_HOME/sbin:$PATH

The files now under Flink's lib directory:

(screenshot: contents of the Flink lib directory)

Test:

(screenshot: test query against Hive)
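A minimal smoke test in a Zeppelin paragraph; the table name below is hypothetical and stands in for any existing Hive table:

%flink.bsql
show tables;
select * from some_hive_table limit 10;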

II. Reading and writing Iceberg from Zeppelin

1. Import the dependency

(screenshot: loading the Iceberg runtime into the Flink interpreter)
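The screenshot is lost, but a common way to pull in the Iceberg runtime is through the interpreter's dependency settings, e.g. setting flink.execution.packages to org.apache.iceberg:iceberg-flink-runtime:0.11.1 (the version matches the pom in section 9; treat the exact property usage as an assumption), or simply dropping iceberg-flink-runtime-0.11.1.jar into Flink's lib directory.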

2. Create a catalog

(screenshot: CREATE CATALOG statement for the Hive catalog)
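The DDL behind this screenshot should match what the Java code in section 9 executes; a sketch (metastore and NameNode addresses masked as in the original):

%flink.ssql
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://192.168.5.xx:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://192.168.5.xx:8020/tmp'
);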

hadoop catalog:

(screenshot: CREATE CATALOG statement for the Hadoop catalog)
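A Hadoop catalog keeps table metadata directly on HDFS rather than in the Hive metastore; a sketch following the Iceberg Flink docs (the warehouse path here is an assumption):

%flink.ssql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'property-version'='1',
  'warehouse'='hdfs://192.168.5.xx:8020/warehouse/iceberg'
);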

3. Check that the catalog was created successfully

(screenshot: SHOW CATALOGS output)
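Listing the catalogs should now show default_catalog alongside the newly created ones:

%flink.ssql
show catalogs;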

4. Create a database

CREATE DATABASE iceberg_db;

Switch to the iceberg_db database:

use iceberg_db;

(screenshot: creating and using the database in Zeppelin)

5. Create an Iceberg table

(screenshot: CREATE TABLE statement)
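Going by the Java code in section 9, the table DDL looks like this (a sketch):

%flink.ssql
CREATE TABLE hive_catalog.iceberg_db.sample3 (
  id BIGINT COMMENT 'unique id',
  data STRING
);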

6. Insert data

(screenshot: INSERT INTO statement)
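A sketch of the insert (the Java code in section 9 runs the same statement with a single row):

%flink.ssql
INSERT INTO hive_catalog.iceberg_db.sample3 VALUES (1, 'a'), (2, 'b');

The INSERT is submitted as a Flink job, so the paragraph finishes when that job does.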

The insert job completes:

(screenshot: completed insert job)

7. Query the data

The official docs describe two ways to query:

(screenshot: the two query modes from the Iceberg docs)

Running it as-is in Zeppelin throws an error:

(screenshot: the error)

The approach that works:

Batch mode:

(screenshot: batch query result)
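In Zeppelin, the batch variant of the Flink SQL interpreter reads the table's current snapshot directly; a sketch:

%flink.bsql
SELECT * FROM hive_catalog.iceberg_db.sample3;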

Streaming mode, which keeps running:

(screenshots: streaming query running continuously)
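A sketch of the streaming read as described in the Iceberg 0.11 docs, which requires enabling dynamic table options so the streaming hint is honored (the monitor-interval value is arbitrary):

%flink.ssql
SET table.dynamic-table-options.enabled = true;
SELECT * FROM hive_catalog.iceberg_db.sample3
/*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;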

8. Inspect the data

Inspect the data on HDFS:

(screenshot: table directory on HDFS)
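With the catalog's warehouse set to hdfs://.../tmp, the table directory should sit somewhere like /tmp/iceberg_db.db/sample3 (a hypothetical path derived from the warehouse setting). Inside it, Iceberg keeps the usual layout: a metadata/ directory holding the JSON table metadata, manifest lists, and Avro manifest files, and a data/ directory holding the Parquet data files. Something like hadoop fs -ls -R /tmp/iceberg_db.db/sample3 lists them all.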

9. Doing the same from code

1) The code

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink2IcebergByHiveCatalog {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Blink planner in streaming mode (Flink 1.11)
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        bsTableEnv.executeSql("show catalogs").print();

        // Register an Iceberg catalog backed by the Hive metastore
        bsTableEnv.executeSql("CREATE CATALOG hive_catalog2 WITH (\n" +
                "'type'='iceberg',\n" +
                "'catalog-type'='hive',\n" +
                "'uri'='thrift://192.168.5.xx:9083',\n" +
                "'clients'='5',\n" +
                "'property-version'='1',\n" +
                "'warehouse'='hdfs://192.168.5.xx:8020/tmp'\n" +
                ")");

        bsTableEnv.executeSql("show catalogs").print();
        bsTableEnv.executeSql("use catalog hive_catalog2");
        bsTableEnv.executeSql("use iceberg_db");
        bsTableEnv.executeSql("show tables").print();

        // IF NOT EXISTS keeps reruns from failing once the table exists
        bsTableEnv.executeSql("CREATE TABLE IF NOT EXISTS hive_catalog2.iceberg_db.sample3 (\n" +
                "    id BIGINT COMMENT 'unique id',\n" +
                "    data STRING\n" +
                ")");

        // Insert data; note that executeSql submits the INSERT as an async job,
        // so the SELECT below may run before the write is committed
        bsTableEnv.executeSql("INSERT INTO hive_catalog2.iceberg_db.sample3 VALUES (1, 'c')");
        bsTableEnv.executeSql("SELECT * FROM sample3").print();
    }
}

2) The dependencies

<properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.11.2</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
      <scala.version>2.11.12</scala.version>
  </properties>


  <dependencies>


      <!--iceberg-->
      <dependency>
          <groupId>org.apache.iceberg</groupId>
          <artifactId>iceberg-flink-runtime</artifactId>
          <version>0.11.1</version>
          <scope>provided</scope>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.7.3</version>
          <scope>provided</scope>
      </dependency>


      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.7.3</version>
          <scope>provided</scope>
      </dependency>


      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>


      <dependency>
          <groupId>joda-time</groupId>
          <artifactId>joda-time</artifactId>
          <version>2.10.5</version>
          <scope>provided</scope>
      </dependency>


      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>

      <!--  <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
             <scope>provided</scope>
        </dependency>
-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-json</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>


      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-core</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>

      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>

      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>

      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>

      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>


      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>

      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>

      </dependency>


      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-hive_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>


      <!-- third-party jars -->
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.60</version>
          <!--<scope>provided</scope>-->
      </dependency>
      <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-exec</artifactId>
          <version>2.1.0</version>
          <scope>provided</scope>
      </dependency>


  </dependencies>
           

10. Analyzing the Iceberg files:

See: Iceberg file analysis (https://blog.csdn.net/weixin_47482194/article/details/115676338?spm=1001.2014.3001.5501)

