天天看點

flink問題整理

  1. 送出的flink程式重新開機失敗

    1. 錯誤日志:

    flink問題整理

            從圖中可以看出,flink程式在自動重新開機時,需要尋找 /tmp 下面的一些檔案。但是由于linux系統的/tmp目錄會被很多程式以及作業系統本身用到,是以很難避免檔案的誤删除操作。

    2. 解決方案

            在flink的 flink-conf.yaml 檔案中,有個配置項叫 io.tmp.dirs ,該配置用于決定程式運作過程中一些臨時檔案儲存的目錄。建議将該目錄配置為flink專用目錄。

  2. flink叢集無法通過 stop-cluster.sh 腳本停止

    1. 錯誤現象

    flink問題整理
    flink問題整理

            通過腳本停止叢集,發現無法在對應的機器上找到對應的flink服務。

    2. 解決方案

            在flink的安裝目錄下的 /bin 目錄下有個 config.sh 腳本檔案,裡面有一項配置用來配置flink服務的pid檔案目錄,配置名稱為: DEFAULT_ENV_PID_DIR ,預設值為 /tmp 。由于linux系統的/tmp目錄會被很多程式以及作業系統本身用到,是以很難避免檔案的誤删除操作。出現上述日志就是因為pid檔案被删除,導緻flink找不到機器上的程序pid編号所緻。是以我們需要修改該預設配置為一個flink專用目錄。

  3. flink sql on hive

    flink版本:1.11.1

    運作了一個sql語句,查詢hive表的資料,有group by,然後對很多字段取最大值,示例代碼如下:

    select user_id, max(column1), max(column2), max(column3)
    from test.test
    group by user_id;
               
    select後面的max字段有150多個,然後運作代碼報錯如下:
    java.lang.RuntimeException: Could not instantiate generated class 'LocalSortAggregateWithKeys$1975'
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
        at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
        at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
        at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
        at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
        ... 14 more
    Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
        at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
        ... 16 more
    Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
        at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
        at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
        ... 19 more
    Caused by: java.lang.StackOverflowError
        at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:700)
        at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:478)
        at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
        at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
               

    主要錯誤是:StackOverflowError,棧溢出。

    這個錯誤意思是說,方法嵌套太多。

    之後試了一下,發現是一個sql裡面的聚合函數不能寫太多,最大大概是120左右。建議聚合函數個數超過100個,就寫兩個sql,然後再把兩個sql的結果進行合并。

  4. 調用太多次get_json_value函數

    将處理完的資料寫入pulsar主題,主題字段特别多,超過200個,通過調用get_json_value(類似于hive中的get_json_object)函數,将處理完的結果資料(為json字元串)的内容提取出來,然後發現處理速度很慢。

    解決方式就是,結果就儲存一個字段,将json字元串輸入,下遊處理時再通過get_json_value函數擷取需要的字段值,或者是在自定義UDF中擷取。

繼續閱讀