天天看點

【Spark】Spark Streaming的back pressure1.美圖

1.美圖

【Spark】Spark Streaming的back pressure1.美圖

在講flink的back pressure之前,我們先講講Spark Streaming的back pressure。Spark Streaming的back pressure出現的原因呢,我想大家應該都知道,是為了應對短期資料尖峰。Spark Streaming的back pressure是從spark 1.5以後引入的,在之前呢,隻能通過限制最大消費速度(這個要人為壓測預估),對于基于Receiver 形式,我們可以通過配置 spark.streaming.receiver.maxRate 參數來限制每個 receiver 每秒最大可以接收的記錄的資料;對于 Direct Approach 的資料接收,我們可以通過配置 spark.streaming.kafka.maxRatePerPartition 參數來限制每次作業中每個 Kafka 分區最多讀取的記錄條數。

在Spark 1.5之前,流應用程式如果因為資源不足導緻處理資料的速度跟不上接收資料的速度的情況,可以通過設定每秒所接收資料的最大條數來緩解這種情況。對于使用Receiver的方式可以通過設定’spark.streaming.receiver.maxRate’參數;對于使用Direct的方式設定參數’spark.streaming.kafka.maxRatePerPartition’。

這種限速的弊端很明顯,比如假如我們後端處理能力超過了這個最大的限制,會導緻資源浪費。需要對每個spark Streaming任務進行壓測預估。成本比較高。

由此,Spark在1.5版本中引入了背壓功能,就不再需要設定上述的速率限制了,Spark Streaming會随着處理條件的變化,自動計算所需要的速率,并進行動态地調整,但前提條件是要通過設定參數’spark.streaming.backpressure.enabled’=true來啟用這個功能,因為這個功能在預設情況下是未啟用的。

這種機制呢實際上是基于自動控制理論的pid這個概念。我們就簡單講一下其中思路:為了實作自動調節資料的傳輸速率,在原有的架構上新增了一個名為 RateController 的元件,這個元件繼承自 StreamingListener,其監聽所有作業的 onBatchCompleted 事件,并且基于 processingDelay 、schedulingDelay 、目前 Batch 處理的記錄條數以及處理完成事件來估算出一個速率;這個速率主要用于更新流每秒能夠處理的最大記錄的條數。這樣就可以實作處理能力好的話就會有一個較大的最大值,處理能力下降了就會生成一個較小的最大值。來保證Spark Streaming流暢運作。

pid速率計算源碼

【Spark】Spark Streaming的back pressure1.美圖

配置Spark Streaming的back pressure

spark.streaming.backpressure.initialRate: 啟用反壓機制時每個接收器接收第一批資料的初始最大速率。預設值沒有設定。

spark.streaming.backpressure.rateEstimator:速率估算器類,預設值為 pid ,目前 Spark 隻支援這個,大家可以根據自己的需要實作。

spark.streaming.backpressure.pid.proportional:用于響應錯誤的權重(最後批次和目前批次之間的更改)。預設值為1,隻能設定成非負值。weight for response to “error” (change between last batch and this batch)

spark.streaming.backpressure.pid.integral:錯誤積累的響應權重,具有抑制作用(有效阻尼)。預設值為 0.2 ,隻能設定成非負值。weight for the response to the accumulation of error. This has a dampening effect.

spark.streaming.backpressure.pid.derived:對錯誤趨勢的響應權重。 這可能會引起 batch size 的波動,可以幫助快速增加/減少容量。預設值為0,隻能設定成非負值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.

spark.streaming.backpressure.pid.minRate:可以估算的最低費率是多少。預設值為 100

參考:flink和spark Streaming中的Back Pressure

繼續閱讀