天天看點

spark性能優化 -- > spark工作原理

從本篇文章開始,将開啟 spark 學習和總結之旅,專門針對如何提高 spark 性能進行總結,力圖總結出一些幹貨。

無論你是從事算法工程師,還是資料分析又或是其他與資料相關工作,利用 spark 進行海量資料處理和模組化都是非常重要和必須掌握的一門技術,我感覺編寫 spark 代碼是比較簡單的,特别是利用 Spark SQL 下的 DataFrame 接口進行資料處理,隻要有 python 基礎都是非常容易入門的,但是在性能調優上,許多人都是一知半解,寫的 spark 程式經常陷入 OOM 或卡死狀态。這時深入了解 spark 原理就顯得非常有必要了。

本系列總結主要針對 Hadoop YARN 模式。

RDD(Resilient Distributed Datasets)

RDD 是 spark 中最基本的資料抽象,存儲在 exector 或 node 中,它代表一個 “惰性,”“靜态”,“不可變”,“分布式“的資料集合,RDD 基本介紹在網上上太多了,這裡就不做詳細介紹了,主要講下以下内容:

transform(轉換)與 action(執行)的差別

轉換操作:傳回的是一個新的 RDD,常見的如:map、filter、flatMap、groupByKey 等等 執行操作:傳回的是一個結果,一個數值或者是寫入操作等,如 reduce、collect、count、first 等等

惰性計算

spark 中計算 RDD 是惰性的,也即 RDD 真正被計算(執行操作,例如寫入存儲操作、collect 操作等)時,其轉換操作才會真正被執行。

spark性能優化 -- > spark工作原理

spark 為什麼采用惰性計算:

在 MapReduce 中,大量的開發人員浪費在最小化 MapReduce 通過次數上。通過将操作合并在一起來實作。在 Spark 中,我們不建立單個執行圖,而是将許多簡單的操作結合在一起。是以,它造成了 Hadoop MapReduce 與 Apache Spark 之間的差異。

惰性設計的好處:

① 提高可管理性 可以檢視整個 DAG(将對資料執行的所有轉換的圖形),并且可以使用該資訊來優化計算。

② 降低時間複雜度和加快計算速度 隻運算真正要計算的轉換操作,并且可以根據 DAG 圖,合并不需要與 drive 通信的操作(連續的依賴轉換),例如在一個 RDD 上同時調用 map 和 filter 轉換操作,spark 可以将 map 和 filter 指令發送到每個 executor 上,spark 程式在真正執行 map 和 filter 時,隻需通路一次 record,而不是發送兩組指令并兩次通路分區。理論上相對于非惰性,将時間複雜度降低了一半。例如:

val list1 = list.map(i -> i * 3)  // Transformation1
val list2 = list1.map(i -> i + 3) // Transformation1
val list3 = list1.map(i -> i / 3) // Transformation1
list3.collect()               // ACTION      

假設原始清單(list) 很大,其中包含數百萬個元素。如果沒有懶惰的評估,我們将完成三遍如此龐大的計算。如果我們假設一次這樣的清單疊代需要 10 秒,那麼整個評估就需要 30 秒。并且每個 RDD 都會緩存下來,浪費記憶體。使用惰性評估,Spark 可以将這三個轉換像這樣合并到一個轉換中,如下:

val list3 = list.map(i -> i + 1)      

它将隻執行一次該操作。隻需一次疊代即可完成,這意味着隻需要 10 秒的時間。

容錯性

RDD 本身包含其複制所需的所有依賴資訊,一旦該 RDD 中某個分區丢失了,該 RDD 有足夠需要重新計算的資訊,可以去并行的,很快的重新計算丢失的分區。

運作在記憶體

在 spark application 的生命周期中,RDD 始終常駐記憶體(在所在的節點記憶體),這也是其比 MapReduce 更快的重要原因。

spark 中提供了三種記憶體管理機制:

① in-memory as deserialized data 這種常駐記憶體方式速度快(因為去掉了序列化時間),但是記憶體利用效率低。

② in-memory as serialized data 該方法記憶體利用效率高,但是速度慢

③ 直接存在 disk 上 對于那些較大容量的 RDD,沒辦法直接存在記憶體中,需要寫入到 DISK 上。該方法僅适用于大容量 RDD。要持久化一個 RDD,隻要調用其 cache()或者 persist()方法即可。在該 RDD 第一次被計算出來時,就會直接緩存在每個節點中。而且 Spark 的持久化機制還是自動容錯的,如果持久化的 RDD 的任何 partition 丢失了,那麼 Spark 會自動通過其源 RDD,使用 transformation 操作重新計算該 partition。

cache()和 persist()的差別在于,cache()是 persist()的一種簡化方式,cache()的底層就是調用的 persist()的無參版本,同時就是調用 persist(MEMORY_ONLY),将資料持久化到記憶體中。如果需要從記憶體中清楚緩存,那麼可以使用 unpersist()方法。

我們來仔細分析下持久化和非持久化的差別:

非持久化:

spark性能優化 -- > spark工作原理

持久化:

spark性能優化 -- > spark工作原理

顯然對于要複用多次的 RDD,要将其進行持久化操作,此時 Spark 就會根據你的持久化政策,将 RDD 中的資料儲存到記憶體或者磁盤中。以後每次對這個 RDD 進行算子操作時,都會直接從記憶體或磁盤中提取持久化的 RDD 資料,然後執行算子,而不會從源頭處重新計算一遍這個 RDD,再執行算子操作。 是以在寫 spark 代碼時:盡可能複用同一個 RDD。

這裡常有個誤區:

val rdd1 = ... // 讀取hdfs資料,加載成RDD
rdd1.cache  // 持久化操作


val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)


rdd1.unpersist // 釋放緩存


rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)      

如果按上述代碼進行持久化,則效果就如同沒有持久化一樣。原因就在于 spark 的 lazy 計算。

代碼應該如下:

val rdd1 = ... // 讀取hdfs資料,加載成RDD
rdd1.cache


val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)


rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)


rdd1.unpersist      

rdd2 執行 take 時,會先緩存 rdd1,接下來直接 rdd3 執行 take 時,直接利用緩存的 rdd1,最後,釋放掉 rdd1。是以在何處釋放 RDD 也是非常需要細心的。 請在 action 之後 unpersisit!!!

Spark Job Scheduling

窄依賴 與 寬依賴

shuffle 過程,簡單來說,就是将分布在叢集中多個節點上的同一個 key,拉取到同一個節點上,進行聚合或 join 等操作。比如 reduceByKey、join 等算子,都會觸發 shuffle 操作。shuffle 操作需要将資料進行重新聚合和劃分,然後配置設定到叢集的各個節點上進行下一個 stage 操作,這裡會涉及叢集不同節點間的大量資料交換。由于不同節點間的資料通過網絡進行傳輸時需要先将資料寫入磁盤,是以叢集中每個節點均有大量的檔案讀寫操作,進而導緻 shuffle 操作十分耗時(相對于 map 操作)。

窄依賴:父 RDD 與 子 RDD 的分區是一對一(map 操作)或多對一(coalesce)的,不會有 shuffle 過程;并且子 RDD 的分區結果與其 key 和 value 值無關,每個分區與其他分區亦無關。

spark性能優化 -- > spark工作原理

上面左圖可對應 map 操作分區,右圖對應 coalesce 操作。寬依賴:父 RDD 與子 RDD 的分區是一對多的關系,并且是按一定方式進行重分區,會有 shuffle 過程産生,比較耗時,可能會引發 spark 性能問題。常見的寬依賴操作如:groupByKey、reduceByKey、sort、sortByKey 等等。

spark性能優化 -- > spark工作原理

注意:coalesce 操作如果是将 10 個分區換成 100 個分區,由少分區轉成大分區将會發生 shuffle 過程。coalesce 操作場景主要是 rdd 經過多層過濾後的小檔案合并。rdd 的 reparation 方法與 coalesce 相反,主要是為了 處理資料傾斜,增加 partiton 的數量使得每個 task 處理的資料量減少,肯定會有 shuffle 過程産生(repartition 其實調用的就是 coalesce,隻不過 shuffle = true (coalesce 中 shuffle: Boolean = false))。

Spark Application

一個 spark 應用主要由一系列的 spark Job 組成,而這些 spark Job 由 sparkContext 定義而來。當 SparkContent 啟動時,一個 driver 和一系列的 executor 會在叢集的工作節點上啟動。每個 executor 都有個 JVM 虛拟環境,一個 executor 不能跨越多個節點。

spark性能優化 -- > spark工作原理

上圖表示在一個分布式系統上啟動一個 spark application 的實體硬體層面流程。

  1. 啟動一個 SparkContext
  2. 驅動程式(driver program)會定義一個叢集管理(cluster manager)
  3. cluster manager 會在工作節點上啟動一些 executor,運作送出的代碼(注意:一個節點 node 上會有多個 executor,但是一個 executor 不能跨越多個 node)

需要注意以下兩點:

  • 一個節點 node 上會有多個 executor,但是一個 executor 不能跨越多個 node
  • 每個 executor 會有多個分區,但是一個分區不能跨越多個 executor

DAG(Directed Acyclic Graph)詳解

spark Application tree

spark性能優化 -- > spark工作原理

簡而言之:一個 spark Application 由多個 Job 組成,Job 由送出代碼中的 Action 操作定義,而一個 Action 操作由多個 Stage 組成,Stage 的分割由寬依賴進行分割的,而每個 Stage 又由多個 Task 組成。一個 Task 對應一個分區,一個 task 會被配置設定到一個 executor 上執行。

每個 Job 都對應一個 DAG 圖,每個 DAG 有一系列的 Stage 組成。

  • Job:每個 Job 對應一個 Action 操作,在 spark execution Graph 中,其邊是基于代碼中的 transform 操作的依賴關系定義的。
  • Stages:每個 Action 中可能包含一個或多個 transform 操作,其中寬依賴又将 Job 劃分成多個 Stage。因為 Stages 的邊緣需要和 driver 進行通信,故通常一個 Job 裡,必須順序的執行 Stages 而非并行。并且會将多個窄依賴步驟合并成一個步驟,因為其中沒有的轉換操作沒有 shuffle 過程,可以通過隻通路一次資料,連續執行多個 transform 操作,這也是上面提到的惰性計算的優點。
def simpleSparkProgram(rdd : RDD[Double]): Long ={
  //stage1
  rdd.filter(_< 1000.0)
  .map(x => (x, x) )
  //stage2
  .groupByKey()
  .map{ case(value, groups) => (groups.sum, value)}
  //stage 3
  .sortByKey()
  .count()
}      

其代碼中對應的 Stage 如下:

spark性能優化 -- &gt; spark工作原理
  • Task:task 是 spark 中最小最基本的執行單元,每個 task 代表一個局部的計算任務。在 executor 中可以有多個 core,而每個 core 可以對應一個 task,每個 task 針對一個分區。 每次針對不同的一塊分區,執行相同的代碼。

注意:

  • spark 中同時并行的 task 數量不能超過所有 executor core 數量。 其中 所有 executor cores 數量= 每個 executor 中 core 數量 * executor 數量。
  • task 的并行化是有 executor 數量 × core 數量決定的。task 過多,并行化過小,就會浪費時間;反之就會浪費資源。是以設定參數是一個需要權衡的過程,原則就是在已有的資源情況下,充分利用記憶體和并行化。

總結

對于 DAG 的深刻了解非常重要,如果了解不深刻則可能定位問題的效率不高。比如常見的資料傾斜。當了解了這些,如果出現了資料傾斜,可以分析 job,stage 和 task,找到部分 task 輸入的嚴重不平衡,最終定位是資料問題或計算邏輯問題。

參考

  • High Performance Spark
  • ​​https://www.quora.com/What-is-the-reason-behind-keeping-lazy-evaluation-in-Apache-Spark​​
  • ​​https://data-flair.training/blogs/apache-spark-lazy-evaluation/​​
  • ​​http://bourneli.github.io/scala/spark/2016/06/17/spark-unpersist-after-action.html​​

備注:公衆号菜單包含了整理了一本AI小抄,非常适合在通勤路上用學習。

繼續閱讀