天天看點

Spark Streaming 調優實踐

Spark Streaming 調優實踐

在使用 Spark 和 Spark Streaming 時,當我們将應用部署在叢集上時,可能會碰到運作慢、占用過多資源、不穩定等問題,這時需要做一些優化才能達到最好的性能。有時候一個簡單的優化可以起到化腐朽為神奇的作用,使得程式能夠更加有效率,也更加節省資源。本文我們就來介紹一些能夠提高應用性能的參數和配置。

另外需要指出的是,優化本身是一個具體性很強的事情,不同的應用及落地場景會有不同的優化方式,并沒有一個統一的優化标準。本文我們簡單聊聊一些在項目中踩過的“坑”,列舉以下常見的優化方式。

▌資料序列化

在分布式應用中,序列化(serialization)對性能的影響是顯著的。如果使用一種對象序列化慢、占用位元組多的序列化格式,就會嚴重降低計算效率。通常在 Spark 中,主要有如下3個方面涉及序列化:

① 在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸。

② 将自定義的類型作為 RDD 的泛型類型時,所有自定義類型對象都會進行序列化。是以這種情況下,也要求自定義的類必須實作 Serializable 接口。

③ 使用可序列化的持久化政策時(比如 MEMORY_ONLY_SER),Spark 會将 RDD 中的每個 partition 都序列化成一個大的位元組數組。

而 Spark 綜合考量易用性和性能,提供了下面兩種序列化庫。

① Java 序列化:預設情況下,Spark 使用 Java 的對象輸出流架構(ObjectOutputStream framework)來進行對象的序列化,并且可用在任意實作 Java.io.Serializable 接口的自定義類上。我們可以通過擴充 Java.io.Externalizable 來更加精細地控制序列化行為。Java 序列化方式非常靈活,但是通常序列化速度非常慢而且對于很多類會産生非常巨大的序列化結果。

② Kryo 序列化:Spark 在2.0.0以上的版本可以使用 Kryo 庫來非常快速地進行對象序列化,Kryo 要比 Java 序列化更快、更緊湊(10倍),但是其不支援所有的 Serializable 類型,并且在使用自定義類之前必須先注冊。

我們可以在初始化 SparkConf 時,調用 conf.set("spark.serializer","org.apache.spark. serializer.KryoSerializer")來使用 Kryo。一旦進行了這個配置,Kryo 序列化不僅僅會用在 Shuffling 操作時 worker 節點間的資料傳遞,也會用在 RDDs 序列化到硬碟的過程。

Spark 官方解釋沒有将 Kryo 作為預設序列化方式的唯一原因是,Kryo 必須使用者自己注冊(注意如果我們不注冊自定義類,Kryo 也是可以正常運作的,但是它必須存儲每個對象的完整類名,這是非常浪費的),但是其推薦在網絡頻繁傳輸的應用中使用 Kryo。

另外值得注意的是,在 Spark 2.0.0 之後,Spark 已經預設将 Kryo 序列化作為簡單類型(基本類型、基本類型的數組及 string 類型)RDD 進行 Shuffling 操作時傳輸資料的對象序列化方式。

Spark 已經自動包含注冊了絕大部分 Scala 的核心類,如果需要向 Kryo 注冊自己的類别,可以使用 registerKryoClasses 方法。使用 Kryo 的代碼架構如下:

// Spark配置項

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 配置序列化方式

conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) // 注冊需要序列化的類

val sc = new SparkContext(conf)

如果我們的對象非常大,可能需要增加 Spark.kryoserializer.buffer 的配置。

同樣在 Spark Streaming 中,通過優化序列化格式可以縮減資料序列化的開銷,而在 Streaming 中還會涉及以下兩類資料的序列化。

  • 輸入資料:Spark Streaming 中不同于 RDD 預設是以非序列化的形式存于記憶體當中,Streaming 中由接收器(Receiver)接收而來的資料,預設是以序列化重複形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于 Executor 的記憶體當中。而采用這種方式的目的,一方面是由于将輸入資料序列化為位元組流可以減少垃圾回收(GC)的開銷,另一方面對資料的重複可以對 Executor 節點的失敗有更好的容錯性。同時需要注意的是,輸入資料流一開始是儲存在記憶體當中,當記憶體不足以存放流式計算依賴的輸入資料時,會自動存放于硬碟當中。而在 Streaming 中這部分序列化是一個很大的開銷,接收器必須先反序列化(deserialize)接收到的資料,然後再序列化(serialize)為 Spark 本身的序列化格式。
  • 由 Streaming 操作産生 RDD 的持久化:由流式計算産生的 RDDs 有可能持久化在記憶體當中,例如由于基于視窗操作的資料會被反複使用,是以會持久化在記憶體當中。值得注意的是,不同于 Spark 核心預設使用非序列化的持久化方式(StorageLevel. MEMORY_ONLY),流式計算為了減少垃圾回收(GC)的開銷,預設使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在 Spark 還是在 Spark Streaming 中,使用 Kryo 序列化方式,都可以減少 CPU 和記憶體的開銷。而對于流式計算,如果資料量不是很大,并且不會造成過大的垃圾回收(GC)開銷,我們可以考慮利用非序列化對象進行持久化。

例如,我們使用很小的批處理時間間隔,并且沒有基于視窗的操作,可以通過顯示設定相應的存儲級别來關閉持久化資料時的序列化,這樣可以減少序列化引起的 CPU 開銷,但是潛在的增加了 GC 的開銷。

▌廣播大變量

我們知道,不論 Spark 還是 Spark Streaming 的應用,在叢集節點間進行資料傳輸時,都會有序列化和反序列化的開銷,而如果我們的應用有非常大的對象時,這部分開銷是巨大的。比如應用中的任何子任務需要使用 Driver 節點的一個大型配置查詢表,這時就可以考慮将該表通過共享變量的方式,廣播到每一個子節點,進而大大減少在傳輸和序列化上的開銷。

另外,Spark 在 Master 節點會列印每個任務的序列化對象大小,我們可以通過觀察任務的大小,考慮是否需要廣播某些大變量。通常一個任務的大小超過 20KB,是值得去優化的。

當我們将大型的配置查詢表廣播出去時,每個節點可以讀取配置項進行任務計算,那麼假設配置發生了動态改變時,如何通知各個子節點配置表更改了呢?(尤其是對于流式計算的任務,重新開機服務代價還是蠻大的。)

我們知道廣播變量是隻讀的,也就是說廣播出去的變量沒法再修改,那麼應該怎麼解決這個問題呢?我們可以利用 Spark 中的 unpersist() 函數,Spark 通常會按照 LRU(least Recently Used)即最近最久未使用原則對老資料進行删除,我們并不需要操作具體的資料,但如果是手動删除,可以使用 unpersist() 函數。

是以這裡更新廣播變量的方式是,利用 unpersist() 函數先将已經釋出的廣播變量删除,然後修改資料後重新進行廣播,我們通過一個廣播包裝類來實作這個功能,代碼如下:

import java.io.{ ObjectInputStream, ObjectOutputStream }

import org.apache.spark.broadcast.Broadcast

import org.apache.spark.streaming.StreamingContext

import scala.reflect.ClassTag

// 通過包裝器在DStream的foreachRDD中更新廣播變量

// 避免産生序列化問題

case class BroadcastWrapper[T: ClassTag](

    @transient private val ssc: StreamingContext,

    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {

    // 删除RDD是否需要鎖定

    v.unpersist(blocking)

    v = ssc.sparkContext.broadcast(newValue)

  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {

    out.writeObject(v)

  private def readObject(in: ObjectInputStream): Unit = {

    v = in.readObject().asInstanceOf[Broadcast[T]]

}

利用 wrapper 更新廣播變量,可以動态地更新大型的配置項變量,而不用重新啟動計算服務,大緻的處理邏輯如下:

// 定義

val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue)

yourStream.transform(rdd => {

  //定期更新廣播變量

  if (System.currentTimeMillis - someTime > Conf.updateFreq) {

    yourBroadcast.update(newValue, true)

  // do something else

})

▌資料處理和接收時的并行度

作為分布式系統,增加接收和處理資料的并行度是提高整個系統性能的關鍵,也能夠充分發揮叢集機器資源。

關于 partition 和 parallelism。partition 指的就是資料分片的數量,每一次 Task 隻能處理一個 partition 的資料,這個值太小了會導緻每片資料量太大,導緻記憶體壓力,或者諸多 Executor 的計算能力無法充分利用;但是如果 partition 太大了則會導緻分片太多,執行效率降低。

在執行 Action 類型操作的時候(比如各種 reduce 操作),partition 的數量會選擇 parent RDD 中最大的那一個。而 parallelism 則指的是在 RDD 進行 reduce 類操作的時候,預設傳回資料的 paritition 數量(而在進行 map 類操作的時候,partition 數量通常取自 parent RDD 中較大的一個,而且也不會涉及 Shuffle,是以這個 parallelism 的參數沒有影響)。

由上述可得,partition 和 parallelism 這兩個概念密切相關,都是涉及資料分片,作用方式其實是統一的。通過 Spark.default.parallelism 可以設定預設的分片數量,而很多 RDD 的操作都可以指定一個 partition 參數來顯式控制具體的分片數量,如 reduceByKey和reduceByKeyAndWindow。

Spark Streaming 接收 Kafka 資料的方式,這個過程有一個資料反序列化并存儲到 Spark 的開銷,如果資料接收成為了整個系統的瓶頸,那麼可以考慮增加資料接收的并行度。每個輸入 DStream 會建立一個單一的接收器(receiver 在 worker 節點運作)用來接收一個單一的資料流。而對于接收多重資料的情況,可以建立多個輸入 DStream 用來接收源資料流的不同分支(partitions)。

如果我們利用 Receiver 的形式接收 Kafka,一個單一的 Kafka 輸入 DStream 接收了兩個不同 topic 的資料流,我們為了提高并行度可以建立兩個輸入流,分别接收其中一個 topic 上的資料。這樣就可以建立兩個接收器來并行地接收資料,進而提高整體的吞吐量。而之後對于多個 DStreams,可以通過 union 操作并為一個 DStream,之後便可以在這個統一的輸入 DStream 上進行操作,代碼示例如下:

val numStreams = 5

    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...)}

    val unifiedStream = streamingContext.union(kafkaStreams)

unifiedStream.print()

如果采用 Direct 連接配接方式,前面講過 Spark 中的 partition 和 Kafka 中的 partition 是一一對應的,但一般預設設定為 Kafka 中 partition 的數量,這樣來達到足夠并行度以接收 Kafka 資料。

▌設定合理的批處理間隔

對于一個 Spark Streaming 應用,隻有系統處理資料的速度能夠趕上資料接收的速度,整個系統才能保持穩定,否則就會造成資料積壓。換句話說,即每個 batch 的資料一旦生成就需要被盡快處理完畢。這一點我們可以通過 Spark 監控界面進行檢視(在2.3.4節我們介紹過),比較批處理時間必須小于批處理間隔。

通過設定合理的批處理大小(batch size),使得每批資料能夠在接收後被盡快地處理完成(即資料處理的速度趕上資料生成的速度)。

如何選取合适的批處理時間呢?一個好的方法是:先保守地設定一個較大的批處理間隔(如 5~10s),以及一個很低的資料速率,來觀測系統是否能夠趕上資料傳輸速率。我們可以通過檢視每個處理好的 batch 的端到端延遲來觀察,也可以看全局延遲來觀察(可以在 Spark log4j 的日志裡或者使用 StreamingListener 接口,也可以直接在 UI 界面檢視)。

如果延遲保持在一個相對穩定的狀态,則整個系統是穩定的,否則延遲不斷上升,那說明整個系統是不穩定的。在實際場景中,也可以直接觀察系統正在運作的 Spark 監控界面來判斷系統的穩定性。

▌記憶體優化

記憶體優化是在所有應用落地中必須經曆的話題,雖然 Spark 在記憶體方面已經為開發者做了很多優化和預設設定,但是我們還是需要針對具體的情況進行調試。

在優化記憶體的過程中需要從3個方面考慮這個問題:對象本身需要的記憶體;通路這些對象的記憶體開銷;垃圾回收(GC garbage collection)導緻的開銷。

通常來說,對于 Java 對象而言,有很快的通路速度,但是很容易消耗原始資料2~5倍以上的記憶體空間,可以歸結為以下幾點原因:

① 每個獨立的 Java 對象,都會有一個“對象頭”,大約16個位元組用來儲存一些基本資訊,如指向類的指針,對于一個隻包含很少資料量在内的對象(如一個 Int 類型資料),這個開銷是相對巨大的。

② Java 的 String 對象會在原始資料的基礎上額外開銷40個位元組,因為除了字元數組(Chars array)本身之外,還需要儲存如字元串長度等額外資訊,而且由于 String 内部存儲字元時是按照 UTF-16 格式編碼的,是以一個10字元的字元串開銷很容易超過60個字元。

③ 對于集合類(collection classes),如 HashMap、LinkedList,通常使用連結清單的形式将資料結構鍊在一起,那麼對于每一個節點(entry,如Map.Entry)都會有一個包裝器(wrapper),而這個包裝器對象不僅包含對象頭,還會儲存指向下一個節點的指針(每個8位元組)。

④ 熟悉 Java 的開發者應該知道,Java 資料類型分為基本類型和包裝類型,對于 int、long 等基本類型是直接在棧中配置設定空間,如果我們想将這些類型用在集合類中(如Map<String, Integer>),需要使用對基本資料類型打包(當然這是 Java 的一個自動過程),而打包後的基本資料類型就會産生額外的開銷。

針對以上記憶體優化的基本問題,接下來首先介紹 Spark 中如何管理記憶體,之後介紹一些能夠在具體應用中更加有效地使用記憶體的具體政策,例如,如何确定合适的記憶體級别,如何改變資料結構或将資料存儲為序列化格式來節省記憶體等,也會從 Spark 的緩存及 Java 的垃圾回收方面進行分析,另外,也會對 Spark Streaming 進行分析。

1. 記憶體管理

Spark 對于記憶體的使用主要有兩類用途:執行(execution)和存儲(storage)。執行類記憶體主要被用于 Shuffle 類操作、join 操作及排序(sort)和聚合(aggregation)類操作,而存儲類記憶體主要用于緩存資料(caching)和叢集間内部資料的傳送。

在 Spark 内部執行和存儲分享同一片記憶體空間(M),當沒有執行類記憶體被使用時,存儲類記憶體可以使用全部的記憶體空間,反之亦然。執行類記憶體可以剝奪存儲類記憶體的空間,但是有一個前提是,存儲類記憶體所占空間不得低于某一個門檻值 R,也就是說R指定了 M 中的一塊子空間塊是永遠不會被剝奪的。而另一方面由于實作上的複雜性,存儲類記憶體是不可以剝奪執行類記憶體的。

Spark 的這種設計方式確定了系統一些很好的特性:首先,如果應用不需要緩存資料,那麼所有的空間都可以用作執行類記憶體,可以一定程度上避免不必要的記憶體不夠用時溢出到硬碟的情況;其次,如果應用需要使用緩存資料,會有最小的記憶體空間R能夠保證這部分資料塊免于被剝奪;最後,這種方式對于使用者而言是完全黑盒的,使用者不需要了解内部如何根據不同的任務負載來進行記憶體劃分。

Spark 提供了兩個相關的配置,但是大多數情況下直接使用預設值就能滿足大部分負載情況:

  • Spark Memory.Fraction 表示 M 的大小占整個 JVM(Java Virtue Machine)堆空間的比例(預設是0.6),剩餘的空間(40%)被用來儲存使用者的資料結構及 Spark 内部的中繼資料(metadata),另一方面預防某些異常資料記錄造成的 OOM(Out of Memory)錯誤。
  • Spark.Memory.StorageFraction 表示 R 的大小占整個 M 的比例(預設是0.5),R 是存儲類記憶體在 M 中占用的空間,其中緩存的資料塊不會被執行類記憶體剝奪。

2. 優化政策

當我們需要初步判斷記憶體的占用情況時,可以建立一個 RDD,然後将其緩存(cache)起來,然後觀察網頁監控頁面的存儲頁部分,就可以看出 RDD 占用了多少記憶體。而對于特殊的對象,我們可以調用 SizeEstimator 的 estimate() 方法來評估記憶體消耗,這對于實驗不同資料層的記憶體消耗,以及判斷廣播變量在每個 Executor 堆上所占用的記憶體是非常有效的。

當我們了解了記憶體的消耗情況後,發現占用記憶體過大,可以着手做一些優化,一方面可以在資料結構方面進行優化。首先需要注意的是,我們要避免本章開頭提到的 Java 本身資料結構的頭部開銷,比如基于指針的資料結構或者包裝器類型,有以下方式可以進行優化:

  • 在設計資料結構時,優先使用基本資料類型及對象數組等,避免使用 Java 或者 Scala 标準庫當中的集合類(如 HashMap),在 fastutil 庫中,為基本資料類型提供了友善的集合類接口,這些接口也相容 Java 标準庫。
  • 盡可能避免在資料結構中嵌套大量的小對象和指針。
  • 考慮使用數值類 ID 或者枚舉對象來代替字元串類型作為主鍵(Key)。
  • 如果我們的運作時記憶體小于 32GB,可以加上 JVM 配置-XX:+UseCompressedOops 将指針的占用空間由8個位元組壓縮到4個位元組,我們也可以在 Spark-env.sh 中進行配置。

假設我們通過以上政策還是發現對象占用了過大的記憶體,可以用一個非常簡單的方式來降低記憶體使用,就是将對象以序列化的形式(serialized form)存儲,在 RDD 的持久化接口中使用序列化的存儲級别,如 MEMORY_ONLY_SER,Spark 便會将每個 RDD 分區存儲為一個很大的位元組數組。而這種方式會使得通路資料的速度有所下降,因為每個對象通路時都需要有一個反序列化的過程。在7.1節中我們已經介紹過,優先使用 Kryo 序列化方式,其占用大小遠低于 Java 本身的序列化方式。

3. 垃圾回收(GC)優化

如果我們在應用中進行了頻繁的 RDD 變動,那麼 JVM 的垃圾回收會成為一個問題(也就是說,假設在程式中隻建立了一個 RDD,後續所有操作都圍繞這個 RDD,那麼垃圾回收就不存在問題)。當 Java 需要通過删除舊對象來為新對象開辟空間時,它便會掃描我們曾建立的所有對象并找到不再使用的對象。

是以垃圾回收的開銷是和 Java 對象的個數成比例的,我們要盡可能地使用包含較少對象的資料結構(如使用 Int 數組代替 LinkedList)來降低這部分開銷。另外前面提到的用序列化形式存儲也是一個很好的方法,序列化後每個對象在每個 RDD 分區下僅有一個對象(一個位元組數組)。注意當 GC 開銷成為瓶頸時,首先要嘗試的便是序列化緩存(serialized caching)。

在做 GC 優化時,我們首先需要了解 GC 發生的頻率以及其所消耗的時間。這可以通過在 Java 選項中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 來實作;之後當 Spark 任務運作後,便可以在 Worker 日志中看到 GC 發生時列印的資訊。注意這些日志是列印在叢集中的 Worker 節點上的(在工作目錄的 stdout 檔案中),而非 Driver 程式。

為了進一步優化 GC,首先簡單介紹下 Java 虛拟機内部是如何進行記憶體管理的。

① Java 對象是存儲在堆空間内的,堆空間被分為兩部分,即年輕區域(Young region)和老年區域(Old region),其中年輕代(Young generation)會用來存儲短生命周期的對象,而老年代(Old generation)會用來存儲較長生命周期的對象。

② 年輕代的區域又被分為3個部分 [Eden, Survivor1, Survivor2]。

③ 一個簡單的 GC 流程大緻是:當 Eden 區域滿了,一次小型 GC 過程會将 Eden 和 Survivor1 中還存活的對象複制到 Survivor2 區域上,Survivor 區域是可交換的(即來回複制),當一個對象存活周期已足夠長或者 Survivor2 區域已經滿時,那麼它們會被移動到老年代上,而當老年代的區域也滿了時,就會觸發一次完整的 GC 過程。

Java 的這種 GC 機制主要是基于程式中建立的大多數對象,都會在建立後被很快銷毀,隻有極少數對象會存活下來,是以其分為年輕代和老年代兩部分,而這兩部分 GC 的方式也是不同的,其時間複雜度也是不同的,年輕代會更加快一些,感興趣的讀者可以進一步查閱相關資料。

基于以上原因,Spark 在 GC 方面優化的主要目标是:隻有長生命周期的 RDD 會被存儲在老年代上,而年輕代上有足夠的空間來存儲短生命周期的對象,進而盡可能避免任務執行時建立的臨時對象觸發完整 GC 流程。我們可以通過以下步驟來一步步優化:

① 通過 GC 統計資訊觀察是否存在過于頻繁的 GC 操作,如果在任務完成前,完整的 GC 操作被調用了多次,那麼說明可執行任務并沒有獲得足夠的記憶體空間。

② 如果觸發了過多的小型 GC,而完整的 GC 操作并沒有調用很多次,那麼給 Eden 區域多配置設定一些記憶體空間會有所幫助。我們可以根據每個任務所需記憶體大小來預估 Eden 的大小,如果 Eden 設定大小為 E,可以利用配置項-Xmn=4/3*E 來對年輕代的區域大小進行設定(其中4/3的比例是考慮到 survivor 區域所需空間)。

③ 如果我們觀察 GC 列印的統計資訊,發現老年代接近存滿,那麼就需要改變 spark.memory.fraction 來減少存儲類記憶體(用于 caching)的占用,因為與其降低任務的執行速度,不如減少對象的緩存大小。另一個可選方案是減少年輕代的大小,即通過 -Xmn 來進行配置,也可以通過 JVM 的 NewRatio 參數進行調整,大多數 JVM 的該參數的預設值是2,意思是老年代占整個堆記憶體的2/3,這個比例需要大于 Spark.Memory.Fraction。

④ 通過加入 -XX:+UserG1GC 來使用 G1GC 垃圾回收器,這可以一定程度提高 GC 的性能。另外注意對于 executor 堆記憶體非常大的情況,一定通過 -XX:G1HeapRegionSize 來增加 G1 區域的大小。

針對以上步驟我們舉一個例子,如果我們的任務是從 HDFS 當中讀取資料,任務需要的記憶體空間可以通過從 HDFS 當中讀取的資料塊大小來進行預估,一般解壓後的資料塊大小會是原資料塊的2~3倍,是以如果我們希望3、4個任務同時運作在工作空間中,假設每個 HDFS 塊大小是 128MB,那麼需要将 Eden 大小設定為 4×3×128MB。改動之後,我們可以監控 GC 的頻率和時間消耗,看看有沒有達到優化的效果。

對于優化 GC,主要還是從降低全局 GC 的頻率出發,executor 中對于 GC 優化的配置可以通過 spark.executor.extraJavaOptions 來配置。

4. Spark Streaming 記憶體優化

前面介紹了 Spark 中的優化政策和關于 GC 方面的調優,對于 Spark Streaming 的應用程式,這些政策也都是适用的,除此之外還會有一些其他方面的優化點。

對于 Spark Streaming 應用所需要的叢集記憶體,很大程度上取決于要使用哪種類型的 transformation 操作。比如,假設我們想使用10分鐘資料的視窗操作,那麼我們的叢集必須有足夠的空間能夠儲存10分鐘的全部資料;亦或,我們在大量的鍵值上使用了 updateStateByKey 操作,那麼所需要的記憶體空間會較大。而如果我們僅僅使用簡單的 Map、Filter、Store 操作,那麼所需空間會較小。

預設情況下,接收器接收來的資料會以 StorageLevel.MEMORY_AND_DISK_SER_2 的格式存儲,那麼如果記憶體不足時,資料就會序列化到硬碟上,這樣會損失 Spark Streaming 應用的性能。是以通常建議為 Spark Streaming 應用配置設定充足的記憶體,可以在小規模資料集上進行測試和判斷。

另一方面與 Spark 程式有顯著差別的是,Spark Streaming 程式對實時性要求會較高,是以我們需要盡可能降低 JVM 垃圾回收所導緻的延遲。

基于此,我們可以通過以下幾個參數對記憶體使用和 GC 開銷進行優化調整。

  • DStream 的持久化級别:在前文中講過,輸入資料預設是持久化為位元組流的,因為相較于反序列化的開銷,其更會降低記憶體的使用并且減少 GC 的開銷。是以優先使用 Kryo 序列化方式,可以大大降低序列化後的尺寸和記憶體開銷。另外,如果需要更進一步減少記憶體開銷,可以通過配置 spark.rdd.compress 進行更進一步的壓縮(當然對于目前的叢集機器,大多數記憶體都足夠了)。
  • 及時清理老資料:預設情況下所有的輸入資料和由 DStream 的 Transormation 操作産生的持久 RDD 會被自動清理,即 Spark Streaming 會決定何時對資料進行清理。例如,假設我們使用10分鐘的視窗操作,Spark Streaming 會儲存之前10分鐘的所有資料,并及時清理過時的老資料。資料儲存的時間可以通過 stremingContext. remember 進行設定。
  • CMS 垃圾回收器:不同于之前我們在 Spark 中的建議,由于需要減少 GC 間的停頓,是以這裡建議使用并發标記清除類的 GC 方式。即使并發 GC 會降低全局系統的生産吞吐量,但是使用這種 GC 可以使得每個 Batch 的處理時間更加一緻(不會因為某個 Batch 處理時發生了 GC,而導緻處理時間劇增)。我們需要確定在 Driver 節點(在 spark-submit 中使用— driver-java-options)和 Executor 節點(在 Spark 配置中使用 spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都設定了 CMS GC 方式。
  • 其他減少 GC 開銷的方式有:可以通過 OFF_HEAP 存儲級别的 RDD 持久化方式,以及可以在 Executor 上使用更小的堆記憶體,進而降低每個 JVM 堆垃圾回收的壓力。

▌執行個體——項目實戰中的調優示例

在某輿情監控系統中,對于每天爬取的千萬級遊戲玩家評論資訊都要實時地進行詞頻統計,對于爬取到的遊戲玩家評論資料,我們會生産輸入到 Kafka 中,而另一端的消費者,我們采用了 Spark Streaming 來進行流式處理,首先利用 Direct 方式從 Kafka 拉取 batch,之後經過分詞、統計等相關處理,回寫到資料庫(DataBase,DB)上,由此高效實時的完成每天大量資料的詞頻統計任務。

對于資料量較小的情況,一般是不會暴露問題的,但是資料量增大後,就會暴露各種問題,這就需要進行一些調優和參數配置。可以通過以下幾方面進行調優嘗試。

1. 合理的批處理時間(batchDuration)

關于 Spark Streaming 的批處理時間設定是非常重要的,Spark Streaming 在不斷接收資料的同時,需要處理資料的時間,是以如果設定過段的批處理時間,會造成資料堆積,即未完成的 batch 資料越來越多,進而發生阻塞。

另外值得注意的是,batchDuration 本身也不能設定為小于 500ms,這會導緻 Spark Streaming 進行頻繁地送出作業,造成額外的開銷,減少整個系統的吞吐量;相反如果将 batchDuration 時間設定得過長,又會影響整個系統的吞吐量。

如何設定一個合理的批處理時間,需要根據應用本身、叢集資源情況,以及關注和監控 Spark Streaming 系統的運作情況來調整,重點關注監控界面中的 Total Delay,如圖1所示。

Spark Streaming 調優實踐

圖1 Spark UI 中全局延遲

2. 合理的 Kafka 拉取量(maxRatePerPartition 參數設定)

對于資料源是 Kafka 的 Spark Streaming 應用,在 Kafka 資料頻率過高的情況下,調整這個參數是非常必要的。我們可以改變 spark.streaming.kafka.maxRatePerPartition 參數的值來進行上限調整,預設是無上限的,即 Kafka 有多少資料,Spark Streaming 就會一次性全拉出,但是上節提到的批處理時間是一定的,不可能動态變化,如果持續資料頻率過高,同樣會造成資料堆積、阻塞的現象。

是以需要結合 batchDuration 設定的值,調整 spark.streaming.kafka.maxRatePerPatition 參數,注意該參數配置的是 Kafka 每個 partition 拉取的上限,資料總量還需乘以所有的 partition 數量,調整兩個參數 maxRatePerPartition 和 batchDuration 使得資料的拉取和處理能夠平衡,盡可能地增加整個系統的吞吐量,可以觀察監控界面中的 Input Rate 和 Processing Time,如圖2所示。

Spark Streaming 調優實踐

圖2 Spark UI 中輸入速率和平均處理時間

3. 緩存反複使用的 Dstream(RDD)

Spark 中的 RDD 和 SparkStreaming 中的 Dstream 如果被反複使用,最好利用 cache() 函數将該資料流緩存起來,防止過度地排程資源造成的網絡開銷。可以參考并觀察 Scheduling Delay 參數,如圖3所示。

Spark Streaming 調優實踐

圖3 SparkUI 中排程延遲

4. 其他一些優化政策

除了以上針對 Spark Streaming 和 Kafka 這個特殊場景方面的優化外,對于前面提到的一些正常優化,也可以通過下面幾點來完成。

  • 設定合理的 GC 方式:使用--conf "spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC"來配置垃圾回收機制。
  • 設定合理的 parallelism:在 SparkStreaming+kafka 的使用中,我們采用了 Direct 連接配接方式,前面講過 Spark 中的 partition 和 Kafka 中的 Partition 是一一對應的,一般預設設定為 Kafka 中 Partition 的數量。
  • 設定合理的 CPU 資源數:CPU 的 core 數量,每個 Executor 可以占用一個或多個 core,觀察 CPU 使用率(Linux 指令 top)來了解計算資源的使用情況。例如,很常見的一種浪費是一個 Executor 占用了多個 core,但是總的 CPU 使用率卻不高(因為一個 Executor 并不會一直充分利用多核的能力),這個時候可以考慮讓單個 Executor 占用更少的 core,同時 Worker 下面增加更多的 Executor;或者從另一個角度,增加單個節點的 worker 數量,當然這需要修改 Spark 叢集的配置,進而增加 CPU 使用率。值得注意是,這裡的優化有一個平衡,Executor 的數量需要考慮其他計算資源的配置,Executor 的數量和每個 Executor 分到的記憶體大小成反比,如果每個 Executor 的記憶體過小,容易産生記憶體溢出(out of memory)的問題。
  • 高性能的算子:所謂高性能算子也要看具體的場景,通常建議使用 reduceByKey/aggregateByKey 來代替 groupByKey。而存在資料庫連接配接、資源加載建立等需求時,我們可以使用帶 partition 的操作,這樣在每一個分區進行一次操作即可,因為分區是實體同機器的,并不存在這些資源序列化的問題,進而大大減少了這部分操作的開銷。例如,可以用 mapPartitions、foreachPartitions 操作來代替 map、foreach 操作。另外在進行 coalesce 操作時,因為會進行重組分區操作,是以最好進行必要的資料過濾 filter 操作。
  • Kryo 優化序列化性能:我們隻要設定序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為 RDD 泛型類型的自定義類型等)。

5. 結果

通過以上種種調整和優化,最終我們想要達到的目的便是,整個流式處理系統保持穩定,即 Spark Streaming 消費 Kafka 資料的速率趕上爬蟲向 Kafka 生産資料的速率,使得 Kafka 中的資料盡可能快地被處理掉,減少積壓,才能保證明時性,如圖4所示。

Spark Streaming 調優實踐

圖4 Spark Streaming 和 Kafka 穩定運作監控圖

當然不同的應用場景會有不同的圖形,這是本文詞頻統計優化穩定後的監控圖,我們可以看到在 Processing Time 柱形圖中有一條 Stable 的虛線,而大多數 Batch 都能夠在這一虛線下處理完畢,說明整體 Spark Streaming 是運作穩定的。

對于項目中具體的性能調優,有以下幾個點需要注意:

  • 一個 DStream 流隻關聯單一接收器,如果需要并行多個接收器來讀取資料,那麼需要建立多個 DStream 流。一個接收器至少需要運作在一個 Executor 上,甚至更多,我們需要保證在接收器槽占用了部分核後,還能有足夠的核來處理接收到的資料。例如在設定 spark.cores.max 時需要将接收器的占用考慮進來,同時注意在配置設定 Executor 給接收器時,采用的是輪循的方式(round robin fashion)。
  • 當接收器從資料源接收到資料時,會建立資料塊,在每個微秒級的資料塊間隔(blockInterval milliseconds)中都會有一個新的資料塊生成。在每個批處理間隔内(batchInterval)資料塊的數量 N=batchInterval/blockInterval。這些資料塊會由目前執行器(Executor)的資料塊管理器(BlockManager)分發到其他執行器的資料塊管理器。之後在 Driver 節點上運作的輸入網絡追蹤器(Network Input Tracker)會通知資料塊所在位置,以期進一步處理。
  • RDD 是基于 Driver 節點上每個批處理間隔産生的資料塊(blocks)而建立的,這些資料塊是 RDD 的分支(partitions),每個分支是 Spark 中的一個任務(task)。如果 blockInterval == batchInterval,那麼意味着建立了單一分支,并且可能直接在本地處理。
  • 資料塊上的映射(map)任務在執行器(一個接收塊,另一個複制塊)中處理,該執行器不考慮塊間隔,除非出現非本地排程。擁有更大的塊間隔(blockInterval)意味着更大的資料塊,如果将 spark.locality.wait 設定一個更大的值,那麼更有可能在本地節點處理資料塊。我們需要在兩個參數間(blockInterval和spark.locality.wait)做一個折中,確定越大的資料塊更可能在本地被處理。
  • 除了依賴于 batchInterval 和 blockInterval,我們可以直接通過 inputDstream. repartition(n) 來确定分支的數量。這個操作會重新打亂(reshuffles)RDD 中的資料,随機的配置設定給 n 個分支。當然打亂(shuffle)過程會造成一定的開銷,但是會有更高的并行度。RDD 的處理是由驅動程式的 jobscheduler 作為作業安排的。在給定的時間點上,隻有一個作業是活動的。是以,如果一個作業正在執行,那麼其他作業将排隊。
  • 如果我們有兩個 Dstreams,那麼将形成兩個 RDDs,并将建立兩個作業,每個作業(job)都被安排為一個接着一個地執行。為了避免這種情況,可以聯合兩個 Dstreams(union)。這将確定為 Dstreams 的兩個 RDD 形成單一的 unionRDD。而這個 unionRDD 會被視為一個作業,但是 RDDs 的分區不會受到影響。
  • 如果批處理時間大于 batchinterval,那麼很明顯,接收方的記憶體将逐漸被填滿,并最終抛出異常(很可能是 BlockNotFoundException)。目前沒有辦法暫停接收,那麼可以利用 SparkConf 配置項中的 spark.streaming.receiver.maxRate 來控制接收器的速率。

▌小 結

① Spark Streaming 中需要大量的序列化和反序列化操作,在2.0.0以上的 Spark 版本中,我們應當優先考慮使用 Kryo 序列化方式。

② 對于非常大的變量,如配置資訊,可以提前利用廣播變量的方式傳送給每一個節點。

③ 在流式處理系統中,我們需要兼顧資料的接收和資料處理,即消費資料的速率要趕上生産資料的速率。當發現生産資料速率過慢時,可以考慮增加并行度,使用更多的接收器(Receiver);如果處理速度過慢,可以考慮加機器、優化程式邏輯及 GC 優化等方式。

④ Spark 記憶體分為執行類記憶體和存儲類記憶體,執行類記憶體可以剝奪存儲類記憶體空間,但是存儲類記憶體空間有一個最低門檻值會保證保留。

⑤ 記憶體優化最簡單的方式是使用序列化格式進行對象存儲,另外一方面考慮到 Java/Scala 對象本身會有所開銷,應盡可能減少對象的數量。

⑥ 對于 Spark 而言,垃圾回收采用 G1GC,而 Spark Streaming 采用 CMS。

⑦ 調優過程是一個觀察,調整,再觀察,再調整的過程,針對具體問題需要進行不同政策上的調整,希望大家多多實踐。