天天看點

Flink Sql on Zeppelin(2)——維表Join

維表join

代碼編寫

  • 之前在講Flink Sql的時候和大家聊過維表以及如何用Flink Sql來完成維表Join
  • 現在帶大家看看如何用Zeppelin來實作這個功能
  • 首先,我們先引入我們所需的依賴包,目前大家先跟着我這麼寫,之後會講解引入依賴的多種方式和差別。
    %flink.conf
    	# 這是第一個paragraph,大家不要把所有代碼寫在一個paragraph裡面
    	# 配置一下依賴包,這種方式會自動下載下傳依賴
    	
    	flink.execution.packages	org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0,org.apache.flink:flink-jdbc_2.11:1.10.0
    	
    	# 大家千萬注意,如果用的是org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0這2個包,那麼kafka 那邊的 version請寫universal,否則你會發現莫名其妙的錯誤
    	
    	# 如果kafka版本低于0.11,請用org.apache.flink:link-connector-kafka-0.11_2.11 替換上面的kafka的包,kafka版本和scala版本也請替換成對應的版本,ddl語句中的version也同樣如此
    	# 下面會用到Mysql,如果大家已經在Flink的lib目錄下放了Mysql的驅動包,那麼配這麼多的包就行
    	# 否則的話,再加上mysql:mysql-connector-java:5.1.37這個包
               
  • 然後我們注冊個

    File System Source

    ,再注冊

    Kafka Sink

    ,之後會将從檔案中讀取的資料寫入到kafka中。注意!大家不要把所有代碼寫在一個paragraph裡面,建議一個paragraph寫一段單一功能的語句
    %flink.ssql
    
    	-- File System Source DDL
    	DROP TABLE IF EXISTS t1;
    	
    	CREATE TABLE t1 (
    	 user_id bigint,
    	 item_id bigint,
    	 category_id bigint,
    	 behavior varchar,
    	 ts bigint
    	) WITH (
    	  'connector.type' = 'filesystem',                
    	  'connector.path' = 'hdfs:///test/UserBehavior.csv', 
    	  'format.type' = 'csv',                         
    	  'format.field-delimiter' = ',' 
    	)    
    	;
               
    %flink.ssql
    
    	-- Kafka Sink DDL
    	DROP TABLE IF EXISTS t2;
    	
    	CREATE TABLE t2 (
    	    user_id BIGINT,
    	    item_id BIGINT,
    	    category_id BIGINT,
    	    behavior STRING,
    	    ts BIGINT
    	    ) WITH (
    	    'update-mode' = 'append',
    	    'connector.type' = 'kafka',  
    	    'connector.version' = 'universal',  
    	    'connector.topic' = 'zeppelin_01_test', 
    	    'connector.properties.zookeeper.connect' = '127.0.0.1:2181',
    	    'connector.properties.bootstrap.servers' = '127.0.0.1:9092',
    	    'format.type'='json'
    	
    	)
    
               
    %flink.ssql
    	-- 将我們的資料寫入kafka
    	-- 這裡之是以用了UNIX_TIMESTAMP()這個udf來代替我們原生的ts
    	--是因為這個ts太老了,之後我們要做視窗計算的話,會一直沒法輸出資料的
    	
    	insert into t2 select user_id,item_id,category_id,behavior,UNIX_TIMESTAMP() as ts from t1;
               
  • 讓我們運作一下看看什麼情況
    Flink Sql on Zeppelin(2)——維表Join
  • 可以看到任務在持續的執行,點選這個按鈕可以跳轉到Flink叢集上對應的任務頁面,可以檢視相關資訊,這裡就不給大家示範了
  • 接下來讓我們再注冊個

    Kafka Source

    ,然後從中讀取資料
    %flink.ssql
    
    	-- Kafka Source DDL
    	DROP TABLE IF EXISTS t3;
    	CREATE TABLE t3(
    	    user_id BIGINT,
    	    item_id BIGINT,
    	    category_id BIGINT,
    	    behavior STRING,
    	    ts BIGINT,
    	    r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 計算列,因為ts是bigint,沒法作為水印,是以用UDF轉成TimeStamp
    	    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式
    	)WITH (
    	    'update-mode' = 'append',
    	    'connector.type' = 'kafka',  
    	    'connector.version' = 'universal',  
    	    'connector.topic' = 'zeppelin_01_test', 
    	    'connector.properties.zookeeper.connect' = '127.0.0.1:2181',
    	    'connector.properties.bootstrap.servers' = '127.0.0.1:9092',
    	    'connector.properties.group.id' = 'zeppelin_01_test',
    	    'connector.startup-mode' = 'latest-offset',
    	    'format.type'='json'
    	
    	)
               
    %flink.ssql(type=update)
    	
    	
    	select * from t3
               
    • 有個要注意的地方是,

      select

      語句必須指定

      type

      ,什麼意思呢?

      type

      指的是流式資料分析的三種模式
      • single
      • append
      • update
    • single模式适合當輸出結果是一行的情況。使用這種模式,永遠隻有一行資料,但這行資料會持續不斷的更新
    • Append模式适合不斷有新資料輸出,但不會覆寫原有資料,隻會不斷append的情況。值得注意的是,append模式的第一列一定要是timestamp,因為需要根據時間來設定一個threshold,不然資料源源不斷進來,最後會OOM,如果你要預覽資料的話,可以用
      -- 來自簡鋒大佬的教誨
      %flink.ssql(type=update) 
      select * from table order by time_column desc limit 10```
                 
    • Update模式适合多行輸出的情況,适合和聚合語句配合一起使用,持續不斷的更新資料,配合Zeppelin的可視化控件一起使用,效果更好
  • 瞄一眼輸出的内容,沒什麼問題,那我們開始整合

    Mysql Dim

    ,先去Mysql庫裡建個表
    -- Mysql 建表語句,注意這是在Mysql執行的!不要在Zeppelin執行
    	  CREATE TABLE `dim_behavior` (
    	  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
    	  `en_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '英文 行為',
    	  `zh_behavior` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '中文 行為',
    	  PRIMARY KEY (`id`)
    	) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    	
    	  -- 搞兩條資料
    	  INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (1, 'buy', '購買');
    	  INSERT INTO `dijie_test`.`dim_behavior`(`id`, `en_behavior`, `zh_behavior`) VALUES (2, 'pv', '浏覽');
    
               
  • 接下來讓我們回到Zeppelin,開始寫我們的建表語句和查詢語句
    %flink.ssql
    	
    	DROP TABLE IF EXISTS dim_behavior;
    	
    	CREATE TABLE `dim_behavior` (
    	  `id` int  ,
    	  `en_behavior` varchar  ,
    	  `zh_behavior` varchar  
    	)WITH (
    	  'connector.type' = 'jdbc',
    	  'connector.url' = 'jdbc:mysql://127.0.0.1:3306/dijie_test',
    	  'connector.table' = 'dim_behavior',
    	  'connector.driver' = 'com.mysql.jdbc.Driver', 
    	  'connector.username' = 'hive',
    	  'connector.password' = 'hive' ,
    	  'connector.lookup.cache.max-rows' = '5000', 
    	  'connector.lookup.cache.ttl' = '10s'  
    	)
               
    %flink.ssql(type = update)
    
    
    	select 
    	zh_behavior,
    	count(distinct user_id) as cnt_distin_user,
    	tumble_start(c.r_t,interval '10' second) as tumble_start
    	from
    	(
    	select b.*,a.* from (
    	    select *,proctime() as p from t3
    	    ) a 
    	    left join dim_behavior FOR SYSTEM_TIME AS OF a.p AS b 
    	    on a.behavior = b.en_behavior
    	    where b.zh_behavior is not null
    	) c group by c.zh_behavior,tumble(c.r_t,interval '10' second)
               
  • 我們在Sql裡進行了判斷,把維表中沒有的資料給過濾了。瞄一眼結果,發現确實正确的過濾了。而且資料正在持續不斷的更新
    Flink Sql on Zeppelin(2)——維表Join
  • 大家可以對比先之前的Flink Sql 03課程,發現之前寫Sql的方式實在low爆了
  • 以前還得先寫Java代碼,不會Java就不配寫Flink Sql。而現在,除了Sql,别的什麼語言都沒見着,純Sql方式完成了從讀取到比對到輸出的操作,實在太友善了
  • 可能有同學在用Zeppelin之前也以為也要寫代碼,甚至會對Zeppelin嗤之以鼻:我的IDEA不香嗎?當你真正開始用上的時候,你會發現,Zeppelin才是終極殺手!The Answer!
  • 好了,今天的學習就到此為止,下一次帶大家學習如何在Zeppelin中,使用我們Java編寫的UDF和自定義的Table Factory

采坑記錄

  • 如果在執行

    flink.conf

    的内容報如下錯誤時,請先去Interpreter頁面,重新開機Interpreter,再執行語句
    java.io.IOException: Can not change interpreter properties when interpreter process has already been launched
    	at org.apache.zeppelin.interpreter.InterpreterSetting.setInterpreterGroupProperties(InterpreterSetting.java:958)
    	at org.apache.zeppelin.interpreter.ConfInterpreter.interpret(ConfInterpreter.java:73)
    	at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:479)
    	at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75)
    	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
    	at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
               
  • 在執行

    insert

    或者

    select

    時,如果發現任務一點執行就立刻結束,沒有報錯,Flink Web Ui 也看不到相應的任務資訊,同時,Zeppelin的日志也查不到有些的資訊時,請将該paragraph的注釋内容全部删除,再點選執行,你就會發現任務能夠正常運作了。
  • 簡鋒大佬懷疑是Zeppelin把這條sql當成comment了,應該是個bug。我已經提ticket了,連結
  • 本次教程的notebook,我也都放到了我github的倉庫裡了,傳送門

最後,向大家宣傳一下Flink on Zeppelin 的釘釘群,大家有問題可以在裡面讨論,簡鋒大佬也在裡面,有問題直接提問就好

Flink Sql on Zeppelin(2)——維表Join

繼續閱讀