Hands-on Notes

Versions:
Flink 1.11.2
Iceberg: latest release (0.11.1 at the time, per the pom below)
Zeppelin
Hive: the Hive 2.1 that ships with CDH 6.3.2

Note: according to the official docs, Iceberg only supports Hive 2.x and above.
Part 1: Make sure Flink on Zeppelin can query Hive

1) Zeppelin Flink interpreter settings:
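Roughly, the relevant properties of the Flink interpreter in the Zeppelin UI look like the sketch below. This is an assumption-laden sketch, not the exact settings used: the FLINK_HOME path is invented, and the Hive version is assumed to be the 2.1.1 that CDH 6.3.2 ships; the conf paths match the environment variables configured further down.

```properties
# assumption: wherever Flink 1.11.2 is unpacked
FLINK_HOME=/wyyt/software/flink-1.11.2
HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs
HIVE_CONF_DIR=/etc/hive/conf
# yarn, local or remote
flink.execution.mode=yarn
zeppelin.flink.enableHive=true
# assumption: CDH 6.3.2 ships Hive 2.1.1
zeppelin.flink.hive.version=2.1.1
```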

- Flink SQL reading and writing Hive

1) Jars to put into Flink's lib directory:
flink-connector-hive_2.11-1.11.2.jar
hive-exec-2.1.0.jar

2) Hadoop jars that may also be needed:
flink-hadoop-compatibility_2.11-1.11.2.jar
flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar (you have to build this one yourself)
flink-shaded-hadoop-2-uber-2.7.5-9.0.jar (this is the one actually used in testing)
3) Configure environment variables. The actual settings:

```bash
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs
export HADOOP_HOME=/wyyt/software/cloudera/parcels/CDH/lib/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export HIVE_CONF_DIR=/etc/hive/conf
export HIVE_HOME=/wyyt/software/cloudera/parcels/CDH/lib/hive
export PATH=$HIVE_HOME/bin:$HIVE_HOME/sbin:$PATH
```
The files now under Flink's lib directory:

Test that Hive can be queried from Zeppelin:
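For example, a smoke test in a batch SQL paragraph. This is a sketch that assumes Zeppelin registers the Hive catalog under its default name `hive`; `some_hive_table` is a placeholder for any table that already exists in your Hive.

```sql
%flink.bsql
USE CATALOG hive;
SHOW TABLES;
-- placeholder: substitute an existing Hive table
SELECT * FROM some_hive_table LIMIT 10;
```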
Part 2: Reading and writing Iceberg from Zeppelin

1. Import the dependency
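The dependency is the iceberg-flink-runtime jar (version 0.11.1 in the pom further down). One way to load it, assuming the jar has been downloaded to a local path (the path here is illustrative), is the Zeppelin interpreter property:

```properties
flink.execution.jars=/wyyt/software/jars/iceberg-flink-runtime-0.11.1.jar
```

Alternatively, drop the jar into Flink's lib directory and restart the interpreter.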
2. Create a catalog

Hadoop catalog:
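A sketch of the DDL, following the Iceberg Flink docs; the catalog name and warehouse path are illustrative:

```sql
%flink.ssql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  -- assumption: any HDFS directory you want Iceberg to manage
  'warehouse'='hdfs://192.168.5.xx:8020/warehouse/iceberg',
  'property-version'='1'
);
```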
3. Check that the catalog was created:
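List the registered catalogs and confirm the new one shows up:

```sql
SHOW CATALOGS;
```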
4. Create a database:

```sql
CREATE DATABASE iceberg_db;
```

Switch to the iceberg_db database:

```sql
use iceberg_db;
```
5. Create an Iceberg table
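For example, a minimal table; the schema mirrors the one used in the code section below, and the table name is illustrative:

```sql
CREATE TABLE sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);
```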
6. Insert data
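A minimal insert into the table above (the values are illustrative):

```sql
INSERT INTO sample VALUES (1, 'a'), (2, 'b');
```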
The insert job completes successfully.
7. Query the data

The official docs describe two ways to query: batch and streaming. Running them exactly as documented throws an error in Zeppelin; the working approach is sketched below.

Batch mode: the query scans the current snapshot and finishes.

Streaming mode: the query keeps running and continuously returns newly committed data.
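A sketch of both modes, using Zeppelin's batch (%flink.bsql) and streaming (%flink.ssql) SQL paragraphs. The streaming read uses Iceberg's table hint options; note that SQL hints require table.dynamic-table-options.enabled=true, which defaults to false in Flink 1.11.

```sql
-- batch: run in a %flink.bsql paragraph; returns the current snapshot and finishes
SELECT * FROM sample;

-- streaming: run in a %flink.ssql paragraph; keeps running and emits rows
-- as new snapshots are committed
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */;
```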
8. Inspect the data

View the data on HDFS (for example with `hdfs dfs -ls -R` under the warehouse path). The table directory contains a metadata/ directory (JSON table metadata plus Avro manifest files) and a data/ directory holding the Parquet data files.
9. Doing it in code

1) Code
```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink2IcebergByHiveCatalog {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        bsTableEnv.executeSql("show catalogs").print();

        // register an Iceberg catalog backed by the Hive Metastore
        bsTableEnv.executeSql("CREATE CATALOG hive_catalog2 WITH (\n" +
                "'type'='iceberg',\n" +
                "'catalog-type'='hive',\n" +
                "'uri'='thrift://192.168.5.xx:9083',\n" +
                "'clients'='5',\n" +
                "'property-version'='1',\n" +
                "'warehouse'='hdfs://192.168.5.xx:8020/tmp'\n" +
                ")");
        bsTableEnv.executeSql("show catalogs").print();

        bsTableEnv.executeSql("use catalog hive_catalog2");
        bsTableEnv.executeSql("use iceberg_db");
        bsTableEnv.executeSql("show tables").print();

        bsTableEnv.executeSql("CREATE TABLE hive_catalog2.iceberg_db.sample3 (\n" +
                " id BIGINT COMMENT 'unique id',\n" +
                " data STRING\n" +
                ")");

        // insert data
        bsTableEnv.executeSql("INSERT INTO hive_catalog2.iceberg_db.sample3 VALUES (1, 'c')");
        bsTableEnv.executeSql("SELECT * FROM sample3").print();
    }
}
```
2) Dependencies
```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.12</scala.version>
</properties>

<dependencies>
    <!-- iceberg -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime</artifactId>
        <version>0.11.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.10.5</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- third-party jars -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.60</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```
10. Iceberg file layout

Reference: Iceberg file analysis (https://blog.csdn.net/weixin_47482194/article/details/115676338?spm=1001.2014.3001.5501)