Spark Streaming對實時資料流進行分析處理,源源不斷的從資料源接收資料切割成一個個時間間隔進行處理;
流處理與批處理有明顯差別,批進行中的資料有明顯的邊界、資料規模已知;而流處理資料流并沒有邊界,也未知資料規模;
由于流處理的資料流特征,使之資料流具有不可預測性,而且資料處理的速率還與硬體、網絡等資源有關,在這種情況下如不對源源不斷進來的資料流速率進行限制,那當Spark節點故障、網絡故障或資料處理吞吐量下來時還有資料不斷流進來,那将有可能将出現OOM進而導緻Spark Streaming程式崩潰;
在Spark Streaming中不同的資料源采用不同的限速政策,但無論是Socket資料源的限流政策還是Kafka資料源的限流政策其速率(rate)的計算都是使用PIDController算法進行計算而得來;
下面從源碼的角度分别介紹Socket資料源與Kafka資料源的限流處理。
速率限制的計算與更新
Spark Streaming的流處理其實是基于微批處理(MicroBatch)的,也就是說将資料流按某比較小的時間間隔将資料切割成為一段段微批資料進行處理;

StreamingContext調用Start()啟動的時候會将速率控制器(rateController)添加到StreamingListener監聽器中;
當每批次處理完成時将觸發監聽器(RateController),使用該批處理的處理結束時間、處理延遲時間、排程延遲時間、記錄行數調用PIDRateEstimator傳入PID算法中(PID Controller)計算出該批次的速率(rate)并更新速率限制(rateLimit)與釋出該限制速率;
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime
workDelay <- batchCompleted.batchInfo.processingDelay
waitDelay <- batchCompleted.batchInfo.schedulingDelay
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())
}
}
Socket資料源限流
批次的限制速率上面已經算出,這裡說的是接收Socket過來的資料時的資料限流;
SocketInputStream類receive方法接收到資料後将資料存入 BlockGenerator的Buffer中,在寫入Buffer前調用限流器 (RateLimiter)對寫入資料進行限流;
RateLimiter限流器使用了Google開源的 Guava中内置的RateLimiter限流器,該類隻是對Guava限流器的簡單封裝;
在Spark Streaming中可通過使用兩個參數配置初始速率與最大速率spark.streaming.receiver.maxRate、spark.streaming.backpressure.initialRate;亦可配置PIDController算法相關的四個參數值;
RateLimiter限流器是基于令牌桶的算法基本原理比較簡單,以一個恒定的速率生成令牌放入令牌桶中,桶滿則停止,處理請求時需要從令牌桶中取出令牌,當桶中無令牌可取時阻塞等待,此算法用于確定系統不被洪峰擊垮。
private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
/**
* Push a single data item into the buffer.
*/
def addData(data: Any): Unit = {
if (state == Active) {
//調用限流器等待
waitToPush()
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
def waitToPush() {
//限流器申請令牌
rateLimiter.acquire()
}
Guava庫中RateLimiter限流器基本使用:
//建立限流器,每秒産生令牌數1
RateLimiter rateLimiter=RateLimiter.create(1);
for (int i = 0; i < 10; i++) {
//獲得一個令牌,未申請到令牌則阻塞等待
double waitTime = rateLimiter.acquire();
System.out.println(String.format("id:%d time:%d waitTime:%f",i,System.currentTimeMillis(),waitTime));
}
Kafka資料源限流的實作
在Spark Streaming Kafka包拉取Kafka資料會進行如下動作:
1、取Kafka中最新偏移量、分區
2、通過rateController限制每個分區可拉取的最大消息數
3、在DirectKafkaInputDStream中建立KafkaRDD,在其中調用相關對象拉取資料
通過如上步驟也可用看出,隻要限制了Kafka某個分區的偏移量(offset)範圍也就可限制從Kafka拉取的消息數量,進而達到限流的目的,Spark streaming kafka也是通過此實作的;
計算每個分區速率限制,有如下步驟:
1、通過seekToEnd擷取最新可用偏移量與目前偏移量對比獲得目前所有分區延遲偏移量
單個分區偏移量延遲=最新偏移量記錄-目前偏移量記錄
2、擷取配置項中每個分區最大速率
(spark.streaming.kafka.maxRatePerPartition),背壓率計算,計算每個分區背壓率計算公式為:
單個分區背壓率=單個分區偏移量延遲/所有分區總延遲*速率限制
速率限制(rateLimit):為通過PIDController動态計算得來
如有配置每個分區最大速率則取配置項最大速率與背壓率兩者中的最小值,未配置則取背壓率作為每個分區速率限制;
3、将批次間隔(batchDuration)*每個分區速率限制=每個分區最大消息數
4、取目前分區偏移量+分區最大消息數 與 最新偏移量兩者當中最小的,由此來控制拉取消息速率;
如目前偏移量+分區最大消息數 大于 最新偏移量則取 最新偏移量否則取 目前偏移量+分區最大消息數作為拉取Kafka資料的Offset範圍;
// 限制每個分區最大消息數
protected def clamp(
offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
maxMessagesPerPartition(offsets).map { mmp =>
mmp.map { case (tp, messages) =>
val uo = offsets(tp)
tp -> Math.min(currentOffsets(tp) + messages, uo)
}
}.getOrElse(offsets)
}
不管是Kafka資料源還是Socket資料源Spark Streaming中都使用了PIDController算法用于計算其速率限制值,兩者的差别也隻是因為兩種資料源的擷取方式資料特征而決定的。Socket資料源使用了Guava RateLimiter、而Kafka資料源自己實作了基于Offsets的限流;
以上說介紹的架構版本為:Spark Streaming 版本為2.3.2與spark-streaming-kafka-0-10_2.11;
參考資料:
http://kafka.apache.org
http://spark.apache.org
文章首發位址:Solinx
https://mp.weixin.qq.com/s/yHStZgTAGBPoOMpj4e27Jg