天天看點

Spark性能相關參數配置詳解-任務排程篇 schedule排程相關

随着Spark的逐漸成熟完善, 越來越多的可配置參數被添加到Spark中來, 本文試圖通過闡述這其中部分參數的工作原理和配置思路, 和大家一起探讨一下如何根據實際場合對Spark進行配置優化。

由于篇幅較長,是以在這裡分篇組織,如果要看最新完整的網頁版内容,可以戳這裡:http://spark-config.readthedocs.org/,主要是便于更新内容

schedule排程相關

排程相關的參數設定,大多數内容都很直白,其實無須過多的額外解釋,不過基于這些參數的常用性(大概會是你針對自己的叢集第一步就會配置的參數),這裡多少就其内部機制做一些解釋。

spark.cores.max

一個叢集最重要的參數之一,當然就是CPU計算資源的數量。spark.cores.max這個參數決定了在Standalone和Mesos模式下,一個Spark應用程式所能申請的CPU Core的數量。如果你沒有并發跑多個Spark應用程式的需求,那麼可以不需要設定這個參數,預設會使用spark.deploy.defaultCores的值(而spark.deploy.defaultCores的值預設為Int.Max,也就是不限制的意思)進而應用程式可以使用所有目前可以獲得的CPU資源。

針對這個參數需要注意的是,這個參數對Yarn模式不起作用,YARN模式下,資源由Yarn統一排程管理,一個應用啟動時所申請的CPU資源的數量由另外兩個直接配置Executor的數量和每個Executor中core數量的參數決定。(曆史原因造成,不同運作模式下的一些啟動參數個人認為還有待進一步整合)

此外,在Standalone模式等背景配置設定CPU資源時,目前的實作中,在spark.cores.max允許的範圍内,基本上是優先從每個Worker中申請所能得到的最大數量的CPU core給每個Executor,是以如果人工限制了所申請的Max Core的數量小于Standalone和Mesos模式所管理的CPU數量,可能發生應用隻運作在叢集中部分節點上的情況(因為部分節點所能提供的最大CPU資源數量已經滿足應用的要求),而不是平均分布在叢集中。通常這不會是太大的問題,但是如果涉及資料本地性的場合,有可能就會帶來一定的必須進行遠端資料讀取的情況發生。理論上,這個問題可以通過兩種途徑解決:一是Standalone和Mesos的資源管理子產品自動根據節點資源情況,均勻配置設定和啟動Executor,二是和Yarn模式一樣,允許使用者指定和限制每個Executor的Core的數量。社群中有一個PR試圖走第二種途徑來解決類似的問題,不過截至我寫下這篇文檔為止(2014.8),還沒有被Merge。

spark.task.cpus

這個參數在字面上的意思就是配置設定給每個任務的CPU的數量,預設為1。實際上,這個參數并不能真的控制每個任務實際運作時所使用的CPU的數量,比如你可以通過在任務内部建立額外的工作線程來使用更多的CPU(至少目前為止,将來任務的執行環境是否能通過LXC等技術來控制還不好說)。它所發揮的作用,隻是在作業排程時,每配置設定出一個任務時,對已使用的CPU資源進行計數。也就是說隻是理論上用來統計資源的使用情況,便于安排排程。是以,如果你期望通過修改這個參數來加快任務的運作,那還是趕緊換個思路吧。這個參數的意義,個人覺得還是在你真的在任務内部自己通過任何手段,占用了更多的CPU資源時,讓排程行為更加準确的一個輔助手段。

spark.scheduler.mode

這個參數決定了單個Spark應用内部排程的時候使用FIFO模式還是Fair模式。是的,你沒有看錯,這個參數隻管理一個Spark應用内部的多個沒有依賴關系的Job作業的排程政策。

如果你需要的是多個Spark應用之間的排程政策,那麼在Standalone模式下,這取決于每個應用所申請和獲得的CPU資源的數量(暫時沒有獲得資源的應用就Pending在那裡了),基本上就是FIFO形式的,誰先申請和獲得資源,誰就占用資源直到完成。而在Yarn模式下,則多個Spark應用間的排程政策由Yarn自己的政策配置檔案所決定。

那麼這個内部的排程邏輯有什麼用呢?如果你的Spark應用是通過服務的形式,為多個使用者送出作業的話,那麼可以通過配置Fair模式相關參數來調整不同使用者作業的排程和資源配置設定優先級。

spark.locality.wait

spark.locality.wait和spark.locality.wait.process,spark.locality.wait.node,spark.locality.wait.rack這幾個參數影響了任務配置設定時的本地性政策的相關細節。

Spark中任務的處理需要考慮所涉及的資料的本地性的場合,基本就兩種,一是資料的來源是HadoopRDD;二是RDD的資料來源來自于RDD Cache(即由CacheManager從BlockManager中讀取,或者Streaming資料源RDD)。其它情況下,如果不涉及shuffle操作的RDD,不構成劃分Stage和Task的基準,不存在判斷Locality本地性的問題,而如果是ShuffleRDD,其本地性始終為No Prefer,是以其實也無所謂Locality。

在理想的情況下,任務當然是配置設定在可以從本地讀取資料的節點上時(同一個JVM内部或同一台實體機器内部)的運作時性能最佳。但是每個任務的執行速度無法準确估計,是以很難在事先獲得全局最優的執行政策,當Spark應用得到一個計算資源的時候,如果沒有可以滿足最佳本地性需求的任務可以運作時,是退而求其次,運作一個本地性條件稍差一點的任務呢,還是繼續等待下一個可用的計算資源已期望它能更好的比對任務的本地性呢?

這幾個參數一起決定了Spark任務排程在得到配置設定任務時,選擇暫時不配置設定任務,而是等待獲得滿足程序内部/節點内部/機架内部這樣的不同層次的本地性資源的最長等待時間。預設都是3000毫秒。

基本上,如果你的任務數量較大和單個任務運作時間比較長的情況下,單個任務是否在資料本地運作,代價差別可能比較顯著,如果資料本地性不理想,那麼調大這些參數對于性能優化可能會有一定的好處。反之如果等待的代價超過帶來的收益,那就不要考慮了。

特别值得注意的是:在處理應用剛啟動後送出的第一批任務時,由于當作業排程子產品開始工作時,處理任務的Executors可能還沒有完全注冊完畢,是以一部分的任務會被放置到No Prefer的隊列中,這部分任務的優先級僅次于資料本地性滿足Process級别的任務,進而被優先配置設定到非本地節點執行,如果的确沒有Executors在對應的節點上運作,或者的确是No Prefer的任務(如shuffleRDD),這樣做确實是比較優化的選擇,但是這裡的實際情況隻是這部分Executors還沒來得及注冊上而已。這種情況下,即使加大學節中這幾個參數的數值也沒有幫助。針對這個情況,有一些已經完成的和正在進行中的PR通過例如動态調整No Prefer隊列,監控節點注冊比例等等方式試圖來給出更加智能的解決方案。不過,你也可以根據自身叢集的啟動情況,通過在建立SparkContext之後,主動Sleep幾秒的方式來簡單的解決這個問題。

spark.speculation

spark.speculation以及spark.speculation.interval,spark.speculation.quantile, spark.speculation.multiplier等參數調整Speculation行為的具體細節,Speculation是在任務排程的時候,如果沒有适合目前本地性要求的任務可供運作,将跑得慢的任務在空閑計算資源上再度排程的行為,這些參數調整這些行為的頻率和判斷名額,預設是不使用Speculation的。

通常來說很難正确的判斷是否需要Speculation,能真正發揮Speculation用處的場合,往往是某些節點由于運作環境原因,比如CPU資源由于某種原因被占用,磁盤損壞導緻IO緩慢造成任務執行速度異常的情況,當然前提是你的分區任務不存在僅能被執行一次,或者不能同時執行多個拷貝等情況。Speculation任務參照的名額通常是其它任務的執行時間,而實際的任務可能由于分區資料尺寸不均勻,本來就會有時間差異,加上一定的排程和IO的随機性,是以如果一緻性名額定得過嚴,Speculation可能并不能真的發現問題,反而增加了不必要的任務開銷,定得過寬,大概又基本相當于沒用。

個人覺得,如果你的叢集規模比較大,運作環境複雜,的确可能經常發生執行異常,加上資料分區尺寸差異不大,為了程式運作時間的穩定性,那麼可以考慮仔細調整這些參數。否則還是考慮如何排除造成任務執行速度異常的因數比較靠鋪一些。

當然,我沒有實際在很大規模的叢集上運作過Spark,是以如果看法有些偏頗,還請有實際經驗的XD指正。

原文連結:http://spark-config.readthedocs.io/en/latest/

繼續閱讀