天天看點

Flink SQL No Watermark

大家好,請教一個問題

我有一條進行 <code>session window</code> 的 sql。這條 sql 消費較少資料量的 topic 的時候,是可以生成 watermark。消費大量的資料的時候,就無法生成watermark。

一直是 No Watermark。 暫時找不到排查問題的思路。

Flink 版本号是 1.10,kafka 中消息是有時間的,其他的任務是可以拿到這個時間生成watermark。同時設定了 <code>EventTime mode 模式</code>,<code>Blink Planner。</code>

關于這個問題我進行了一些 debug,發現了 watermark 對應的一個 <code>physical relnode</code> 是 <code>StreamExecWatermarkAssigner</code>

在<code>translateToPlanInternal</code>中生成了如下一個 class 代碼:

其中關鍵的資訊是 <code>result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond());</code> 确實按照 <code>WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND</code> 的定義擷取的watermark。

在 flink 的 graph 中也确實有對應的 op 在做這個事情,不知為何會出現 no watermark 這樣的結果。因為這部分codegen的代碼确實無法進一步debug了。

如果大家有什麼好的 debug codegen 生成的代碼,可以告訴我哈,非常感謝

hi, 你的意思是沒有辦法在codegen出來的代碼上加斷點的意思嗎?

這裡倒是有一個比較hack的方法:

将生成的類放在一個java檔案之中,然後修改改下<code>GeneratedClass</code>下的<code>newInstance</code>方法,如果<code>classname ==“WatermarkGenerator$2”</code> 則将剛才的類則傳回 <code>new WatermarkGenerator$2</code> 這個類。

我個人對于問題的猜測是有一條資料的rowtime遠遠晚于其他資料,進而将整體的watermark提得很高,導緻後面的“晚到”的資料一直無法觸發watermark的生成。

大家好

問題的原因定位到了。

由于無法 debug codegen 生成的代碼,即使我拿到線上的資料,開啟了debug環境依然無法得到進展。

這個時候,我進行了 disable chain,觀察 watermark 的生成情況,看看到底在那個環節沒有繼續往下傳遞。(因為多個 op chain 在一起,不能确定到底是那個環節存在問題)

發現在 <code>WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)</code>這個 op 中部分 task 為 No watermark,由于這個op和source chain在一起,導緻這個vertex 對應的watermark無法顯示隻能是 no data。因為存在 group by 下遊的 watermark 為 min(parent task output watermark),是以下遊是 No watermark。導緻在查問題的時候,比較困難。

定位到由于 kafka 部分 partition 無資料導緻 No watermark 加上 <code>table.exec.source.idle-timeout = 10s</code> 參數即可。

當然,如果能直接 debug codegen 生成的代碼,那麼這個問題的分析路徑會更簡單。我應該直接可以發現大部分 task 可以生成 watermark,少部分 task 無 watermark,能夠快速的減少debug的時間。目前使用 disable chain 觀察每個 op 的情況,對于 Flink sql 的 debug 有很大的便利之處,不知社群是否有相關參數幫助開發者。

繼續閱讀