天天看點

Spark Streaming 資料接受 導入到記憶體 生成Block的過程

1. 資料接受,生成Block 流程

  • streamingContext啟動時,會啟動ReceiverTracker, 它會将需要啟動的Receiver分發到Executor上,Executor上的ReceiverSuperviser會負責Receiver的啟動,停止,重新開機管理(後續有詳細文章描述:如何分發,啟動等)。
  • Receiver上的store(Item),實際調用ReceiverSuperviser(具體實作ReceiverSuperviserImpl)的pushSingle(),來存儲每條資料。
  • 在ReceiverSuperviserImpl的内部,他是通過BlockGenerator.addData 來進行具體的資料存儲
  • 在BlockGenerator内部,有幾個核心元件:緩存資料的buffer, 存儲block的queue, 定時将buffer資料包裝成Block扔到queue的定時器,定時從queue拉取Block然後通過ReceivedBlockHandler将其存儲的線程。

    具體如下圖所示:

    Spark Streaming 資料接受 導入到記憶體 生成Block的過程

2. 一些問題的思考

在整個的執行流程中,線性調用鍊上基本不會發生任何問題。隻是在BlockGenerator緩存接受的資料,生成Block的時候需要重點關注,以及receivedBlockHandler如何将Blocks存儲在Spark Mem中也需要被關注。

  • BlockGenerator中的currentBuffer:它緩存的資料會被定時器每隔blockInterval(預設200ms)的時間拿走,這個緩存所用的是spark運作時記憶體(預設heap*0,4),而不是storage記憶體。那麼如果在一個blockInterval時間内,接受速率很大的話,這部分記憶體很容易OOM 或者 進行大量的 GC,進而導緻receiver所在的Executor極容易挂掉,或者處理速度很慢。
  • BlockGenerator中的BlockQueue作為緩存buffer和BlockManager的中轉站,一般沒有什麼問題。
  • receivedBlockManager将擷取的Block都存儲在自己所在的這個Executor上,即一個batchInterval内的所有資料都存儲在一個Executor上,如果Executor的storage 記憶體不是很大,而一個batchInterval時間内接受過多的資料,很容撐爆記憶體導緻Receiver挂掉。
  • 時刻關注batch processing延遲,如果batch processing延遲較大,那麼上遊的資料會被累計,進而導緻記憶體問題。
  • 盡量打開 backpressure 的功能 link。

StorageLevel的設定:

  • Memory_Disk

    : 如果Receiver一旦挂掉,部分block就找不到,進而整個作業失敗。
  • Memory_Disk_2

    : 有了replication,一般就不容易出現資料丢失,但是記憶體壓力大問題仍然存在。
  • Memory_AND_DISK_SER_2

    : 資料序列化存儲,比較适合。

3. 一些優化嘗試

  1. 讓receiver盡量均勻的分布到Executor上, 1.5版本之後采用的這種方式。
  2. 啟動的Receiver個數:一般為Executor個數的 1/4, 在做repelication的情況下,記憶體最大可以占用到 1/2 的storage.
  3. 務必給你系統設定 spark.streaming.receiver.maxRate。假設你啟動了 N個 Receiver,那麼你系統實際會接受到的資料不會超過 N*MaxRate,也就是說,maxRate參數是針對每個 Receiver 設定的。這個MaxRate在實際使用中需要測試出經驗值
  4. 設定blockInterval:

    blockInterval = batchInterval * #receiver / (partitionFactor * sparkProcessCore) sparkProcessCores = spark.cores.max - #receiver

  5. 單個Executor的記憶體不要設定的太大,避免大記憶體GC問題。

繼續閱讀