維表join
代碼編寫
- 之前在講Flink Sql的時候和大家聊過維表以及如何用Flink Sql來完成維表Join
- 現在帶大家看看如何用Zeppelin來實作這個功能
- 首先,我們先引入我們所需的依賴包,目前大家先跟着我這麼寫,之後會講解引入依賴的多種方式和差別。
%flink.conf # 這是第一個paragraph,大家不要把所有代碼寫在一個paragraph裡面 # 配置一下依賴包,這種方式會自動下載下傳依賴 flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0,org.apache.flink:flink-jdbc_2.11:1.10.0 # 大家千萬注意,如果用的是org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0這2個包,那麼kafka 那邊的 version請寫universal,否則你會發現莫名其妙的錯誤 # 如果kafka版本低于0.11,請用org.apache.flink:link-connector-kafka-0.11_2.11 替換上面的kafka的包,kafka版本和scala版本也請替換成對應的版本,ddl語句中的version也同樣如此 # 下面會用到Mysql,如果大家已經在Flink的lib目錄下放了Mysql的驅動包,那麼配這麼多的包就行 # 否則的話,再加上mysql:mysql-connector-java:5.1.37這個包
- 然後我們注冊個
,再注冊File System Source
,之後會将從檔案中讀取的資料寫入到kafka中。注意!大家不要把所有代碼寫在一個paragraph裡面,建議一個paragraph寫一段單一功能的語句Kafka Sink
%flink.ssql -- File System Source DDL DROP TABLE IF EXISTS t1; CREATE TABLE t1 ( user_id bigint, item_id bigint, category_id bigint, behavior varchar, ts bigint ) WITH ( 'connector.type' = 'filesystem', 'connector.path' = 'hdfs:///test/UserBehavior.csv', 'format.type' = 'csv', 'format.field-delimiter' = ',' ) ;
%flink.ssql -- Kafka Sink DDL DROP TABLE IF EXISTS t2; CREATE TABLE t2 ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts BIGINT ) WITH ( 'update-mode' = 'append', 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'zeppelin_01_test', 'connector.properties.zookeeper.connect' = '127.0.0.1:2181', 'connector.properties.bootstrap.servers' = '127.0.0.1:9092', 'format.type'='json' )
%flink.ssql -- 将我們的資料寫入kafka -- 這裡之是以用了UNIX_TIMESTAMP()這個udf來代替我們原生的ts --是因為這個ts太老了,之後我們要做視窗計算的話,會一直沒法輸出資料的 insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1;
- 讓我們運作一下看看什麼情況
- 可以看到任務在持續的執行,點選這個按鈕可以跳轉到Flink叢集上對應的任務頁面,可以檢視相關資訊,這裡就不給大家示範了
- 接下來讓我們再注冊個
,然後從中讀取資料Kafka Source
%flink.ssql -- Kafka Source DDL DROP TABLE IF EXISTS t3; CREATE TABLE t3( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts BIGINT, r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 計算列,因為ts是bigint,沒法作為水印,是以用UDF轉成TimeStamp WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式 )WITH ( 'update-mode' = 'append', 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'zeppelin_01_test', 'connector.properties.zookeeper.connect' = '127.0.0.1:2181', 'connector.properties.bootstrap.servers' = '127.0.0.1:9092', 'connector.properties.group.id' = 'zeppelin_01_test', 'connector.startup-mode' = 'latest-offset', 'format.type'='json' )
%flink.ssql(type=update) select * from t3
- 有個要注意的地方是,
語句必須指定select
,什麼意思呢?type
指的是流式資料分析的三種模式type
- single
- append
- update
- single模式适合當輸出結果是一行的情況。使用這種模式,永遠隻有一行資料,但這行資料會持續不斷的更新
- Append模式适合不斷有新資料輸出,但不會覆寫原有資料,隻會不斷append的情況。值得注意的是,append模式的第一列一定要是timestamp,因為需要根據時間來設定一個threshold,不然資料源源不斷進來,最後會OOM,如果你要預覽資料的話,可以用
-- 來自簡鋒大佬的教誨 %flink.ssql(type=update) select * from table order by time_column desc limit 10```
- Update模式适合多行輸出的情況,适合和聚合語句配合一起使用,持續不斷的更新資料,配合Zeppelin的可視化控件一起使用,效果更好
- 有個要注意的地方是,
- 瞄一眼輸出的内容,沒什麼問題,那我們開始整合
,先去Mysql庫裡建個表Mysql Dim
-- Mysql 建表語句,注意這是在Mysql執行的!不要在Zeppelin執行 CREATE TABLE `dim_behavior` ( `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主鍵', `en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行為', `zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行為', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- 搞兩條資料 INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '購買'); INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '浏覽');
- 接下來讓我們回到Zeppelin,開始寫我們的建表語句和查詢語句
%flink.ssql DROP TABLE IF EXISTS dim_behavior; CREATE TABLE `dim_behavior` ( `id` int , `en_behavior` varchar , `zh_behavior` varchar )WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://127.0.0.1:3306/dijie_test', 'connector.table' = 'dim_behavior', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'hive', 'connector.password' = 'hive' , 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10s' )
%flink.ssql(type = update) select zh_behavior, count(distinct user_id) as cnt_distin_user, tumble_start(c.r_t,interval '10' second) as tumble_start from ( select b.*,a.* from ( select *,proctime() as p from t3 ) a left join dim_behavior FOR SYSTEM_TIME AS OF a.p AS b on a.behavior = b.en_behavior where b.zh_behavior is not null ) c group by c.zh_behavior,tumble(c.r_t,interval '10' second)
- 我們在Sql裡進行了判斷,把維表中沒有的資料給過濾了。瞄一眼結果,發現确實正确的過濾了。而且資料正在持續不斷的更新
- 大家可以對比先之前的Flink Sql 03課程,發現之前寫Sql的方式實在low爆了
- 以前還得先寫Java代碼,不會Java就不配寫Flink Sql。而現在,除了Sql,别的什麼語言都沒見着,純Sql方式完成了從讀取到比對到輸出的操作,實在太友善了
- 可能有同學在用Zeppelin之前也以為也要寫代碼,甚至會對Zeppelin嗤之以鼻:我的IDEA不香嗎?當你真正開始用上的時候,你會發現,Zeppelin才是終極殺手!The Answer!
- 好了,今天的學習就到此為止,下一次帶大家學習如何在Zeppelin中,使用我們Java編寫的UDF和自定義的Table Factory
采坑記錄
- 如果在執行
的内容報如下錯誤時,請先去Interpreter頁面,重新開機Interpreter,再執行語句flink.conf
java.io.IOException: Can not change interpreter properties when interpreter process has already been launched at org.apache.zeppelin.interpreter.InterpreterSetting.setInterpreterGroupProperties(InterpreterSetting.java:958) at org.apache.zeppelin.interpreter.ConfInterpreter.interpret(ConfInterpreter.java:73) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:479) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
- 在執行
或者insert
時,如果發現任務一點執行就立刻結束,沒有報錯,Flink Web Ui 也看不到相應的任務資訊,同時,Zeppelin的日志也查不到有些的資訊時,請将該paragraph的注釋内容全部删除,再點選執行,你就會發現任務能夠正常運作了。select
- 簡鋒大佬懷疑是Zeppelin把這條sql當成comment了,應該是個bug。我已經提ticket了,連結
- 本次教程的notebook,我也都放到了我github的倉庫裡了,傳送門
最後,向大家宣傳一下Flink on Zeppelin 的釘釘群,大家有問題可以在裡面讨論,簡鋒大佬也在裡面,有問題直接提問就好