天天看點

轉載--Flink Sql on Zeppelin(3)——UDF&Redis維表

UDF

概述

  • UDF在我們的Sql開發中,是一個必不可少的幫手,通過Sql+UDF能夠解決我們90%的問題
  • Flink目前提供了大量的内置UDF供我們使用,詳細可以參考官方文檔
  • 不過有些時候,内置的UDF并不滿足我們的需求,那就需要自定義UDF
  • 下面我們就來看看如何在Zeppelin中使用自定義UDF

使用

  • 在Flink中,使用代碼注冊UDF有兩種方式
    • tEnv.registerFunction("test",new TestScalarFunc());

    • tEnv.sqlUpdate("CREATE FUNCTION IF NOT EXISTS test AS 'udf.TestScalarFunc'");

  • 而在Zeppelin中,也有多種方式
    • 通過編寫Scala代碼,然後通過上面兩種方式注入
    • flink.execution.jars

      加載指定Jar加載進Flink叢集中,之後通過上面兩種方式注冊UDF。使用起來很不爽,首先你得知道有哪些UDF,其次你得挨個注冊,而且你還得知道每個UDF的全類名,很麻煩。那麼有沒有更好的方式呢?
    • flink.udf.jars

      自動将Jar包中所有UDF注冊,相當友善,下面示範一下
      • 先加一下配置參數
        %flink.conf
        flink.udf.jars /home/data/flink/lib_me/flink-udf-1.0-SNAPSHOT.jar
                   
        • 1
        • 2
      • 輸出一下,看看有哪些UDF
        %flink.ssql(type=update)
        show functions
                   
        • 1
        • 2
        轉載--Flink Sql on Zeppelin(3)——UDF&Redis維表
      • 很完美,将我們所有的UDF都注冊了進來,我們再來驗證一下正确性
        %flink.ssql(type=update)
        -- 連from哪個表都沒必要寫,Zeppelin實在太友善了	
        select javaupper('a')
                   
        • 1
        • 2
        • 3
        轉載--Flink Sql on Zeppelin(3)——UDF&Redis維表
      • 和我們預期的一樣,将字元

        a

        轉換成了

        A

    • 那麼,UDF的使用介紹就到這裡

Redis維表

概述

  • 之前在寫Flink Sql系列的時候,給大家示範了如何寫一個支援DDL方式的Redis維表
  • 不過隻在idea中使用過,并沒有在正式環境中跑過,是以,今天給大家示範一下如何在Zeppelin中使用Redis維表
  • 在開始之前,先說一下之前遇到的一個問題。我把我之前的包丢在伺服器跑時,總是抛出異常

    org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'

    出現這個異常一般有三種情況,少包或者包沖突或者DDL有問題;而我已經把我Connector的依賴打入了Jar包,并且沒有指定

    <scope>provided</scope>

    ;我也把沖突的包給排除了;我idea是正常能跑的,是以也排除DDL寫的有問題。這就很奇怪了,後來想到Flink通過SPI機制來發現具體的實作類。于是我把Jar解壓,找到

    org.apache.flink.table.factories.TableFactory

    這個檔案。果不其然,Kafka的工廠類不在裡面,後來将所有涉及到的工廠類都丢到這個檔案裡面,就能夠正常運作了。不過這種方式太不優雅,于是又找到了一個解決方法,傳送門

    我已經将修改之後的pom檔案push了,大家可以拉一下最新的代碼

  • 另外,Flink連接配接Redis部分已經抽出來了,準備搞個新的工程,之後将會支援更多的功能,比如維表關聯

    HASH

    類型資料,又比如支援将資料插入Redis中,這些都将通過DDL語句來建表,然後用純Sql的方式進行關聯或者寫入
  • 好了,下面開始正式的介紹如何在Zeppelin中使用,我們自定義的Redis維表

使用

  • 先通過

    flink.execution.jars

    将我們的Jar引入
    %flink.conf
    flink.udf.jars /home/data/flink/lib_me/flink-udf-1.0-SNAPSHOT.jar
    flink.execution.jars /home/data/flink/lib_me/flink-redis-1.0-SNAPSHOT.jar
    flink.execution.packages	org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0,org.apache.flink:flink-jdbc_2.11:1.10.0
               
  • 再建一下我們的資料源表和資料維表
    %flink.ssql
    
    -- Kafka Source DDL
    DROP TABLE IF EXISTS t3;
    CREATE TABLE t3(
        user_id BIGINT,
        item_id BIGINT,
        category_id BIGINT,
        behavior STRING,
        ts BIGINT,
        r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),
        WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND,
        p AS proctime()
    )WITH (
        'update-mode' = 'append',
        'connector.type' = 'kafka', 
        'connector.version' = 'universal',  
        'connector.topic' = 'zeppelin_01_test',  
        'connector.properties.zookeeper.connect' = '127.0.0.1:2181',
        'connector.properties.bootstrap.servers' = '127.0.0.1:9092',
        'connector.properties.group.id' = 'zeppelin_01_test',
        'connector.startup-mode' = 'earliest-offset',
        'format.type'='json'
    
    )
               
    %flink.ssql
    
    -- Redis Dim DDl
    DROP TABLE IF EXISTS redis_dim;
    CREATE TABLE redis_dim (
    first String,
    name String
    ) WITH (
      'connector.type' = 'redis',  
      'connector.ip' = '127.0.0.1', 
      'connector.port' = '6379', 
      'connector.lookup.cache.max-rows' = '10', 
      'connector.lookup.cache.ttl' = '10000000', 
      'connector.version' = '2.6' 
    )
               
  • 再執行我們的Sql,并且用UDF将查出來的維表值轉成大寫
    %flink.ssql(type=update)
    
    
    select a.*,javaupper(b.name) from t3 a left join redis_dim FOR SYSTEM_TIME AS OF a.p AS b on a.behavior = b.first where b.name is not null and b.name <> ''
    
               
  • 看一下結果
    轉載--Flink Sql on Zeppelin(3)——UDF&amp;Redis維表
  • 可以看出,我們成功關聯上了Redis維表,并且成功用我們自己注冊UDF,将值轉為了大寫,很成功!
  • 那麼,Redis維表就說到這裡

寫在最後

  • 今天這一章和之前的一張可以說把Zeppelin三種引入Jar的方式都介紹了一下,下面給大家總結一下
    • 通過Zeppelin管理
      • flink.execution.packages

        适合放我們在倉庫中有的Jar包,會自動把Jar的依賴也下載下傳下來,如

        flink-connect-kafka

      • flink.execution.jars

         适合放我們尚未部署到倉庫中的包,比如一些Flink提供的example包
      • flink.udf.jars

         适合放我們的UDF,Zeppelin會自動注冊
    • 放在

      $FLINK_HOME/lib

      中,不推薦,會影響整個Flink環境,很有可能因為你的Jar包導緻Flink無法使用
  • 在測試Redis Dim的時候,發現個bug,每次任務關閉的時候,遠端的Redis都會自動shutdown。後來觀察redis的日志發現這麼一句話

    2005:M 13 Jun 14:19:39.459 # User requested shutdown...

    ,看到這裡明白了,應該是用戶端代碼有個地方錯誤的關閉了服務端。于是翻看自己的代碼,發現

    asyncClient.shutdown(true);

    這個代碼的注釋寫着

    Synchronously save the dataset to disk and then shut down the server.

    。後來把這行去掉就一切正常了,之前之是以沒發現這個問題,是因為在idea中執行的代碼,每次停止任務的時候,根本走不到關閉連接配接的語句。還是因為自己偷懶沒有寫單元測試和去叢集測試,牢記教訓!

最後,向大家宣傳一下Flink on Zeppelin 的釘釘群,大家有問題可以在裡面讨論,簡鋒大佬也在裡面,有問題直接提問就好

轉載--Flink Sql on Zeppelin(3)——UDF&amp;Redis維表