UDF
概述
- UDF在我們的Sql開發中,是一個必不可少的幫手,通過Sql+UDF能夠解決我們90%的問題
- Flink目前提供了大量的内置UDF供我們使用,詳細可以參考官方文檔
- 不過有些時候,内置的UDF并不滿足我們的需求,那就需要自定義UDF
- 下面我們就來看看如何在Zeppelin中使用自定義UDF
使用
- 在Flink中,使用代碼注冊UDF有兩種方式
-
tEnv.registerFunction("test",new TestScalarFunc());
-
tEnv.sqlUpdate("CREATE FUNCTION IF NOT EXISTS test AS 'udf.TestScalarFunc'");
-
- 而在Zeppelin中,也有多種方式
- 通過編寫Scala代碼,然後通過上面兩種方式注入
-
加載指定Jar加載進Flink叢集中,之後通過上面兩種方式注冊UDF。使用起來很不爽,首先你得知道有哪些UDF,其次你得挨個注冊,而且你還得知道每個UDF的全類名,很麻煩。那麼有沒有更好的方式呢?flink.execution.jars
-
自動将Jar包中所有UDF注冊,相當友善,下面示範一下flink.udf.jars
- 先加一下配置參數
%flink.conf flink.udf.jars /home/data/flink/lib_me/flink-udf-1.0-SNAPSHOT.jar
- 1
- 2
- 輸出一下,看看有哪些UDF
%flink.ssql(type=update) show functions
- 1
- 2
轉載--Flink Sql on Zeppelin(3)——UDF&Redis維表 - 很完美,将我們所有的UDF都注冊了進來,我們再來驗證一下正确性
%flink.ssql(type=update) -- 連from哪個表都沒必要寫,Zeppelin實在太友善了 select javaupper('a')
- 1
- 2
- 3
轉載--Flink Sql on Zeppelin(3)——UDF&Redis維表 - 和我們預期的一樣,将字元
轉換成了a
A
- 先加一下配置參數
- 那麼,UDF的使用介紹就到這裡
Redis維表
概述
- 之前在寫Flink Sql系列的時候,給大家示範了如何寫一個支援DDL方式的Redis維表
- 不過隻在idea中使用過,并沒有在正式環境中跑過,是以,今天給大家示範一下如何在Zeppelin中使用Redis維表
- 在開始之前,先說一下之前遇到的一個問題。我把我之前的包丢在伺服器跑時,總是抛出異常
出現這個異常一般有三種情況,少包或者包沖突或者DDL有問題;而我已經把我Connector的依賴打入了Jar包,并且沒有指定org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
;我也把沖突的包給排除了;我idea是正常能跑的,是以也排除DDL寫的有問題。這就很奇怪了,後來想到Flink通過SPI機制來發現具體的實作類。于是我把Jar解壓,找到<scope>provided</scope>
org.apache.flink.table.factories.TableFactory
這個檔案。果不其然,Kafka的工廠類不在裡面,後來将所有涉及到的工廠類都丢到這個檔案裡面,就能夠正常運作了。不過這種方式太不優雅,于是又找到了一個解決方法,傳送門
我已經将修改之後的pom檔案push了,大家可以拉一下最新的代碼
- 另外,Flink連接配接Redis部分已經抽出來了,準備搞個新的工程,之後将會支援更多的功能,比如維表關聯
類型資料,又比如支援将資料插入Redis中,這些都将通過DDL語句來建表,然後用純Sql的方式進行關聯或者寫入HASH
- 好了,下面開始正式的介紹如何在Zeppelin中使用,我們自定義的Redis維表
使用
- 先通過
将我們的Jar引入flink.execution.jars
%flink.conf flink.udf.jars /home/data/flink/lib_me/flink-udf-1.0-SNAPSHOT.jar flink.execution.jars /home/data/flink/lib_me/flink-redis-1.0-SNAPSHOT.jar flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0,org.apache.flink:flink-jdbc_2.11:1.10.0
- 再建一下我們的資料源表和資料維表
%flink.ssql -- Kafka Source DDL DROP TABLE IF EXISTS t3; CREATE TABLE t3( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts BIGINT, r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'), WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND, p AS proctime() )WITH ( 'update-mode' = 'append', 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'zeppelin_01_test', 'connector.properties.zookeeper.connect' = '127.0.0.1:2181', 'connector.properties.bootstrap.servers' = '127.0.0.1:9092', 'connector.properties.group.id' = 'zeppelin_01_test', 'connector.startup-mode' = 'earliest-offset', 'format.type'='json' )
%flink.ssql -- Redis Dim DDl DROP TABLE IF EXISTS redis_dim; CREATE TABLE redis_dim ( first String, name String ) WITH ( 'connector.type' = 'redis', 'connector.ip' = '127.0.0.1', 'connector.port' = '6379', 'connector.lookup.cache.max-rows' = '10', 'connector.lookup.cache.ttl' = '10000000', 'connector.version' = '2.6' )
- 再執行我們的Sql,并且用UDF将查出來的維表值轉成大寫
%flink.ssql(type=update) select a.*,javaupper(b.name) from t3 a left join redis_dim FOR SYSTEM_TIME AS OF a.p AS b on a.behavior = b.first where b.name is not null and b.name <> ''
- 看一下結果
轉載--Flink Sql on Zeppelin(3)——UDF&Redis維表 - 可以看出,我們成功關聯上了Redis維表,并且成功用我們自己注冊UDF,将值轉為了大寫,很成功!
- 那麼,Redis維表就說到這裡
寫在最後
- 今天這一章和之前的一張可以說把Zeppelin三種引入Jar的方式都介紹了一下,下面給大家總結一下
- 通過Zeppelin管理
-
适合放我們在倉庫中有的Jar包,會自動把Jar的依賴也下載下傳下來,如flink.execution.packages
flink-connect-kafka
-
适合放我們尚未部署到倉庫中的包,比如一些Flink提供的example包flink.execution.jars
-
适合放我們的UDF,Zeppelin會自動注冊flink.udf.jars
-
- 放在
中,不推薦,會影響整個Flink環境,很有可能因為你的Jar包導緻Flink無法使用$FLINK_HOME/lib
- 通過Zeppelin管理
- 在測試Redis Dim的時候,發現個bug,每次任務關閉的時候,遠端的Redis都會自動shutdown。後來觀察redis的日志發現這麼一句話
,看到這裡明白了,應該是用戶端代碼有個地方錯誤的關閉了服務端。于是翻看自己的代碼,發現2005:M 13 Jun 14:19:39.459 # User requested shutdown...
這個代碼的注釋寫着asyncClient.shutdown(true);
。後來把這行去掉就一切正常了,之前之是以沒發現這個問題,是因為在idea中執行的代碼,每次停止任務的時候,根本走不到關閉連接配接的語句。還是因為自己偷懶沒有寫單元測試和去叢集測試,牢記教訓!Synchronously save the dataset to disk and then shut down the server.
最後,向大家宣傳一下Flink on Zeppelin 的釘釘群,大家有問題可以在裡面讨論,簡鋒大佬也在裡面,有問題直接提問就好