HADOOP叢集各元件性能調優
-
- 配置原則
-
- 如何發揮叢集最佳性能
-
- 原則1:CPU核數配置設定原則
- 原則2:記憶體配置設定
- 原則3:虛拟CPU個數配置設定
- 原則4:提高磁盤IO吞吐率
- 影響性能的因素
-
-
- 因素1:檔案伺服器磁盤I/O
- 因素2:檔案伺服器網絡帶寬
- 因素3:叢集節點硬體配置
- 因素4:SFTP參數配置
- 因素5:叢集參數配置
- 因素6:Linux檔案預讀值
- 因素7:Jsch版本選擇
-
- HBase
-
- 提升 BulkLoad 效率
-
- 操作場景
- 前提條件
- 操作步驟
- 提升索引 BulkLoad 效率
-
- 操作場景
- 預置條件
- 提升資料實時入庫的效率
-
- 操作場景
- 前提條件
- HDFS
-
- 提升讀寫性能
-
- 操作場景
- 提升寫性能
-
- 操作場景
- MapReduce
-
- 多 CPU 核心下的調優配置
-
- 操作場景
- CPU 使用率過高時的調優配置
- 确定 Job 基線
- Shuffle 調優
-
- 操作場景
- 操作步驟
- 大任務的 AM 調優
-
- 操作場景
- 操作步驟
- 推測執行
-
- 操作場景
- 通過“Slow Start”調優
-
- 操作場景
- 通過 Merge/Sort 流程提升 MR 性能
-
- 操作場景
- 操作步驟
- Spark
-
- 資料序列化
-
- 操作場景
- 配置記憶體
-
- 操作場景
- 操作步驟
- 設定并行度
-
- 操作場景
- 操作步驟
- 使用廣播變量
-
- 操作場景
- 使任務變得很大。
- 操作步驟
- Spark SQL join 優化
-
- 操作場景
- 使用 Netty 提升傳輸效率
-
- 操作場景
- 使用 SortShuffle 提升 shuffle 性能
-
- 操作場景
- 使用 External Shuffle Service 提升性能
-
- 操作場景
- Yarn 模式下動态資源排程
-
- 操作場景
- 操作步驟
- 性能調優示例:廣播變量
-
- 操作場景
- 經驗總結
-
- 使用 mapPartitions,按每個分區計算結果
- 使用 coalesce 調整分片的數量
- localDir 配置
- Collect 小資料
- 使用 reduceByKey
- 廣播 map 代替數組
- 資料傾斜
- 優化資料結構
- Hive
-
- 建立表分區
-
- 操作場景
- 操作步驟
- Join 優化
-
- 操作場景
- Map join
- Sort Merge Bucket Map Join
- 注意事項
-
- Join資料傾斜問題
- Group By 優化
-
- 操作場景
- 操作步驟
- 注意事項
-
- Group By資料傾斜
- Count Distinct聚合問題
配置原則
如何發揮叢集最佳性能
原則1:CPU核數配置設定原則
資料節點:建議預留2~4個核給OS和其他程序(資料庫,HBase等)外,其他的核分
配給YARN。
控制節點:由于運作的程序較多,建議預留6~8個核。
原則2:記憶體配置設定
除了配置設定給OS、其他服務的記憶體外,剩餘的資源應盡量配置設定給YARN。
原則3:虛拟CPU個數配置設定
節點上YARN可使用的虛拟CPU個數建議配置為實體核數的1~2倍之間。如果上層計算應用對CPU的計算能力要求不高,可以配置為2倍的實體CPU
原則4:提高磁盤IO吞吐率
盡可能挂載較多的盤,以提高磁盤IO吞吐
影響性能的因素
因素1:檔案伺服器磁盤I/O
一般磁盤順序讀寫的速度為百兆級别,如第二代SATA盤順序讀的理論速度為300Mps,
隻從一個盤裡讀,若想達到1Gps每秒的導入速度是不可能的。并且若從一個磁盤讀,
單純依靠增加map數來提高導入速率也不一定可以。因為随着map數變多,對于一個磁
盤裡的檔案讀,相當由順序讀變成了随機讀,map數越多,磁盤讀取檔案的随機性越
強,讀取性能反而越差。如随機讀最差可變成800Kps。 是以需要想辦法增大檔案服務
器的磁盤IO讀效率,可以使用專業的檔案伺服器,如NAS系統,或者使用更簡單的方
法,把多個磁盤進行Raid0或者Raid5。
因素2:檔案伺服器網絡帶寬
單個檔案伺服器的網絡帶寬越大越好,建議在10000Mb/s以上。
因素3:叢集節點硬體配置
叢集節點硬體配置越高,如CPU核數和記憶體都很多,可以增大同時運作的map或reduce
個數,如果單個節點硬體配置難以提升,可以增加叢集節點數。
因素4:SFTP參數配置
不使用壓縮、加密算法優先選擇aes128-cbc,完整性校驗算法優先選擇
[email protected]
因素5:叢集參數配置
因素6:Linux檔案預讀值
設定磁盤檔案預讀值大小為16384,使用linux指令:
echo 16384 > /sys/block/sda/queue/read_ahead_kb
說明:sda表示目前磁盤的磁盤名。
因素7:Jsch版本選擇
選擇最新版本jsch jar包,即jsch-0.1.51.jar,性能可提升20%。
HBase
提升 BulkLoad 效率
操作場景
批量加載功能采用了MapReduce作業直接生成符合HBase内部資料格式的檔案,然後把
生成的StoreFiles檔案加載到正在運作的群集。使用批量加載會比直接使用HBase的API
節約更多的CPU和網絡資源。
ImportTSV是一個HBase的索引表資料加載工具。
前提條件
在執行批量加載時需要指定檔案的輸出路徑:Dimporttsv.bulk.output。
操作步驟
參數入口:執行批量加載任務時,在BulkLoad指令行中加入如下參數。
增強 Bulk Load 效率的配置項
- -Dimporttsv.mapper.class
- 新的自定義mapper通過把鍵值對的構造從mapper移動到reducer以幫助提高性能。mapper隻需要把每一行的原始文本發送給reducer,reducer解析每一行的每一條記錄并建立鍵值對。 預設值:org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper
提升索引 BulkLoad 效率
操作場景
索引批量加載功能采用了MapReduce作業直接生成符合HBase内部資料格式的檔案,然
後把生成的資料檔案加載到正在運作的群集。使用批量加載會比直接使用HBase的API
節約更多的CPU和網絡資源。
預置條件
l 在執行批量加載時需要指定檔案的輸出路徑:Dimporttsv.bulk.output。
l 使用者表需要為建立了索引的表。
增強 Bulk Load 效率的配置項
- -Dimporttsv.mapper.class
- 新的自定義mapper通過把鍵值隊的構造從mapper移動到reducer以幫助提高性能。mapper隻需要把每一行的原始文本發送給reducer,reducer解析每一行的每一條記錄并建立鍵值對。 預設值:org.apache.hadoop.hbase.index.mapreduce.IndexTsvImporterTextMapper
提升資料實時入庫的效率
操作場景
需要把資料實時儲存到HBase中。
前提條件
調用HBase的put或delete接口,把資料儲存到HBase中。
影響資料實時入庫性能的參數
- hbase.regionserver.wal.durable.sync
-
控制HLog檔案在寫入到HDFS時的同步程度。如果為true,HDFS在把資料寫入
到硬碟後才傳回;如果為false,HDFS在把資料寫入OS的緩存後就傳回。把該值設定為false比true在寫入性能上會更優。預設值:true
hbase.regionserver.hfile.durable.sync -
控制HFile檔案在寫入到HDFS時的同步程度。如果為true,HDFS在把資料寫入
到硬碟後才傳回;如果為false,HDFS在把資料寫入OS的緩存後就傳回。
把該值設定為false比true在寫入性能上會更優。預設值:true
HDFS
提升讀寫性能
操作場景
在HDFS中,通過調整屬性的值,使得HDFS叢集更适應自身的業務情況,進而提升HDFS的讀寫性能。
HDFS 讀寫性能優化配置
- dfs.blocksize
- 表示建立檔案的預設塊大小。機關:位元組。指定大小或者指定具體位元組大小(例如134217728,表示128MB)。最優值:268435456(256MB)說明:參數值必須為512的倍數,否則向HDFS寫入檔案時會出現錯誤。預設值:134217728(128MB)範圍:512-1073741824
提升寫性能
操作場景
在HDFS中,通過調整屬性的值,使得HDFS叢集更适應自身的業務情況,進而提升HDFS的寫性能。
- dfs.datanode.drop.cache.behind.reads
- 設定為true表示丢棄緩存的資料(需要在DataNode中配置)。預設值:true
MapReduce
多 CPU 核心下的調優配置
操作場景
當CPU核心數很多時,如CPU核心為磁盤數的3倍時的調優配置。
- 說明
- HDFS用戶端配置檔案路徑:用戶端安裝目錄/hadoop/etc/hadoop/hdfs-site.xml。
- Yarn用戶端配置檔案路徑:用戶端安裝目錄/hadoop/etc/hadoop/yarn-site.xml。
- MapReduce用戶端配置檔案路徑:用戶端安裝目錄/hadoop/etc/hadoop/mapred-site.xml。 節點容器槽位數
- 如下配置組合決定了每節點任務(map、reduce)的并發數。
-
-
: 預設值: 8192yarn.nodemanager.resource.memory-mb
-
-
-
: 預設值: 4096mapreduce.map.memory.mb
-
-
-
: 預設值: 4096mapreduce.reduce.memory.mb
-
- 影響:如果所有的任務(map/reduce)需要讀寫資料至磁盤,多個程序将會同時通路一個磁盤。這将會導緻磁盤的IO性能非常的低下。為了改善磁盤的性能,請確定用戶端并發通路磁盤的數不大于3。
-
- 容器數應該為[ 2.5 * Hadoop中磁盤配置數 ]。
-
- 容器槽位的數量應少于可使用CPU數的70%,除去節點中運作的作業系統以及其它程序如Datanode,節點管理器需要更多的CPU來處理高負載量的Map任務。
Map輸出與壓縮 - Map任務所産生的輸出可以在寫入磁盤之前被壓縮,這樣可以節約磁盤空間并得到更快的寫盤速度,同時可以減少至Reducer的資料傳輸量。需要在用戶端進行配置
-
-
mapreduce.map.output.compress 指定了Map任務輸出結果可以在網絡傳輸前被壓縮。這是一個per-job的配置。
-
-
-
mapreduce.map.output.compress.codec 指定用于壓縮的編解碼器. 預設值:org.apache.hadoop.io.compress.SnappyCodec
-
-
在這種情況下,磁盤的IO是主要瓶頸。是以可以選擇一種壓縮率非常高的壓縮算法。
編解碼器可配置為LZO和Snappy中的任意一種,Benchmark測試結果顯示LZO和Snappy是非常平衡以及高效的編碼器。
Spills -
-
mapreduce.map.sort.spill.percent 預設值:0.8
-
- 在這種情況下,磁盤I/O将成為主要的瓶頸。是以io.sort.mb的配置如溢出将會被最小化。io.sort.mb應該配置成使溢出不再産生。例如HDFS的塊大小設定成256MB,而每條記錄隻有100位元組,設定io.sort.mb為316MB,io.sort.spill.percent為0.99(99%填補門檻值)來完成消除Map方的溢出。 資料包大小
- 當HDFS用戶端寫資料至資料節點時,資料會被累積,直到形成一個包。然後這個資料包會通過網絡傳輸。dfs.client-write-packet-size配置項可以指定該資料包的大小。這個可以通過每個job進行指定。
-
-
dfs.client-write-packet-size 預設值:262144
-
- 資料節點從HDFS用戶端接收資料包,然後将資料包裡的資料單線程寫入磁盤。當磁盤處于并發狀态并且資料包的大小可以增加,磁盤的尋道時間以及IO性能将會增加。
CPU 使用率過高時的調優配置
- 操作場景
- 在叢集整體CPU使用率過高的時候,可以通過調整dfs.client.read.shortcircuit”參數配置,使其更合理利用叢集資源,來達到降低CPU使用率。
-
dfs.client.read.shortcircuit 最短路徑讀取。當client和DataNode在一起時,資料以碼流的方式從DataNode節點傳到用戶端。這種方式比較低效,并且導緻大量的上下文切換。相反的,用戶端可以選擇最短路徑讀取,tr直接從磁盤讀取資料。如果将本參數設定為“true”,則可以在“dfs.block.local-path-access.user”參數中指定對應的角色。
-
确定 Job 基線
- 操作場景
- 确定Job基線是調優的基礎,一切調優項效果的檢查,都是通過和基線資料做對比來獲得。 Job基線的确定有如下三個原則:
-
- 叢集的資源要吃滿
-
- reduce階段盡量放在一輪
-
- 每個task的執行時間要合理操作步驟
原則一:叢集的資源要吃滿。
Job運作時,會讓所有的節點都有任務處理,且處于繁忙狀态,這樣才能保證資源充分利用,任務的并發度達到最大。可以通過調整處理的資料量大小,以及調整map和reduce個數來實作。Reduce個數的控制使用mapreduce.job.reduces.Map數取決于使用了哪種InputFormat,以及檔案是否可分割。預設的nTextFileInputFormat将根據block的個數來配置設定map數(一個block一個map)。通過如下配置參數進行調整。
-
mapreduce.input.fileinputformat.split.maxsize 可以設定資料分片的資料最大值。由使用者定義的分片大小的設定及每個檔案block大小的設定,可以計算得分片的大小。計算公式如下:splitSize = Math.max(minSize, Math.min(maxSize, blockSize)) 如果maxSize設定大于blockSize,那麼每個block就是一個分片,否則就會将一個block檔案分隔為多個分
片,如果block中剩下的一小段資料量小于splitSize,還是認為它是獨立的分片。
-
mapreduce.input.fileinputformat.split.minsize 可以設定資料分片的資料最小值。 預設:0
原則二:reduce階段盡量放在一輪。
存在一個場景,大部分的reduce在第一輪跑完後,剩下唯一一個reduce繼續跑。這種情況下,這個reduce的執行時間将極大影響這個job的運作時間。是以需要将reduce個數減少。
另一種場景,所有的map跑完後,隻有個别節點有reduce在跑。這時候叢集資源沒有得到充分利用,需要增加reduce的個數以便每個節點都有任務處理。
原則三:每個task的執行時間要合理。
如果一個job,每個map或reduce的執行時間隻有幾秒鐘,就意味着這個job的大部分時間都消耗在task的排程和程序啟停上了,是以需要增加每個task處理的資料大小。一般一個task處理時間在1分鐘左右比較合适。控制單個task處理任務的大小可通過如下配置來調整。
-
mapreduce.input.fileinputformat.split.maxsize 可以設定資料分片的資料最大值。由使用者定義的分片大小的設定及每個檔案block大小的設定,可以計算得分片的大小。計算公式如下:splitSize = Math.max(minSize, Math.min(maxSize, blockSize)) 如果maxSize設定大于blockSize,那麼每個block就是一個分片,否則就會将一個block檔案分隔為多個分片,如果block中剩下的一小段資料量小于splitSize,還是認為它是獨立的分片。
-
mapreduce.input.fileinputformat.split.minsize 可以設定資料分片的資料最小值。 預設:0
Shuffle 調優
操作場景
Shuffle階段是MapReduce性能的關鍵部分,包括了從Map task将中間資料寫到磁盤一直到Reduce task拷貝資料并最終放到reduce函數的全部過程。這一塊adoop提供了大量的調優參數。
操作步驟
-
Map階段的調優
判斷Map使用的記憶體大小判斷Map配置設定的記憶體是否足夠,一個簡單的辦法是檢視運作完成的job的Counters中,對應的task是否發生過多次GC,以及GC時間占總task運作時間之比。通常,GC時間不應超過task運作時間的10%,即GC time elapsed(ms)/CPU time spent (ms)<10%。
當然,Map需要的記憶體還需要随着Map buffer的調大而對應調整。主要通過如
下參數進行調整。
-
mapreduce.map.memory.mb 設定程序占用的總記憶體。 預設:4096
- 使用Combinner
-
在Map階段,有一個可選過程,将同一個key值的中介結果合并,叫做
combinner。一般将reduce類設定為combinner即可。通過combine,一般情況下
可以顯著減少Map的輸出中間結果,減少寫磁盤的内容。
- Copy階段的調優
- 資料是否壓縮
-
對Map的中間結果進行壓縮,當資料量大時,會顯著減少網絡傳輸的資料量,但是也因為多了壓縮和解壓,帶來了更多的CPU消耗。是以需要做好權衡。當任務屬于網絡瓶頸類型時,壓縮Map中間結果效果明顯。之前調優的bulkload,壓縮中間結果後性能提升60%左右。壓縮算法建議選擇FusionInsight附帶的snappy算法,擁有較好的壓縮速率和壓縮比。
配置方法:将“mapreduce.map.output.compress”參數值設定為“true”,将
“mapreduce.map.output.compress.codec”參數值設定為
“org.apache.hadoop.io.compress.SnappyCodec”。
-
Merge階段的調優
通過調整如下參數減少reduce寫磁盤的次數。
-
mapreduce.reduce.merge.inmem.threshold 允許多少個檔案同時存在reduce記憶體裡。當達到這個門檻值時,reduce就會觸發mergeAndSpill,将資料寫到硬碟上。預設:1000
-
mapreduce.reduce.shuffle.merge.percent 當reduce中存放map中間結果的buffer使用達到多少百分比時,會觸發merge操作。預設:0.66
-
mapreduce.reduce.shuffle.input.buffer.percent 允許Map中間結果占用reduce堆大小的百分比。預設:0.70
-
mapreduce.reduce.input.buffer.percent 當開始執行reduce函數時,允許map檔案占reduce堆大小的百分比。當map檔案比較小時,可以将這個值設定成1.0,這樣可以避免reduce将拷貝過來的map中間結果寫磁盤。預設:0
大任務的 AM 調優
操作場景
任務場景:運作的一個大任務,map總數達到了10萬的規模,但是一直沒有跑成功。經過查詢,發現是AM反映緩慢,最終逾時失敗。
此任務的問題是,task數量變多時,AM管理的對象也線性增長,是以就需要更多的記憶體來管理。AM預設配置設定的記憶體堆大小是1GB。
操作步驟
通過調大如下的參數來進行AM調優。
-
yarn.app.mapreduce.am.resource.mb 該參數值必須大于下面參數的堆大小。預設1.5GB
-
yarn.app.mapreduce.am.command-opts 傳遞到 MapReduce ApplicationMaster的 Java 指令行參數。 預設:-Xmx1024m -XX:CMSFullGCsBeforeCompaction=1 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -verbose:gc -Xloggc:/tmp/@[email protected] -Djava.security.krb5.conf=#{conf_dir:KerberosClient}/kdc.conf -Djava.security.auth.login.config=#{conf_dir}/jaas.conf -Dzookeeper.server.principal=zookeeper/hadoop
推測執行
操作場景
當叢集規模很大時(如幾百上千台節點的叢集),個别機器出現軟硬體故障的機率就變大了,并且會是以延長整個任務的執行時間(跑完的任務都在等出問題的機器跑結束)。推測執行通過将一個task分給多台機器跑,取先運作完的那個,會很好的解決這個問題。對于小叢集,可以将這個功能關閉。
-
mapreduce.map.speculative 是否開啟map的推測執行。true表示開啟。預設false
-
mapreduce.reduce.speculative 是否開啟reduce的推測執行。true表示開啟。預設false
通過“Slow Start”調優
操作場景
MapReduce的AM在申請資源的時候,會一次性申請所有的Map資源,延後申請reduce的資源,這樣就能達到先執行完大部分Map再執行Reduce的目的。在有些場景下,需要確定Map執行完,再執行Reduce,或者提前執行Reduce。
-
mapreduce.job.reduce.slowstart.completedmaps 當多少占比的Map執行完後開始執行reduce。預設100%的Map跑完後開始起reduce。預設1
通過 Merge/Sort 流程提升 MR 性能
操作場景
您可以通過調整Merge/Sort流程來提升MR任務的執行性能。
操作步驟
參數入口:在用戶端的“${HADOOP_HOME}/etc/hadoop”目錄下,編輯hdfs-site.xml檔案,增加參數配置。
-
mapreduce.job.reduce.shuffle.consumer.plugin.class org.apache.hadoop.mapreduce.task.reduce.ShuffleWithoutBarriers
Spark
資料序列化
操作場景
- Spark支援兩種方式的序列化 :
-
- Java原生序列化JavaSerializer
-
- Kryo序列化KryoSerializer
序列化對于Spark應用的性能來說,具有很大的影響。在特定的資料格式的情況下,KryoSerializer的性能可以達到JavaSerializer的10倍以上,而對于一些Int之類的基本類型資料,性能的提升就幾乎可以忽略。
KryoSerializer依賴Twitter的Chill庫來實作,相對于JavaSerializer,主要的問題在于不是所有的Java Serializable對象都能支援,相容性不好,是以需要手動注冊類。
序列化功能用在兩個地方 : 序列化任務和序列化資料。Spark任務序列化隻支援
JavaSerializer,資料序列化支援JavaSerializer和KryoSerializer。
Spark程式運作時,在shuffle和RDD Cache等過程中,會有大量的資料需要序列化,預設使用JavaSerializer,通過配置讓KryoSerializer作為資料序列化器來提升序列化性能。
在開發應用程式時,添加如下代碼來使用KryoSerializer作為資料序列化器。
- 實作類注冊器并手動注冊類。
public class DemoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
//以下為示例類,請注冊自定義的類
kryo.register(AggrateKey.class);
kryo.register(AggrateValue.class);
}
}
- 配置KryoSerializer作為資料序列化器和類注冊器。
val conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.etl.common.DemoRegistrator");
配置記憶體
操作場景
Spark是記憶體計算架構,計算過程中記憶體不夠對Spark的執行效率影響很大。可以通過監控GC(Gabage Collection),評估記憶體中RDD的大小來判斷記憶體是否變成性能瓶頸,并根據情況優化。
監控節點程序的GC情況(在conf目錄下的spark-env.sh添加export SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"會在節點日志裡列印出GC資訊),如果頻繁出現Full GC,需要優化GC。把RDD做Cache操作,通過日志檢視RDD在記憶體中的大小,如果資料太大,需要改變RDD的存儲級别來優化。
操作步驟
-
優化GC,調整老年代和新生代的大小和比例。在conf目錄下的spark-env.sh設定-
XX:NewRatio可以調整老年代和新生代的比例,如export SPARK_JAVA_OPTS="-XX:NewRatio=2",則新生代占整個堆空間的1/3,老年代占2/3
-
開發Spark應用程式時,優化RDD的資料結構。
– 使用原始類型數組替代集合類(fastutil)。
– 避免嵌套結構。
– Key盡量不要使用String。
-
開發Spark應用程式時,序列化RDD
RDD做cache時預設是不序列化資料的,可以通過設定存儲級别來序列化RDD減小記憶體。例如:
設定并行度
操作場景
并行度控制任務的數量,影響shuffle操作後資料被切分成的塊數。調整并行度讓任務的數量和每個任務處理的資料與機器的處理能力達到最優。
檢視CPU使用情況和記憶體占用情況,當任務和資料不是平均分布在各節點,而是集中在個别節點時,可以增大并行度使任務和資料更均勻的分布在各個節點。增加任務的并行度,充分利用叢集機器的計算能力,一般并行度設定為叢集CPU總和的2-3倍。
操作步驟
- 使用spark.default.parallelism設定并行度,請根據實際的記憶體、CPU、資料以及應用程式邏輯的情況調整并行度參數。
val conf = new SparkConf();
conf.set("spark.default.parallelism", 24);
- 在會産生shuffle的操作函數内設定并行度參數,請根據實際的記憶體、CPU、資料以及應用程式邏輯的情況調整并行度參數。
使用廣播變量
操作場景
Broadcast(廣播)可以把資料集合分發到每一個節點上,Spark任務在執行過程中要使
用這個資料集合時,就會在本地查找Broadcast過來的資料集合。如果不使用
Broadcast,每次任務需要資料集合時,都會把資料序列化到任務裡面,不但耗時,還
使任務變得很大。
- 每個任務分片在執行中都需要同一份資料集合時,就可以把公共資料集Broadcast到每個節點,讓每個節點在本地都儲存一份。
- 大表和小表做join操作時可以把小表Broadcast到各個節點,進而就可以把join操作轉變成普通的操作,減少了shuffle操作。
操作步驟
在開發應用程式時,添加如下代碼,将“testArr”資料廣播到各個節點。
val testArr: Array[Long] = new Array[Long](200);
val testBroadcast: Broadcast[Array[Long]] = sc.broadcast(testArr);
val resultRdd: RDD[Long] = Operater.handleData(testBroadcast);
Spark SQL join 優化
操作場景
Spark SQL 中,當對兩個表進行join操作時,利用Broadcast,将小表BroadCast到各個節點上,進而轉變成普通的操作,減少了shuffle操作,提高任務執行性能。
在Spark SQL中進行Join操作時,可以按照以下步驟進行優化。為了友善說明,設表A
和表B,且A、B表都有個名為name列。對A、B表進行join操作。
-
估計表的大小。
根據每次加載資料的大小,來估計表大小。
也可以在Hive的資料庫路徑下檢視表的大小。在Spark的配置檔案hive-site.xml中,檢視Hive的資料庫路徑的配置,預設為/user/hive/warehouse。
<property>
<name>hive.metastore.warehouse.dir</name>
<value>${test.warehouse.dir}</value>
<description></description>
</property>
表的大小。如檢視表A的大小
hadoop fs -du -s -h ${test.warehouse.dir}/a
2. 配置自動廣播的門檻值。
Spark中,判斷表是否廣播的門檻值為10485760(即10M)。如果兩個表的大小至少有一個小于10M時,可以跳過該步驟。
自動廣播門檻值的配置參數
-
spark.sql.autoBroadcastJoinThreshold 預設10485760。當進行join操作時,配置廣播的最大值;當表的位元組數小于該值時便進行廣播。當配置為-1時,将不進行廣播。參見https://spark.apache.org/docs/latest/sql-programming-guide.html。
- 配置自動廣播門檻值的方法:
-
在Spark的配置檔案“spark-defaults.conf”中,設定“spark.sql.autoBroadcastJoinThreshold”的值。其中,根據場景而定,但要求該值至少比其中一個表大。
spark.sql.autoBroadcastJoinThreshold = < size>
利用Hive CLI指令,設定門檻值。在運作Join操作時,提前運作下面語句SET spark.sql.autoBroadcastJoinThreshold=< size>
其中,根據場景而定,但要求該值至少比其中一個表大。
-
進行join操作。
這時join的兩個table,至少有個表是小于門檻值的。
如果A表和B表都小于門檻值,且A表的位元組數小于B表時,則運作 B join A,如
-
SELECT A.name FROM B JOIN A ON A.name = B.name;
否則 運作A join B。
-
SELECT A.name FROM A JOIN B ON A.name = B.name;
使用 Netty 提升傳輸效率
操作場景
Spark應用中往往存在大量的網絡資料傳輸,是以需使用Java NIO來實作。Spark1.2以前預設是基于原生的Java NIO實作的資料傳輸服務,是以其魯棒性并非很完善,而Netty架構簡化了原生Java NIO的操作,并提升了整體的魯棒性。因其提升了資料傳輸的穩定性,是以在配合Sort-based Shuffle,External Shuffle等使用時,其整體性能有一定的提升。
-
spark.shuffle.blockTransferService 資料傳輸的實作方式,目前有兩種實作方式:基于原生NIO和基于Netty架構。預設netty
-
spark.shuffle.io.numConnectionsPerPeer 基于Netty下的資料傳輸服務,兩個節點之間的連接配接數,這些連接配接在資料傳輸期間是一直保持的且可重用。如果叢集中磁盤多而節點少,可以考慮增加該值來提升并發效率。1
說明
1. Spark 1.2+預設使用Netty作為資料傳輸服務;
2. 當傳輸資料量較大時增加連接配接數(增大spark.shuffle.io.numConnectionsPerPeer)可以提高并行
傳輸的資料量,進而提升傳輸性能,但該值并非越大越好,資料傳輸速度依然受限于網絡
I/O。
使用 SortShuffle 提升 shuffle 性能
操作場景
Shuffle是Spark中最為重要的一塊,也是性能調優必須要考慮的。在Spark1.2以前預設使用的是Hash-based Shuffle,其存在着以下幾個問題:
- 産生大量的shuffle檔案;
- 因shuffle結果需要往大量的shuffle檔案寫,會導緻持續不斷的I/O操作,進而磁盤一直處于繁忙狀态而影響性能。Sort-based Shuffle很好的解決了Hash-based存在的問題,其産生的shuffle檔案數遠遠于-based Shuffle,因shuffle檔案數的減少是以I/O相對來說隻是很短時間内會很大,不會造成持續不斷的I/O操作。
說明
1. Spark 1.2+預設使用Sort-based Shuffle;
2. 當記憶體足夠大時,Hash-based和Sort-based這兩種shuffle性能差距;
3. 當使用MLLib時在某些算法下Sort-based的性能會低于Hash-based。
-
spark.shuffle.manager Shuffle的實作方式,目前有兩種實作方式:基于sort和基于hash。預設sort
使用 External Shuffle Service 提升性能
操作場景
Spark系統在運作含shuffle過程的應用時,Executor程序除了運作task,還要負責寫shuffle資料以及給其他Executor提供shuffle資料。當Executor程序任務過重,導緻觸發GC(Garbage Collection)而不能為其他Executor提供shuffle資料時,會影響任務運作。External shuffle Service是長期存在于NodeManager程序中的一個輔助服務。通過該服務來抓取shuffle資料,減少了Executor的壓力,在Executor GC的時候也不會影響其他Executor的任務運作。
-
spark.shuffle.service.enabled 修改為 true
Yarn 模式下動态資源排程
操作場景
對于Spark應用來說,資源是影響Spark應用執行效率的一個重要因素。當一個長期運作的服務(比如Thrift Server),若配置設定給它多個Executor,可是卻沒有任何任務配置設定給它。而此時有其他的應用卻資源緊張,這就造成了很大的資源浪費和資源不合理的排程。
動态資源排程就是為了解決這種場景,根據目前應用任務的負載情況,實時的增減Executor個數,進而實作動态配置設定資源,使整個Spark系統更加健康。
操作步驟
将“spark.dynamicAllocation.enabled”參數的值設定為“true”,表示開啟動态資源排程功能。
下面是一些可選配置
-
spark.dynamicAllocation.minExecutors 最小Executor個數。 預設0
-
spark.dynamicAllocation.initialExecutors 初始Executor個數。 預設spark.dynamicAllocati
-
on.minExecutorsspark.dynamicAllocation.maxExecutors 最大executor個數。預設 Integer.MAX_VALUE
-
spark.dynamicAllocation.schedulerBacklogTimeout 排程第一次逾時時間。 預設1(s)
-
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 排程第二次及之後逾時時間。 預設spark.dynamicAllocati
-
on.schedulerBacklogTimeoutspark.dynamicAllocation.executorIdleTimeout 普通Executor空閑逾時時間。 預設60(s)
-
spark.dynamicAllocation.cachedExecutorIdleTimeout 含有cached blocks的Executor空閑逾時時間。預設spark.dynamicAllocation.executorIdleTimeout的2倍
性能調優示例:廣播變量
操作場景
示例描述:使用Broadcast來實作大小表的join操作,去掉shuffle優化執行效率。
目前環境情況:
- 叢集 :2台
- CPU : 28核
- 記憶體 : 315G,程式實際申請10G資料量
- 大表:25000000條,252M
- 小表: 5000條 ,34k
- 原代碼示例如下:
def main(args: Array[String])
{
val conf = new SparkConf();
val sc = new SparkContext(conf);
val bigTableRDD = sc.textFile("/BigTable.csv")
.map(x =>{val token = x.split(","); (token(0), token(1))});
val smallTableRDD = sc.textFile("/SmallTable.csv")
.map(x =>{val token = x.split(","); (token(0), token(1))});
val resuleRDD = bigTableRDD.rightOuterJoin(smallTableRDD);
resuleRDD.count();
}
-
該代碼的性能問題如下:
使用join算子,會出現shuffle操作,數量較大時,對性能影響很大,因為資料需要
在節點之間傳輸,而且中間節點還要寫入磁盤。
-
對代碼進行優化。
使用廣播把小表廣播出去,作為公共查找資料,然後實作join操作,不會造成節點
間的資料交換也不會寫資料在本地磁盤。
- 優化後代碼如下所示:
def main(args: Array[String])
{
val conf = new SparkConf();
val sc = new SparkContext(conf);
val bigTableRDD = sc.textFile("/BigTable.csv")
.map(x =>{val token = x.split(","); (token(0), token(1))});
val smallTableRDD = sc.textFile("/SmallTable.csv")
.map(x =>{val token = x.split(","); (token(0), token(1))});
val smallTable = smallTableRDD.collect();
val smallTableMap = new HashMap[String, String]();
for(data <- smallTable)
{
smallTableMap.put(data._1, data._2);
}
val smallTableBroadcast: Broadcast[HashMap[String, String]] = sc.broadcast(smallTableMap);
val resultRDD = bigTableRDD.map(x => handle(x, smallTableBroadcast))
.filter(x => x != null)
resultRDD.count();
}
def handle(data: (String, String), smallTableBroadcast: Broadcast[HashMap[String, String]]):
(String, String, String) =
{
val smallTableMap = smallTableBroadcast.value;
val joinData = smallTableMap.get(data._1);
if(joinData == None)
{
return null;
}
else
{
return (data._1, data._2, joinData.get)
}
}
- 優化後的效果如下所示:
- 未優化前時間 :| 117s| 126s|118s
-
優化後時間 :| 14s | 14s | 13s
由上面的運作時間可以看出優化後的效率是未優化效率的近8倍。合理的使用
broadcast減少shuffle操作,會使程式性能得到很大的提升。
經驗總結
使用 mapPartitions,按每個分區計算結果
如果每條記錄的開銷太大。例:
rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}
則可以使用MapPartitions,按每個分區計算結果
rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
write(item.toString); conn.close)
使用 coalesce 調整分片的數量
coalesce可以調整分片的數量。
- 當之前的操作有很多filter時,使用coalesce減少空運作的任務數量。
- 當輸入切片個數太大,導緻程式無法正常運作時使用。
- 當任務數過大時候Shuffle壓力太大導緻程式挂住不動,或者出現linux資源受限的問題。
localDir 配置
Spark的Shuffle過程需要寫本地磁盤,Shuffle是Spark性能的瓶頸,I/O是Shuffle的瓶頸。配置多個磁盤則可以并行的把資料寫入磁盤。如果節點中挂載多個磁盤,在每個磁盤配置一個Spark的localDir。可以有效分散Shuffle檔案的存放,提高磁盤I/O的效率。如果隻有一個磁盤,配置了多個目錄,性能提升不大。
Collect 小資料
大資料量時不适用collect操作。
Collect操作會将Executor的資料發送到Driver端,使用collect前請確定Driver端記憶體足夠,以免發生OutOfMemory。當不确定資料量大小時,可使用saveAsTextFile等操作把資料寫入HDFS,當能大緻确定資料大小且driver記憶體可以存的下的時候,可使用Collect,友善本地調測。
使用 reduceByKey
reduceByKey會在Map端做本地聚合,reduceByKey的Shuffle過程更加平緩。
groupByKey等Shuffle操作不會在Map端做聚合,能使用reduceByKey的地方盡量使用該方式,避免出現groupByKey().map(x=>(x._1,x._2.size))。
廣播 map 代替數組
當每個記錄需要查表,如果是Driver端傳過去的,用廣播方式傳遞資料,資料結構采用set/map替代Iterator。因為Set/Map的查詢速率接近O(1),而Iterator是O(n)。
資料傾斜
當資料發生傾斜(某一部分資料量特别大), 雖然沒有GC(Gabage Collection,垃圾回收),但是task執行時間嚴重不一緻。
- 需要重新設計key,以更小粒度的key使得task大小合理化。
- 修改并行度。
優化資料結構
- 把資料按列存放,讀取資料時就可以隻掃描需要的列。
- 通過設定spark.shuffle.consolidateFiles為true,來合并shuffle中間檔案,減少shuffle檔案的數量,減少檔案IO操作以提升性能。最終檔案數為reduce tasks數目。
Hive
建立表分區
操作場景
Hive在做Select查詢時,一般會掃描整個表内容,會消耗較多時間去掃描不關注的資料。此時,可根據業務需求及其查詢次元,建立合理的表分區,進而提高查詢效率。
操作步驟
- 使用PuTTY工具,以root使用者登入HiveServer所在節點。
-
執行以下指令,進入用戶端安裝目錄,例如“/opt/client”。
cd /opt/client
- 配置用戶端環境變量。
-
在用戶端中執行如下指令,執行登入操作。
kinit 使用者名
-
執行以下指令登入用戶端工具。
beeline
- 指定靜态分區或者動态分區。
-
靜态分區:
靜态分區是手動輸入分區名稱,在建立表時使用關鍵字PARTITIONED BY指
定分區列名及資料類型。應用開發時,使用ALTER TABLE ADD PARTITION
語句增加分區,以及使用LOAD DATA INTO PARTITON語句将資料加載到分
區時,隻能靜态分區。
-
動态分區:通過查詢指令,将結果插入到某個表的分區時,可以使用動态分
區。動态分區通過在用戶端工具執行如下指令來開啟:
set hive.exec.dynamic.partition=true
動态分區預設模式是strict,也就是必須至少指定一列為靜态分區,在靜态分
區下建立動态子分區,可以通過如下設定來開啟完全的動态分區:
set hive.exec.dynamic.partition.mode=nonstrict
注意
動态分區可能導緻一個DML語句建立大量的分區,對應的建立大量新檔案夾,對系統
性能可能帶來影響。
Join 優化
操作場景
使用Join語句時,如果資料量大,可能造成指令執行速度和查詢速度慢,此時可進行Join優化。Join優化可分為Map join和Sort Merge Bucket Map Join兩種方式。
Map join
Hive的Map Join适用于能夠在記憶體中存放下的小表(指表大小小于25M),通過
“hive.mapjoin.smalltable.filesize”定義小表的大小,預設為25M。
Map Join的方法有兩種:
- 使用來使用map join。
- 執行語句前設定如下參數,目前版本中該值預設為true。
set hive.auto.convert.join=true
使用Map Join時沒有Reduce任務,而是在Map任務前起了一個MapReduce Local Task,這個Task通過TableScan讀取小表内容到本機,在本機以HashTable的形式儲存并寫入硬碟上傳到DFS,并在distributed cache中儲存,在Map Task中從本地磁盤或者distributed cache中讀取小表内容直接與大表join得到結果并輸出。
使用Map Join時需要注意小表不能過大,如果小表将記憶體基本用盡,會使整個系統性能下降甚至出現記憶體溢出的異常。
Sort Merge Bucket Map Join
使用Sort Merge Bucket Map Join必須滿足以下2個條件:
- join的兩張表都很大,記憶體中無法存放。
-
兩張表都按照join key進行分桶(clustered by (column))和排序(sorted by(column)),且
兩張表的分桶數正好是倍數關系。
通過如下設定,啟用Sort Merge Bucket Map Join:
set hive.optimize.bucketmapjoin=true
set hive.optimize.bucketmapjoin.sortedmerge=true
這種Map Join也沒有Reduce任務,是在Map任務前啟動MapReduce Local Task,将小表内容按桶讀取到本地,在本機儲存多個桶的HashTable備份并寫入HDFS,并儲存在Distributed Cache中,在Map Task中從本地磁盤或者Distributed Cache中按桶一個一個讀取小表内容,然後與大表做比對直接得到結果并輸出。
注意事項
Join資料傾斜問題
執行任務的時候,任務進度長時間維持在99%,這種現象叫資料傾斜。資料傾斜是經常存在的,因為有少量的Reduce任務配置設定到的資料量和其他Reduce差異過大,導緻大部分Reduce都已完成任務,但少量Reduce任務還沒完成的情況。解決資料傾斜的問題,可通過設定set hive.optimize.skewjoin=true并調整hive.skewjoin.key的大小。hive.skewjoin.key是指Reduce端接收到多少個key即認為資料是傾斜的,并自動分發到多個Reduce。
Group By 優化
操作場景
優化Group by語句,可提升指令執行速度和查詢速度。Group by的時候, Map端會先進行分組, 分組完後分發到Reduce端, Reduce端再進行分組。可采用Map端聚合的方式來進行Group by優化,開啟Map端初步聚合,減少Map的輸出資料量。
操作步驟
在Hive用戶端進行如下設定:
set hive.map.aggr=true
注意事項
Group By資料傾斜
Group By也同樣存在資料傾斜的問題,設定hive.groupby.skewindata為true,生成的查詢計劃會有兩個MapReduce Job,第一個Job的Map輸出結果會随機的分布到Reduce中,每個Reduce做聚合操作,并輸出結果,這樣的處理會使相同的Group By Key可能被分發到不同的Reduce中,進而達到負載均衡,第二個Job再根據預處理的結果按照Group By Key分發到Reduce中完成最終的聚合操作。
Count Distinct聚合問題
當使用聚合函數 count distinct完成去重計數時,處理值為空的情況會使Reduce産生很嚴重的資料傾斜,可以将空值單獨處理,如果是計算count distinct,可以通過where字句将該值排除掉,并在最後的count distinct結果中加1。如果還有其他計算,可以先将值為空的記錄單獨處理,再和其他計算結果合并。