天天看點

flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

1.主題

     基于flink,kafka,hive搭建實時數倉(flink-sql版本)

flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

點選圖檔連結,從零開始學習大資料

2.适用讀者對象

    本文适用于hadoop大資料Flink進階學員

3.基礎環境資訊

軟體 版本 部署路徑
haoop 2.6.0-cdh5.14.4 cdh預設安裝
flink flink-1.11.1 /home/opt/flink/flink-1.11.1
kafka kafka_2.11-2.2.1-kafka-4.1.0 cdh預設安裝

4.主題描述

    随着大資料業務實時性要求的不斷提高,實時的業務越來越多,事件化的資料源也越來越多,實時處理從次要部分變成了主要部分,傳統的離線數倉很難滿足資料實時需求,于是,實時數倉概念應運而生。

5.實戰演練

    5.1 資料流圖

flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

    5.2 添加依賴包

        把以下所有依賴包放到${FLINK_HOME}/lib目錄(flink安裝目錄下的lib目錄下)

flink-connector-hive_2.11-1.11.1.jar

flink-connector-kafka_2.12-1.11.1.jar

flink-connector-kafka-base_2.12-1.11.1.jar

flink-csv-1.11.1.jar

flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar

hadoop-mapreduce-client-common-2.6.0-cdh5.14.4.jar

hadoop-mapreduce-client-core-2.6.0-cdh5.14.4.jar

hive-exec-1.1.0-cdh5.14.4.jar

hive-metastore-1.1.0-cdh5.14.4.jar

kafka_2.11-2.1.0.jar

kafka-clients-2.1.0.jar

    5.3 修改配置檔案${FLINK_HOME}/conf/flink-conf.yaml

#隻有在完成 Checkpoint 之後,檔案才會從 In-progress 狀态變成 Finish 狀态,才會觸發送出hive分區操作,是以,我們需要合理的去配置 Checkpointclassloader.resolve-order: parent-first#設定任務使用的時間屬性是eventtimepipeline.time-characteristic: EventTime#設定checkpoint的時間間隔execution.checkpointing.interval: 30000#確定檢查點之間的間隔execution.checkpointing.min-pause: 10000#設定checkpoint的逾時時間execution.checkpointing.timeout: 60000#設定任務取消後保留hdfs上的checkpoint檔案execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION#設定checkpointing操作依賴包flink.execution.packages: org.apache.flink:flink-connector-kafka_2.12:1.11.1,org.apache.flink:flink-connector-kafka-base_2.12:1.11.1
           

    5.4 配置并啟動flink yarn-session

        5.4.1 配置${FLINK_HOME}/bin/yarn-session.sh

#增加HADOOP_CLASSPATH變量export HADOOP_CLASSPATH=`hadoop classpath`#修改CC_CLASSPATH變量CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS:$HADOOP_CLASSPATH`
           

        5.4.2 啟動yarn-session模式

cd ${FLINK_HOME}/bin./yarn-session.sh
           
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

    5.5 配置flink sql-client

        5.5.1 添加配置${FLINK_HOME}/bin/sql-client.sh 

export HADOOP_CLASSPATH=`hadoop classpath`HADOOP_CONF_DIR=/etc/hadoop/conf
           

        5.5.2 配置${FLINK_HOME}/conf/sql-client-defaults.yaml

catalogs:  - name: myhivetype: hive    hive-conf-dir: /etc/hive/confdefault-database: defaultexecution:  current-catalog: myhive  current-database: default
           

        5.5.3 啟動sql-client.sh (Flink SQL指令行)

cd ${FLINK_HOME}/bin/./sql-client.sh embedded
           
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

    5.6 啟動應用

        5.6.1 在Flink SQL指令行模式下執行以下代碼

SET table.sql-dialect=hive;drop table hive_table;CREATE TABLE hive_table (`user` STRING,url STRING) PARTITIONED BY (dt STRING, hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES ('partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00','sink.partition-commit.trigger'='partition-time','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='metastore,success-file');SET table.sql-dialect=default;drop table kafka_table;CREATE TABLE kafka_table (`user` STRING,`cTime` TIMESTAMP(3),`url` STRING,   WATERMARK FOR cTime AS cTime - INTERVAL '5' SECOND) WITH ('connector'='kafka','topic'='FlinkDynamicTableKafkaDemo','properties.bootstrap.servers'='bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092','properties.group.id'='FlinkDynamicTableKafkaDemoGroup','scan.startup.mode'='latest-offset','format'='csv');SET table.sql-dialect=hive;INSERT INTO TABLE hive_table SELECT `user`, url, DATE_FORMAT(cTime, 'yyyy-MM-dd') as dt, DATE_FORMAT(cTime, 'HH') as hr, DATE_FORMAT(cTime, 'mm') as mi  FROM kafka_table;
           

        參數解析:

partition.time-extractor.timestamp-pattern :分區時間抽取器,與 DDL 中的分區字段保持一緻;sink.partition-commit.trigger :分區觸發器類型,可選 process-time 或partition-time。process-time:不需要上面的參數,也不需要水印,當目前時間大于分區建立時間 +sink.partition-commit.delay 中定義的時間,送出分區;partition-time:需要 Source 表中定義 watermark,當 watermark > 提取到的分區時間 +sink.partition-commit.delay 中定義的時間,送出分區;sink.partition-commit.delay :相當于延時時間;sink.partition-commit.policy.kind :怎麼送出,一般送出成功之後,需要通知 metastore,這樣 Hive 才能讀到你最新分區的資料;如果需要合并小檔案,也可以自定義 Class,通過實作 PartitionCommitPolicy 接口。
           
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

    5.7 驗證

        5.7.1 啟動kafka生産者往topic FlinkDynamicTableKafkaDemo 發送以下資料

cd /opt/cloudera/parcels/KAFKAbin/kafka-console-producer --broker-list bigdata4.zhenglihan.com:9092,bigdata5.zhenglihan.com:9092,bigdata6.zhenglihan.com:9092 --topic FlinkDynamicTableKafkaDemo
           
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

Mary,2018-12-17 12:00:00,./home

Bob,2018-12-17 12:00:00,./cart

Mary,2018-12-17 12:02:00,./prod?id=1

Mary,2018-12-17 12:55:00,./prod?id=4

Bob,2018-12-17 13:01:00,./prod?id=5

Liz,2018-12-17 13:30:00,./home

Liz,2018-12-17 13:59:00,./prod?id=7

Mary,2018-12-17 14:00:00,./cart

Liz,2018-12-17 14:02:00,./home

Bob,2018-12-17 14:30:00,./prod?id=3

Bob,2018-12-17 14:40:00,./home

Mary,2018-12-18 12:00:00,./home

Bob,2018-12-18 12:00:00,./cart

Mary,2018-12-18 12:02:00,./prod?id=1

Mary,2018-12-18 12:55:00,./prod?id=4

Bob,2018-12-18 13:01:00,./prod?id=5

Liz,2018-12-18 13:30:00,./home

Liz,2018-12-18 13:59:00,./prod?id=7

Mary,2018-12-18 14:00:00,./cart

Liz,2018-12-18 14:02:00,./home

Bob,2018-12-18 14:30:00,./prod?id=3

Bob,2018-12-18 14:40:00,./home

        5.7.2 去喝杯茶,等待一分鐘,啟動hive用戶端驗證結果

beeline !connect jdbc:hive2://bigdata5.zhenglihan.com:10000 hive hive org.apache.hive.jdbc.HiveDriverselect * from hive_table;
           
flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

6.總結:

    本文內建了flink、kafka和hive三種元件,以flink-sql方式示範了資料實時從kafka到hive的過程,讀者應掌握flink寫入資料到hive以及送出hive分區的過程及其意義,才能真正掌握hive實時數倉的核心原理。

    更為深度的講解請掃描底部二維碼關注公衆号,關注後續博文,一起學習hadoop大資料!

flink source 同步_hadoop大資料Flink專題篇搭建基于Flink的實時數倉(一)

繼續閱讀