要在叢集上的Spark Streaming應用程式中獲得最佳性能,需要進行一些調整。本文介紹了如何調整參數和配置以提高應用程式性能。這些主要是基于以下兩個層次進行考量的。
- 過有效使用群集資源減少每批資料的處理時間。
- 設定正确的批處理大小,以便可以按接收到的批處理速度處理一批資料(也就是說,資料處理跟上資料拉取的速度)。
1、減少批處理時間
Spark可以進行很多優化,以最大程度地減少每批的處理時間。這些已在《Spark調優指南》中詳細讨論,本文就最重要的一些調優方法進行一次簡單介紹。
1.1 資料接收中的并行度
通過網絡(例如Kafka,socket等)接收資料需要将資料反序列化并存儲在Spark中。如果資料接收成為系統的瓶頸,請考慮并行化資料接收。請注意,每個輸入DStream都會建立一個接收器Receiver(在Executor上運作),該接收器接收單個資料流。是以,可以通過建立多個輸入DStream并将其配置為從源接收資料流的不同分區來實作接收多個資料流。例如,可以将接收兩個Topic的單個Kafka輸入DStream拆分為兩個Kafka輸入流,每個輸入流僅接收一個主題。這将運作兩個接收器,進而允許并行接收資料,進而提高了總體吞吐量。多個DStream可以結合在一起以建立單個DStream。然後,可以将應用于單個輸入DStream的轉換應用于統一流。代碼做如下。
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
應該考慮的另一個參數是接收器的塊間隔,該間隔由配置參數确定 spark.streaming.blockInterval。對于大多數接收器,接收到的資料在存儲在Spark記憶體中之前會合并為資料塊。每批中的塊數确定了将在類似map和transform中用于處理接收到的資料的任務數。每批接收器中每個接收器的任務數大約為(批處理間隔/塊間隔)。例如,200 ms的塊間隔将每2秒批處理建立10個任務。如果任務數太少(即少于每台計算機的核心數),那麼它将效率低下,因為将不使用所有可用的核心來處理資料。要增加給定批處理間隔的任務數,請減小阻止間隔。但是,建議的塊間隔最小值約為50毫秒,如果小于這個值,可能會造成額外的任務啟動開銷。
使用多個輸入流/接收器接收資料的一種替代方法是顯式地對輸入資料流進行分區(使用
inputStream.repartition(number of partitions)
)。在進行下一步處理之前,這會将接收到的資料批分布在群集中指定數量的計算機上。
1.2 資料序列化
可以通過調整序列化格式來減少資料序列化的開銷。在流傳輸的情況下,有兩種類型的資料推薦序列化。
- 輸入資料:預設情況下,通過Receivers接收的輸入資料通過
存儲在執行程式的記憶體中。也就是說,資料被序列化為位元組以減少GC開銷。同樣,資料首先儲存在記憶體中,并且僅在記憶體不足以容納流計算所需的所有輸入資料時才溢寫到磁盤。當然,這種序列化會産生開銷–接收器必須對接收到的資料進行反序列化,然後使用Spark的序列化格式對其進行重新序列化。StorageLevel.MEMORY_AND_DISK_SER_2
- 流操作生成的持久RDD:流計算生成的RDD可以保留在記憶體中。例如,視窗操作會将資料保留在記憶體中,因為它們将被多次處理。但是,與Spark Core預設的
不同,預設情況下,流式計算生成的持久性RDD與StorageLevel.MEMORY_ONLY
(即序列化)保持一緻,以最大程度地減少GC開銷。StorageLevel.MEMORY_ONLY_SER
在這兩種情況下,使用Kryo序列化都可以減少CPU和記憶體的開銷。有關更多詳細資訊,請參見《Spark調優指南》。對于Kryo,請考慮注冊自定義類,并禁用對象引用跟蹤(請參閱《spark調優指南》中與Kryo相關的配置)。
在流應用程式需要保留的資料量不大的特定情況下,将資料(兩種類型)持久化為反序列化對象是可行的,而不會産生過多的GC開銷。例如,如果您使用的是幾秒鐘的批處理間隔并且沒有視窗操作,則可以嘗試通過顯式設定存儲級别來禁用持久化資料中的序列化。這将減少由于序列化導緻的CPU開銷,進而可能在沒有太多GC開銷的情況下提高性能。
1.3 任務啟動開銷
如果每秒啟動的任務數量很高(例如,每秒50個或更多),則向從伺服器發送任務的開銷可能會很大,并且将難以實作亞秒級的延遲。可以通過以下更改來減少開銷:
- 執行模式:在獨立模式或粗粒度Mesos模式下運作Spark可以比細粒度Mesos模式縮短任務啟動時間。有關更多詳細資訊,請參閱“在Mesos上 運作”指南。
這些更改可以将批處理時間減少100毫秒,進而使亞秒級的批處理大小可行。
2、設定正确的批次間隔
為了使在群集上運作的Spark Streaming應用程式穩定,系統應能夠盡快處理接收到的資料。換句話說,應盡快處理一批資料。可以通過監視流式Web UI中的處理時間來找到合适的批次間隔時間 ,其中批處理時間應小于批處理間隔。
根據流計算的性質,所使用的批處理間隔可能會對資料速率産生重大影響,該速率可以由應用程式在一組固定的群集資源上維持。例如,讓我們考慮前面的WordCountNetwork示例。對于特定的資料速率,系統可能能夠跟上每2秒(即2秒的批處理間隔)但不是每500毫秒報告一次字計數的情況。是以,需要設定批次間隔,以便可以維持生産中的預期資料速率。
找出适合您的應用程式的正确批處理大小的一種好方法是使用保守的批處理間隔(例如5-10秒)和低資料速率進行測試。要驗證系統是否能夠跟上資料速率,可以檢查每個已處理批處理經曆的端到端延遲的值(可以在Spark驅動程式log4j日志中查找“ Total delay”,也可以使用 StreamingListener 接口)。如果延遲保持與批次大小相當,則系統是穩定的。否則,如果延遲持續增加,則意味着系統無法跟上,是以不穩定。一旦有了穩定配置的想法,就可以嘗試提高資料速率和/或減小批處理大小。注意,由于暫時的資料速率增加而引起的延遲的瞬時增加可能是好的,隻要延遲減小回到較低的值(即小于批大小)即可。
3、記憶體調優
在《spark調優指南》中已詳細讨論了調整 Spark應用程式的記憶體使用情況和GC行為。強烈建議您閱讀。在本節中,我們将專門在Spark Streaming應用程式的上下文中讨論一些調整參數。
Spark Streaming應用程式所需的群集記憶體量在很大程度上取決于所使用的轉換類型。例如,如果要對最後10分鐘的資料使用視窗操作,則群集應具有足夠的記憶體以将10分鐘的資料儲存在記憶體中。或者,如果您想使用updateStateByKey大量的按鍵,則所需的存儲空間會很大。相反,如果您想執行一個簡單的map-filter-store操作,則所需的記憶體将很少。
通常,由于通過接收器接收的資料存儲在
StorageLevel.MEMORY_AND_DISK_SER_2
中,是以無法容納在記憶體中的資料将溢寫到磁盤上。這可能會降低流應用程式的性能,是以建議根據流應用程式的要求提供足夠的記憶體。最好嘗試以小規模檢視記憶體使用情況并據此進行估計。
記憶體調整的另一個方面是垃圾回收。對于需要低延遲的流應用程式,不希望有由JVM垃圾回收引起的大停頓。
有一些參數可以幫助您調整記憶體使用和GC開銷:
- DStream的持久性級别:如前面的“ 資料序列化”部分所述,預設情況下,輸入資料和RDD被持久化為序列化位元組。與反序列化的持久性相比,這減少了記憶體使用和GC開銷。啟用Kryo序列化可進一步減少序列化的大小和記憶體使用量。通過壓縮可以進一步減少記憶體使用(請參見Spark配置spark.rdd.compress),而這會占用CPU時間。
- 清除舊資料:預設情況下,将自動清除DStream轉換生成的所有輸入資料和持久的RDD。Spark Streaming根據使用的轉換來決定何時清除資料。例如,如果您使用10分鐘的視窗操作,那麼Spark Streaming将保留最後10分鐘的資料,并主動丢棄較舊的資料。通過以下設定可以将資料保留更長的時間(例如,以互動方式查詢較早的資料):streamingContext.remember。
- CMS垃圾收集器:強烈建議使用并發标記掃掠GC,以使與GC相關的暫停時間始終保持較低。盡管已知并發GC會降低系統的整體處理吞吐量,但仍建議使用并發GC以實作更一緻的批處理時間。確定在驅動程式(使用–driver-java-options中spark-submit)和執行程式(使用Spark配置 spark.executor.extraJavaOptions)上都設定了CMS GC 。
其他提示:為了進一步減少GC開銷,請嘗試以下更多提示。
- 使用OFF_HEAP存儲級别持久化RDD 。
- 使用更多具有較小堆大小的excutor。這将減少每個JVM堆中的GC壓力。