1. 資料接受,生成Block 流程
- streamingContext啟動時,會啟動ReceiverTracker, 它會将需要啟動的Receiver分發到Executor上,Executor上的ReceiverSuperviser會負責Receiver的啟動,停止,重新開機管理(後續有詳細文章描述:如何分發,啟動等)。
- Receiver上的store(Item),實際調用ReceiverSuperviser(具體實作ReceiverSuperviserImpl)的pushSingle(),來存儲每條資料。
- 在ReceiverSuperviserImpl的内部,他是通過BlockGenerator.addData 來進行具體的資料存儲
-
在BlockGenerator内部,有幾個核心元件:緩存資料的buffer, 存儲block的queue, 定時将buffer資料包裝成Block扔到queue的定時器,定時從queue拉取Block然後通過ReceivedBlockHandler将其存儲的線程。
具體如下圖所示:
2. 一些問題的思考
在整個的執行流程中,線性調用鍊上基本不會發生任何問題。隻是在BlockGenerator緩存接受的資料,生成Block的時候需要重點關注,以及receivedBlockHandler如何将Blocks存儲在Spark Mem中也需要被關注。
- BlockGenerator中的currentBuffer:它緩存的資料會被定時器每隔blockInterval(預設200ms)的時間拿走,這個緩存所用的是spark運作時記憶體(預設heap*0,4),而不是storage記憶體。那麼如果在一個blockInterval時間内,接受速率很大的話,這部分記憶體很容易OOM 或者 進行大量的 GC,進而導緻receiver所在的Executor極容易挂掉,或者處理速度很慢。
- BlockGenerator中的BlockQueue作為緩存buffer和BlockManager的中轉站,一般沒有什麼問題。
- receivedBlockManager将擷取的Block都存儲在自己所在的這個Executor上,即一個batchInterval内的所有資料都存儲在一個Executor上,如果Executor的storage 記憶體不是很大,而一個batchInterval時間内接受過多的資料,很容撐爆記憶體導緻Receiver挂掉。
- 時刻關注batch processing延遲,如果batch processing延遲較大,那麼上遊的資料會被累計,進而導緻記憶體問題。
- 盡量打開 backpressure 的功能 link。
StorageLevel的設定:
-
: 如果Receiver一旦挂掉,部分block就找不到,進而整個作業失敗。Memory_Disk
-
: 有了replication,一般就不容易出現資料丢失,但是記憶體壓力大問題仍然存在。Memory_Disk_2
-
: 資料序列化存儲,比較适合。Memory_AND_DISK_SER_2
3. 一些優化嘗試
- 讓receiver盡量均勻的分布到Executor上, 1.5版本之後采用的這種方式。
- 啟動的Receiver個數:一般為Executor個數的 1/4, 在做repelication的情況下,記憶體最大可以占用到 1/2 的storage.
- 務必給你系統設定 spark.streaming.receiver.maxRate。假設你啟動了 N個 Receiver,那麼你系統實際會接受到的資料不會超過 N*MaxRate,也就是說,maxRate參數是針對每個 Receiver 設定的。這個MaxRate在實際使用中需要測試出經驗值
- 設定blockInterval:
blockInterval = batchInterval * #receiver / (partitionFactor * sparkProcessCore) sparkProcessCores = spark.cores.max - #receiver
- 單個Executor的記憶體不要設定的太大,避免大記憶體GC問題。