天天看點

Flink常見異常和錯誤資訊小結

Flink的常見異常衆多,不可能面面俱到,是以想到哪兒寫到哪兒,有漏掉的之後再補充。

部署和資源問題

(0) JDK版本過低

這不是個顯式錯誤,但是JDK版本過低很有可能會導緻Flink作業出現各種莫名其妙的問題,是以在生産環境中建議采用JDK 8的較高update(我們使用的是181)。

(1) Could not build the program from JAR file

該資訊不甚準确,因為絕大多數情況下都不是JAR包本身有毛病,而是在作業送出過程中出現異常退出了。是以需要檢視本次送出産生的用戶端日志(預設位于$FLINK_HOME/logs目錄下),再根據其中的資訊定位并解決問題。

(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...

一般都是因為使用者依賴第三方包的版本與Flink架構依賴的版本有沖突導緻。

(3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

就是字面意思,YARN叢集内沒有足夠的資源啟動Flink作業。檢查一下目前YARN叢集的狀态、正在運作的YARN App以及Flink作業所處的隊列,釋放一些資源或者加入新的資源。

(4) java.util.concurrent.TimeoutException: Slot allocation request timed out

slot配置設定請求逾時,是因為TaskManager申請資源時無法正常獲得,按照上一條的思路檢查即可。

(5) org.apache.flink.util.FlinkException: The assigned slot was removed

TaskManager的Container因為使用資源超限被kill掉了。首先需要保證每個slot配置設定到的記憶體量足夠,特殊情況下可以手動配置SlotSharingGroup來減少單個slot中共享Task的數量。如果資源沒問題,那麼多半就是程式内部發生了記憶體洩露。建議仔細檢視TaskManager日志,并按處理JVM OOM問題的正常操作來排查。

(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out

TaskManager心跳逾時。有可能是TaskManager已經失敗,如果沒有失敗,那麼有可能是因為網絡不好導緻JobManager沒能收到心跳信号,或者TaskManager忙于GC,無法發送心跳信号。JobManager會重新開機心跳逾時的TaskManager,如果頻繁出現此異常,應該通過日志進一步定位問題所在。

作業問題

(1)org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

該異常幾乎都是由于程式業務邏輯有誤,或者資料流裡存在未處理好的髒資料導緻的,繼續向下追溯異常棧一般就可以看到具體的出錯原因,比較常見的如POJO内有空字段,或者抽取事件時間的時間戳為null等。

(2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down

很多童鞋拿着這兩條異常資訊來求助,但實際上它們隻是表示BufferPool、MemoryManager這些Flink運作時元件被銷毀,亦即作業已經失敗。具體的原因多種多樣,根據經驗,一般是上一條描述的情況居多(即Could not forward element to next operator錯誤會伴随出現),其次是JDK版本問題。具體情況還是要根據TaskManager日志具體分析。

(3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]

Akka逾時導緻,一般有兩種原因:一是叢集負載比較大或者網絡比較擁塞,二是業務邏輯同步調用耗時的外部服務。如果負載或網絡問題無法徹底緩解,需考慮調大akka.ask.timeout參數的值(預設隻有10秒);另外,調用外部服務時盡量異步操作(Async I/O)。

(4) java.io.IOException: Too many open files

這個異常我們應該都不陌生,首先檢查系統ulimit -n的檔案描述符限制,再注意檢查程式内是否有資源(如各種連接配接池的連接配接)未及時釋放。值得注意的是,Flink使用RocksDB狀态後端也有可能會抛出這個異常,此時需修改flink-conf.yaml中的state.backend.rocksdb.files.open參數,如果不限制,可以改為-1。

(5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '' are missing

在Flink内使用Java Lambda表達式時,由于類型擦除造成的副作用,注意調用returns()方法指定被擦除的類型。

檢查點和狀态問題

(1) Received checkpoint barrier for checkpoint before completing current checkpoint . Skipping current checkpoint

在目前檢查點還未做完時,收到了更新的檢查點的barrier,表示目前檢查點不再需要而被取消掉,一般不需要特殊處理。

(2) Checkpoint expired before completing

首先應檢查CheckpointConfig.setCheckpointTimeout()方法設定的檢查點逾時,如果設的太短,适當改長一點。另外就是考慮發生了反壓或資料傾斜,或者barrier對齊太慢。

(3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible

我們知道Flink的狀态是按key組織并儲存的,如果程式邏輯内改了keyBy()邏輯或者key的序列化邏輯,就會導緻檢查點/儲存點的資料無法正确恢複。是以如果必須要改key相關的東西,就棄用之前的狀态資料吧。

(4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported

在1.9之前的Flink版本中,如果我們使用RocksDB狀态後端,并且更改了自用MapState的schema,恢複作業時會抛出此異常,表示不支援更改schema。這個問題已經在FLINK-11947解決,更新版本即可。