天天看點

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

Facebook 經常使用分析來進行資料驅動的決策。在過去的幾年裡,使用者和産品都得到了增長,使得我們分析引擎中單個查詢的資料量達到了數十TB。我們的一些批處理分析都是基于 Hive 平台(Apache Hive 是 Facebook 在2009年貢獻給社群的)和 Corona( Facebook 内部的 MapReduce 實作)進行的。Facebook 還針對包括 Hive 在内的多個内部資料存儲,繼續增加了其 Presto 的 ANSI-SQL 查詢的覆寫範圍。Facebook 内部還支援其他類型的分析,如圖計算、機器學習(Apache Giraph)和流處理(如 Puma、Swift 和 Stylus)。

盡管 Facebook 提供的服務涵蓋了分析領域的廣泛領域,但我們仍在不斷地與開源社群互動,以分享我們的經驗,并向他人學習。Apache Spark 于2009年由加州大學伯克利分校(UC-Berkeley)的 Matei Zaharia 創辦,并于2013年貢獻給 Apache。它是目前增長最快的資料處理平台之一,因為它能夠支援流處理、批處理、指令式(RDD)、聲明式(SQL)、圖計算和機器學習用例,所有這些都在相同的 API 和底層計算引擎中。Spark 可以有效地利用大量記憶體,跨整個管道(pipelines)優化代碼,并跨任務(tasks)重用 jvm 以獲得更好的性能。Facebook 認為 Spark 已經成熟到可以在許多批處理用例中與 Hive 進行比較的地步。在本文的後面部分,将介紹 Facebook 使用 Spark 替代 Hive 的經驗和教訓。

用例:為實體排序(entity ranking)做特性準備

實時實體排名在 Facebook 有着多種使用場景。對于一些線上服務平台,原始的特性值是使用 Hive 離線生成的,并将生成的資料加載到這些實時關聯查詢系統中。這些 Hive 作業是數年前開發的,占用了大量的計算資源,并且難以維護,因為這些作業被拆分成數百個 Hive 小作業。為了使得業務能夠使用到新的特征資料,并且讓系統變得可維護,我們開始着手将這些作業遷移到 Spark 中。

以前的 Hive 作業實作

基于 Hive 的作業由三個邏輯階段組成,每個階段對應數百個由 entity_id 分割的較小 Hive 作業,因為為每個階段運作較大的 Hive 作業不太可靠,并且受到每個作業的最大任務數限制。具體如下:

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

以上三個邏輯階段可以概括如下:

  • 過濾掉非生産需要的特性和噪音;
  • 對每個(entity_id、target_id)對進行聚合;
  • 将表分為 N 個分片,并對每個切分通過自定義 UDF 生成一個用于線上查詢的自定義索引檔案。

基于 Hive 建構索引的作業大約需要運作三天。管理起來也很有挑戰性,因為這條管道包含數百個分片作業,是以很難進行監控。沒有簡單的方法來衡量作業的整體進度或計算 ETA。考慮到現有 Hive 作業的上述局限性,我們決定嘗試使用 Spark 來建構一個更快、更易于管理的作業。

Spark 實作

如果使用 Spark 全部替換上面的作業可能會很慢,并且很有挑戰性,需要大量的資源。是以我們首先将焦點投入在 Hive 作業中資源最密集的部分:第二階段。我們從50GB的壓縮輸入樣本開始,然後逐漸擴充到 300 GB、1 TB 和20 TB。在每次增加大小時,我們都解決了性能和穩定性問題,但是嘗試 20 TB 時我們發現了最大改進的地方。

在運作 20 TB 的輸入時,我們發現由于任務太多,生成了太多的輸出檔案(每個檔案的大小大約為100 MB)。在作業運作的10個小時中,有3個小時用于将檔案從 staging 目錄移動到 HDFS 中的最終目錄。最初,我們考慮了兩個方案:要麼改進 HDFS 中的批量重命名以支援我們的用例;要麼配置 Spark 以生成更少的輸出檔案(這一階段有大量的任務——70,000個)。經過認真思考,我們得到了第三種方案。由于我們在作業的第二步中生成的 tmp_table2 表是臨時的,并且隻用于存儲作業的中間輸出。最後,我們把上面 Hive 實作的三個階段的作業用一個 Spark 作業表示,該作業讀取 60 TB 的壓縮資料并執行 90 TB的 shuffle 和排序,最後的 Spark job 如下:

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

我們如何擴充 Spark 來完成這項工作?

當然,在如此大的資料量上運作單個 Spark 作業在第一次嘗試甚至第十次嘗試時都不會起作用。據我們所知,這是生産環境中 shuffle 資料量最大的 Spark 作業(Databricks 的 PB 級排序是在合成資料上進行的)。我們對 Spark 核心和應用程式進行了大量的改進和優化,才使這項工作得以運作。這項工作的好處在于,其中許多改進都适用于 Spark 的其他大型工作負載,并且我們能夠将所有工作重新貢獻給開源 Apache Spark 項目 - 有關更多詳細資訊,請參見下面相關的 JIRA。下面我們将重點介紹将一個實體排名作業部署到生産環境的主要改進。

可靠性修複(Reliability fixes)

處理節點頻繁重新開機

為了可靠地執行長時間運作的作業,我們希望系統能夠容錯并從故障中恢複(主要是由于正常維護或軟體錯誤導緻的機器重新啟動)。雖然 Spark 最初的設計可以容忍機器重動,但我們還是發現了各種各樣的 bug/問題,我們需要在系統正式投入生産之前解決這些問題。

  • 使得 PipedRDD 容忍節點重新開機(SPARK-13793):PipedRDD 之前在處理節點重新開機設計不夠健壯,當它擷取資料失敗時,這個作業就會失敗。我們重新設計了 PipedRDD,使得它能夠友好的處理這種異常,并且從這種類型的異常中恢複。
  • 最大的擷取失敗次數可配置( SPARK-13369 ):對于長期運作的作業而言,由于計算機重動而導緻擷取失敗的可能性大大增加。 在 Spark 中每個階段允許的最大擷取失敗次數是寫死的,是以,當達到最大失敗次數時,作業通常會失敗。我們做了一個更改,使其變得可配置,并将這個參數的值從 4 增加到 20,使得作業對于 fetch 失敗更加健壯。
  • Less disruptive cluster restart:長時間運作的作業應該能夠在叢集重新開機後繼續運作,這樣我們就不會浪費到目前為止完成的所有處理。Spark 的可重新開機 shuffle service 讓我們在節點重新開機後保留 shuffle 檔案。最重要的是,我們在 Spark driver 中實作了能夠暫停任務排程的功能,這樣作業就不會因為叢集重新開機而導緻任務失敗。

其他可靠性修複

  • Unresponsive driver(SPARK-13279):Spark driver 添加任務會進行一項時間複雜度為 O(N2) 的操作,這可能會導緻其被卡住,最終導緻作業被 killed。我們删除這個不必要的 O(N2) 操作來解決這個問題。
  • Excessive driver speculation:我們發現,Spark driver 在管理大量任務時,會花費了大量時間進行推測(speculation)。在短期内,在運作這個作業時我們禁止了 speculation。我們目前正在對 Spark Driver 進行修改,以減少 speculation 的時間。
  • TimSort issue due to integer overflow for large buffer( SPARK-13850 ):我們發現 Spark 的 unsafe 記憶體操作有一個 bug,這會導緻 TimSort 中的記憶體出現問題。不過 Databricks 的從業人員已經修複了這個問題,使我們能夠在大型記憶體緩沖區上進行操作。
  • Tune the shuffle service to handle large number of connections:在 shuffle 階段,我們看到許多 executors 在試圖連接配接 shuffle service 時逾時。通過增加 Netty 服務線程(spark.shuffle.io.serverThreads)和 backlog (spark.shuffle.io.backLog)的數量解決了這個問題。
  • Fix Spark executor OOM( SPARK-13958 ):一開始在每個節點上運作四個以上的 reduce 任務是很有挑戰性的。Spark executors 的記憶體不足,因為 sorter 中存在一個 bug,該 bug 會導緻指針數組無限增長。我們通過在指針數組沒有更多可用記憶體時強制将資料溢寫到磁盤來修複這個問題。是以,現在我們可以在一個節點上運作 24個任務而不會導緻記憶體不足。

性能提升

在實作了上述可靠性改進之後,我們能夠可靠地運作 Spark 作業。此時,我們将工作重心轉移到與性能相關的問題上,以最大限度地利用 Spark。我們使用Spark 的名額和 profilers 來發現一些性能瓶頸。

我們用來發現性能瓶頸的工具

  • Spark UI Metrics:Spark UI 可以很好地洞察特定階段的時間花在哪裡。每個任務的執行時間被劃分為子階段,以便更容易地找到作業中的瓶頸。
  • Jstack:Spark UI 中還提供 executor 程序的 jstack 功能,這個可以幫助我們找到代碼中的熱點問題。
  • Spark Linux Perf/Flame Graph support:盡管上面的兩個工具非常友善,但它們并沒有提供同時運作在數百台機器上作業的 CPU 概要的聚合視圖。在每個作業的基礎上,我們增加了對性能分析的支援,并且可以定制采樣的持續時間/頻率。

性能優化

  • Fix memory leak in the sorter(SPARK-14363)性能提升 30%:我們發現當任務釋放所有記憶體頁,但指針數組沒有被釋放。結果,大量記憶體未被使用,導緻頻繁溢出和 executor OOMs。現在,我們修複了這個問題,這個功能使得 CPU 性能提高了30%;
  • Snappy optimization ( SPARK-14277 )性能提升 10%:對于每一行的讀/寫,都會調用 JNI 方法(Snappy.ArrayCopy)。我們發現了這個問題,并且将這個調用修改成非 JNI 的System.ArrayCopy調用,修改完之後 CPU 性能提高了10%;
  • Reduce shuffle write latency(SPARK-5581)性能提升近 50%:在 map 端,當将 shuffle 資料寫入磁盤時,map 任務的每個分區打開和關閉相同的檔案。我們修複了這個問題,以避免不必要的打開/關閉,修改完之後 CPU 性能提高近 50%;
  • Fix duplicate task run issue due to fetch failure (SPARK-14649):當擷取失敗(fetch failure)發生時,Spark driver 會重新送出已經運作的任務,這會導緻性能低下。我們通過避免重新運作正在運作的任務修複了這個問題,并且我們發現當發生擷取操作失敗時,作業也更加穩定。
  • Configurable buffer size for PipedRDD(SPARK-14542)性能提升近 10%:在使用 PipedRDD 時,我們發現用于将資料從排序器(sorter)傳輸到管道處理的預設緩沖區大小太小,我們的作業花費了超過 10% 的時間來複制資料。我們使這個緩沖區大小變得可配置,以避免這個瓶頸。
  • Cache index files for shuffle fetch speed-up(SPARK-15074):我們發現,shuffle service 經常成為瓶頸,reduce 端花費 10% 到 15% 的時間來等待擷取 map 端的資料。通過更深入的研究這個問題,我們發現 shuffle service 為每次 shuffle fetch 都需要打開/關閉 shuffle index 檔案。我們通過緩存索引資訊,這樣我們就可以避免重複打開/關閉檔案,這一變化減少了50%的 shuffle fetch 時間;
  • Reduce update frequency of shuffle bytes written metrics(SPARK-15569)性能提升近 20%:使用 Spark Linux Perf 內建,我們發現大約 20% 的 CPU 時間花在探測和更新随機位元組寫的名額上。
  • Configurable initial buffer size for Sorter( SPARK-15958 )性能提升近 5%:Sorter 的預設初始緩沖區大小太小(4 KB),對于大的工作負載來說這個值太小了,是以我們浪費了大量的時間來複制内容。我們将這個緩沖區大小變得可配置(備注:spark.shuffle.sort.initialBufferSize), 當将這個參數設定為 64 MB 時,可以避免大量的資料複制,使得性能提升近 5%;
  • Configuring number of tasks:由于我們輸入的資料大小為 60 T,每個 HDFS 塊大小為 256 M,是以我們要生成超過250,000個任務。盡管我們能夠運作具有如此多任務的 Spark 作業,但我們發現,當任務數量過高時,性能會顯著下降。我們引入了一個配置參數,使 map 輸入大小可配置,我們通過将輸入的 split 大小設定為 2 GB ,使得 task 的資料減少了八倍。

在所有這些可靠性和性能改進之後,我們的實體排名系統變成了一個更快、更易于管理的管道,并且我們提供了在 Spark 中運作其他類似作業的能力。

使用 Spark 和 Hive 運作上面實體排名程式性能比較

我們使用以下性能名額來比較 Spark 和 Hive 運作性能。

CPU time:這是從作業系統的角度來看 CPU 使用情況。例如,如果您的作業在32核機器上僅運作一個程序,使用所有 CPU 的50%持續10秒,那麼您的 CPU 時間将是 32 0.5 10 = 160 CPU 秒。

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

CPU reservation time:從資源管理架構的角度來看,這是 CPU 預留(CPU reservation)。例如,如果我們将32核機器預留10秒來運作這個作業,那麼 CPU 預留時間是 32 * 10 = 320 CPU秒。CPU 時間與 CPU 預留時間的比率反映了我們叢集預留 CPU 資源的情況。準确地說,當運作相同的工作負載時,與 CPU 時間相比,預留時間可以更好地比較執行引擎。例如,如果一個程序需要1個 CPU 秒來運作,但是必須保留100個 CPU 秒,那麼根據這個名額,它的效率低于需要10個 CPU 秒但隻預留10個 CPU 秒來做相同數量的工作的程序。我們還計算了記憶體預留時間,但這裡沒有列出來,因為這些數字與 CPU 預留時間類似,而且使用 Spark 和 Hive 運作這個程式時都沒有在記憶體中緩存資料。Spark 有能力在記憶體中緩存資料,但由于叢集記憶體的限制,我們并沒有使用這個功能。

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

Latency:作業從開始到結束運作時間。

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

結論和未來工作

Facebook 使用高性能和可擴充的分析引擎來幫助産品開發。Apache Spark 提供了将各種分析用例統一到單個 API ,并且提供了高效的計算引擎。我們将分解成數百個 Hive 作業管道替換為一個 Spark 作業,通過一系列的性能和可靠性改進,我們能夠使用 Spark 來處理生産中的實體資料排序的用例。在這個特殊的用例中,我們展示了 Spark 可以可靠地 shuffle 并排序 90 TB 以上的中間資料,并在一個作業中運作 250,000個 tasks。與舊的基于 Hive 計算引擎管道相比,基于 Spark 的管道産生了顯著的性能改進(4.5-6倍 CPU性能提升、節省了 3-4 倍資源的使用,并降低了大約5倍的延遲),并且已經在生産環境中運作了幾個月。

**本文轉載自微信公共帳号:iteblog_hadoop,

作者:過往記憶大資料**

對開源大資料和感興趣的同學可以加小編微信(圖一二維碼,備注進群)進入技術交流微信群。也可釘釘掃碼加入社群的釘釘群

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區數個Spark技術同學每日線上答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

60TB 資料量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐