1. Topic
Building a real-time data warehouse on Flink, Kafka, and Hive (Flink SQL version)

2. Intended audience
This article is aimed at advanced learners of Flink in the Hadoop big data ecosystem.
3. Environment
Software | Version | Deployment path
hadoop | 2.6.0-cdh5.14.4 | CDH default installation
flink | flink-1.11.1 | /home/opt/flink/flink-1.11.1
kafka | kafka_2.11-2.2.1-kafka-4.1.0 | CDH default installation
4. Background
As real-time requirements on big data workloads keep rising, more and more business scenarios are real-time and more and more data sources are event streams; real-time processing has shifted from a supporting role to a central one. Traditional offline data warehouses struggle to meet these real-time data requirements, and the concept of the real-time data warehouse emerged in response.
5. Hands-on walkthrough
5.1 Data flow diagram
Data is produced into Kafka, consumed by a Flink SQL job running in a yarn-session, and written into a partitioned Hive table whose partitions are committed when checkpoints complete.
5.2 Add the dependency jars
Place all of the following jars in ${FLINK_HOME}/lib (the lib directory under the Flink installation directory):
flink-connector-hive_2.11-1.11.1.jar
flink-connector-kafka_2.12-1.11.1.jar
flink-connector-kafka-base_2.12-1.11.1.jar
flink-csv-1.11.1.jar
flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar
hadoop-mapreduce-client-common-2.6.0-cdh5.14.4.jar
hadoop-mapreduce-client-core-2.6.0-cdh5.14.4.jar
hive-exec-1.1.0-cdh5.14.4.jar
hive-metastore-1.1.0-cdh5.14.4.jar
kafka_2.11-2.1.0.jar
kafka-clients-2.1.0.jar
5.3 Edit the configuration file ${FLINK_HOME}/conf/flink-conf.yaml
# Files only move from the in-progress state to the finished state after a checkpoint
# completes, and only then is the Hive partition commit triggered, so the checkpoint
# settings below need to be configured sensibly.
classloader.resolve-order: parent-first
# Use event time as the job's time characteristic
pipeline.time-characteristic: EventTime
# Checkpoint interval (ms)
execution.checkpointing.interval: 30000
# Minimum pause between checkpoints (ms)
execution.checkpointing.min-pause: 10000
# Checkpoint timeout (ms)
execution.checkpointing.timeout: 60000
# Keep checkpoint files on HDFS after the job is cancelled
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Connector dependency packages
flink.execution.packages: org.apache.flink:flink-connector-kafka_2.12:1.11.1,org.apache.flink:flink-connector-kafka-base_2.12:1.11.1
5.4 Configure and start the Flink yarn-session
5.4.1 Edit ${FLINK_HOME}/bin/yarn-session.sh
# Add the HADOOP_CLASSPATH variable
export HADOOP_CLASSPATH=`hadoop classpath`
# Modify the CC_CLASSPATH variable
CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS:$HADOOP_CLASSPATH`
5.4.2 Start yarn-session mode
cd ${FLINK_HOME}/bin
./yarn-session.sh
5.5 Configure the Flink SQL client
5.5.1 Add the following to ${FLINK_HOME}/bin/sql-client.sh
export HADOOP_CLASSPATH=`hadoop classpath`
HADOOP_CONF_DIR=/etc/hadoop/conf
5.5.2 Edit ${FLINK_HOME}/conf/sql-client-defaults.yaml
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf
    default-database: default

execution:
  current-catalog: myhive
  current-database: default
5.5.3 Start sql-client.sh (the Flink SQL command line)
cd ${FLINK_HOME}/bin
./sql-client.sh embedded
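Before creating any tables, it is worth confirming that the Hive catalog configured above is visible from the SQL client. A quick optional check using standard Flink SQL catalog statements (myhive is the catalog name from sql-client-defaults.yaml):

SHOW CATALOGS;
USE CATALOG myhive;
SHOW TABLES;

If myhive does not appear, re-check the hive-conf-dir path and the jars added in section 5.2.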
5.6 Launch the application
5.6.1 Run the following statements in the Flink SQL command line
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table;
CREATE TABLE hive_table (
  `user` STRING,
  url STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
  `user` STRING,
  `cTime` TIMESTAMP(3),
  `url` STRING,
  WATERMARK FOR cTime AS cTime - INTERVAL '5' SECOND
) WITH (
  'connector'='kafka',
  'topic'='FlinkDynamicTableKafkaDemo',
  'properties.bootstrap.servers'='bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092',
  'properties.group.id'='FlinkDynamicTableKafkaDemoGroup',
  'scan.startup.mode'='latest-offset',
  'format'='csv'
);

SET table.sql-dialect=hive;
INSERT INTO TABLE hive_table
SELECT `user`, url,
       DATE_FORMAT(cTime, 'yyyy-MM-dd') AS dt,
       DATE_FORMAT(cTime, 'HH') AS hr,
       DATE_FORMAT(cTime, 'mm') AS mi
FROM kafka_table;
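Optionally, you can confirm that records from Kafka are parsed as expected by querying the source table directly from the SQL client. A minimal check under the setup above (run in the default dialect; because scan.startup.mode is latest-offset, only records produced after the query starts will appear, so keep it running while you send test data and cancel it afterwards):

SET table.sql-dialect=default;
SELECT `user`, cTime, url FROM kafka_table;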
Parameter notes:
- partition.time-extractor.timestamp-pattern: the partition time extraction pattern; it must stay consistent with the partition columns declared in the DDL.
- sink.partition-commit.trigger: the partition commit trigger, either process-time or partition-time. With process-time, no extraction pattern or watermark is needed: a partition is committed once the current time exceeds the partition creation time plus sink.partition-commit.delay (see the sketch after this list for that variant). With partition-time, the source table must define a watermark: a partition is committed once the watermark exceeds the extracted partition time plus sink.partition-commit.delay.
- sink.partition-commit.delay: effectively the commit delay.
- sink.partition-commit.policy.kind: how the partition is committed. After a successful commit the metastore generally needs to be notified so that Hive can see the data in the new partition; if you also need to merge small files, you can plug in a custom class that implements the PartitionCommitPolicy interface.
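For comparison, a sink table that does not depend on watermarks could use the process-time trigger instead. The sketch below only illustrates that variant and is not part of the original setup; the table name hive_table_proctime and the one-minute delay are made up for the example:

SET table.sql-dialect=hive;
CREATE TABLE hive_table_proctime (
  `user` STRING,
  url STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING) STORED AS parquet TBLPROPERTIES (
  -- process-time: commit once current time > partition creation time + delay
  'sink.partition-commit.trigger'='process-time',
  -- illustrative delay; no timestamp pattern or watermark is required
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);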
5.7 Verification
5.7.1 Start a Kafka console producer and send the following records to the topic FlinkDynamicTableKafkaDemo
cd /opt/cloudera/parcels/KAFKA
bin/kafka-console-producer --broker-list bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092 --topic FlinkDynamicTableKafkaDemo
Mary,2018-12-17 12:00:00,./home
Bob,2018-12-17 12:00:00,./cart
Mary,2018-12-17 12:02:00,./prod?id=1
Mary,2018-12-17 12:55:00,./prod?id=4
Bob,2018-12-17 13:01:00,./prod?id=5
Liz,2018-12-17 13:30:00,./home
Liz,2018-12-17 13:59:00,./prod?id=7
Mary,2018-12-17 14:00:00,./cart
Liz,2018-12-17 14:02:00,./home
Bob,2018-12-17 14:30:00,./prod?id=3
Bob,2018-12-17 14:40:00,./home
Mary,2018-12-18 12:00:00,./home
Bob,2018-12-18 12:00:00,./cart
Mary,2018-12-18 12:02:00,./prod?id=1
Mary,2018-12-18 12:55:00,./prod?id=4
Bob,2018-12-18 13:01:00,./prod?id=5
Liz,2018-12-18 13:30:00,./home
Liz,2018-12-18 13:59:00,./prod?id=7
Mary,2018-12-18 14:00:00,./cart
Liz,2018-12-18 14:02:00,./home
Bob,2018-12-18 14:30:00,./prod?id=3
Bob,2018-12-18 14:40:00,./home
5.7.2 Have a cup of tea, wait a minute or so, then start a Hive client to verify the result
beeline
!connect jdbc:hive2://bigdata5.zhenglihan.com:10000 hive hive org.apache.hive.jdbc.HiveDriver
select * from hive_table;
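If the full-table scan returns nothing, the partitions may simply not have been committed yet. Two optional follow-up checks from the same beeline session (the dt value below is just taken from the sample data in 5.7.1):

show partitions hive_table;
select * from hive_table where dt='2018-12-17' limit 10;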
6. Summary
This article integrates the three components Flink, Kafka, and Hive and uses Flink SQL to demonstrate how data flows in real time from Kafka into Hive. To truly grasp the core of a real-time Hive data warehouse, readers should understand how Flink writes data to Hive and how Hive partitions are committed, and why that matters.