Hadoop優化 第一篇 : HDFS/MapReduce
比較慚愧,部落格很久(半年)沒更新了。最近也自己搭了個部落格,wordpress玩的還不是很熟,感興趣的朋友可以多多交流哈!位址是:http://www.leocook.org/
另外,我建了個QQ群:305994766,希望對大資料、算法研發、系統架構感興趣的朋友能夠加入進來,大家一起學習,共同進步(進群請說明自己的公司-職業-昵稱)。
1.應用程式角度進行優化
1.1.減少不必要的reduce任務
若對于同一份資料需要多次處理,可以嘗試先排序、分區,然後自定義InputSplit将某一個分區作為一個Map的輸入,在Map中處理資料,将Reduce的個數設定為空。
1.2.外部檔案引用
如字典、配置檔案等需要在Task之間共享的資料,可使用分布式緩存DistributedCache或者使用-files
1.3.使用Combiner
combiner是發生在map端的,作用是歸并Map端輸出的檔案,這樣Map端輸出的資料量就小了,減少了Map端和reduce端間的資料傳輸。需要注意的是,Combiner不能影響作業的結果;不是每個MR都可以使用Combiner的,需要根據具體業務來定;Combiner是發生在Map端的,不能垮Map來執行(隻有Reduce可以接收多個Map任務的輸出資料)
1.4.使用合适的Writable類型
盡可能使用二進制的Writable類型,例如:IntWritable, FloatWritable等,而不是Text。因為在一個批處理系統中将數值轉換為文本時低效率的。使用二進制的Writable類型可以降低cpu資源的消耗,也可以減少Map端中間資料、結果資料占用的空間。
1.5.盡可能的少建立新的Java對象
a)需要注意的Writable對象,例如下面的寫法:
public void map(...) {
…
for (String word : words) {
output.collect(new Text(word), new IntWritable(1));
}
}
這樣會沖去建立對象new Text(word)和new IntWritable(1)),這樣可能會産生海量的短周期對象。更高效的寫法見下:
class MyMapper … {
Text wordText = new Text();
IntWritable one = new IntWritable(1);
public void map(...) {
for (String word: words) {
wordText.set(word);
output.collect(wordText, one);
}
}
}
b)對于可變字元串,使用StringBuffer而不是String
String類是經過final修飾的,那麼每次對它的修改都會産生臨時對象,而SB則不會。
2. Linux系統層面上的配置調優
2.1. 檔案系統的配置
a) 關閉檔案在被操作時會記下時間戳:noatime和nodiratime
b) 選擇I/O性能較好的檔案系統(Hadoop比較依賴本地的檔案系統)
2.2. Linux檔案系統預讀緩沖區大小
指令blockdev
2.3. 去除RAID和LVM
2.4. 增大同時打開的檔案數和網絡連接配接數
ulimit
net.core.somaxconn
2.5. 關閉swap分區
在Hadoop中,對于每個作業處理的資料量和每個Task中用到的各種緩沖,使用者都是完全可控的。
/etc/sysctl.conf
2.6. I/O排程器選擇
詳情見AMD的白皮書
3. Hadoop平台内參數調優
Hadoop相關可配置參數共有幾百個,但是其中隻有三十個左右會對其性能産生顯著影響。
3.1. 計算資源優化
a) 設定合理的slot(資源槽位)
mapred.tasktracker.map.tasks.maximum / mapred.tasktracker.reduce.tasks.maximum
參數說明:每個TaskTracker上可并發執行的Map Task和Reduce Task數目
預設值:都是2
推薦值:根據具體的節點資源來看,推薦值是(core_per_node)/2~2*(cores_per_node)
機關:無
3.2. 節點間的通信優化
a) TaskTracker和JobTracker之間的心跳間隔
這個值太小的話,在一個大叢集中會造成JobTracker需要處理高并發心跳,可能會有很大的壓力。
建議叢集規模小于300時,使用預設值3秒,在此基礎上,叢集規模每增加100台,會加1秒。
b) 啟用帶外心跳(out-of-band heartbeat)
mapreduce.tasktracker.outofband.heartbeat
參數說明:主要是為了減少任務配置設定延遲。它與正常心跳不同,一般的心跳是一定時間間隔發送的,而帶外心跳是在任務運作結束或是失敗時發送,這樣就能在TaskTracker節點出現空閑資源的時候能第一時間通知JobTracker。
3.3. 磁盤塊的配置優化
a) 作業相關的磁盤配置:mapred.local.dir
參數說明:map本地計算時所用到的目錄,建議配置在多塊硬碟上
b) 存儲相關的磁盤配置(HDFS資料存儲):dfs.data.dir
參數說明:HDFS的資料存儲目錄,建議配置在多塊硬碟上,可提高整體IO性能
例如:
<property>
<name>dfs.name.dir</name>
<value>/data1/hadoopdata/mapred/jt/,/data2/hadoopdata/mapred/jt/</value>
</property>
c) 存儲相關的磁盤配置(HDFS中繼資料存儲):dfs.name.dir
參數說明:HDFS的中繼資料存儲目錄,建議設定多目錄,每個多目錄都可儲存中繼資料的一個備份
注:要想提升hadoop整體IO性能,對于hadoop中用到的所有檔案目錄,都需要評估它磁盤IO的負載,對于IO負載可能會高的目錄,最好都配置到多個磁盤上,以提示IO性能
3.4. RPC Handler個數和Http線程數優化
a) RPC Handler個數(mapred.job.tracker.handler.count)
參數說明:JobTracker需要并發的處理來自各個TaskTracker的RPC請求,可根據叢集規模和并發數來調整RPC Handler的個數。
預設值:10
推薦值:60-70,最少要是TaskTracker個數的4%
機關:無
b) Http線程數(tasktracker.http.threads)
在Shuffle階段,Reduce Task會通過Http請求從各個TaskTracker上讀取Map Task的結果,TaskTracker是使用Jetty Server來提供服務的,這裡可适量調整Jetty Server的工作線程以提高它的并發處理能力。
預設值:40
推薦值:50-80+
3.5. 選擇合适的壓縮算法
mapred.compress.map.output / Mapred.output.compress
map輸出的中間結果時需要進行壓縮的,指定壓縮方式(Mapred.compress.map.output.codec/ Mapred.output.compress.codec)。推薦使用LZO壓縮。
3.6. 啟用批量任務排程(現在新版本都預設支援了)
a) Fair Scheduler
mapred.fairscheduler.assignmultiple
b) Capacity Scheduler
3.7. 啟用預讀機制(Apache暫時沒有)
Hadoop是順序讀,是以預讀機制可以很明顯的提高HDFS的讀性能。
HDFS預讀:
dfs.datanode.readahead :true
dfs.datanode.readahead.bytes :4MB
shuffle預讀:
mapred.tasktracker.shuffle.fadvise : true
mapred.tasktracker.shuffle.readahead.bytes : 4MB
3.8.HDFS相關參數優化
1) dfs.replication
參數說明:hdfs檔案副本數
預設值:3
推薦值:3-5(對于IO較為密集的場景可适量增大)
機關:無
2) dfs.blocksize
參數說明:
預設值:67108864(64MB)
推薦值:稍大型叢集建議設為128MB(134217728)或256MB(268435456)
機關:無
3) dfs.datanode.handler.count
參數說明:DateNode上的服務線程數
預設值:10
推薦值:
機關:無
4) fs.trash.interval
參數說明:HDFS檔案删除後會移動到垃圾箱,該參數時清理垃圾箱的時間
預設值:0
推薦值:1440(1day)
機關:無
5) io.sort.factor
參數說明:當一個map task執行完之後,本地磁盤上(mapred.local.dir)有若幹個spill檔案,map task最後做的一件事就是執行merge sort,把這些spill檔案合成一個檔案(partition)。執行merge sort的時候,每次同時打開多少個spill檔案由該參數決定。打開的檔案越多,不一定merge sort就越快,是以要根據資料情況适當的調整。
預設值:10
推薦值:
機關:無
6) mapred.child.java.opts
參數說明:JVM堆的最大可用記憶體
預設值:-Xmx200m
推薦值:-Xmx1G | -Xmx4G | -Xmx8G
機關:-Xmx8589934592也行,機關不固定
7) io.sort.mb
參數說明:Map Task的輸出結果和中繼資料在記憶體中占的buffer總大小,當buffer達到一定閥值時,會啟動一個背景程序來對buffer裡的内容進行排序,然後寫入本地磁盤,形成一個split小檔案
預設值:100
推薦值:200 | 800
機關:兆
8) io.sort.spill.percent
參數說明:即io.sort.mb中所說的閥值
預設值:0.8
推薦值:0.8
機關:無
9) io.sort.record
參數說明:io.sort.mb中分類給中繼資料的空間占比
預設值:0.05
推薦值:0.05
機關:無
10) Mapred.reduce.parallel
參數說明:Reduce shuffle階段copier線程數。預設是5,對于較大叢集,可調整為16~25
預設值:5
推薦值:16~25
機關:無
4.系統實作角度調優
https://www.xiaohui.org/archives/944.html
主要針對HDFS進行優化,HDFS性能低下的兩個原因:排程延遲和可移植性
4.1. 排程延遲
關于排程延遲主要是發生在兩個階段:
a) tasktracker上出現空餘的slot到該tasktracker接收到新的task;
b) tasktracker擷取到了新的Task後,到連接配接上了datanode,并且可以讀寫資料。
之是以說這兩個階段不夠高效,因為一個分布式計算系統需要解決的是計算問題,如果把過多的時間花費在其它上,就顯得很不合适,例如線程等待、高負荷的資料傳輸。
下面解釋下會經曆上邊兩個階段發生的過程:
a) 當tasktracker上出現slot時,他會調用heartbeat方法向jobtracker發送心跳包(預設時間間隔是3秒,叢集很大時可适量調整)來告知它,假設此時有準備需要執行的task,那麼jobtracker會采用某種排程機制(排程機制很重要,是一個可以深度研究的東東)選擇一個Task,然後通過調用heartbeat方法發送心跳包告知tasktracker。在該過程中,HDFS一直處于等待狀态,這就使得資源使用率不高。
b) 這個過程中所發生的操作都是串行化的:tasktracker會連接配接到namenode上擷取到自己需要的資料在datanode上的存儲情況,然後再從datanode上讀資料,在該過程中,HDFS一直處于等待狀态,這就使得資源使用率不高。
若能減短hdfs的等待時間;在執行task之前就開始把資料讀到将要執行該task的tasktracker上,減少資料傳輸時間,那麼将會顯得高效很多。未解決此類問題,有這樣幾種解決方案:重疊I/O和CPU階段(pipelining),task預取(task prefetching),資料預取(data prefetching)等。
4.2. 可移植性
Hadoop是Java寫的,是以可移植性相對較高。由于它屏蔽了底層檔案系統,是以無法使用底層api來優化資料的讀寫。在活躍度較高的叢集裡(例如共享叢集),大量并發讀寫會增加磁盤的随機尋道時間,這會降低讀寫效率;在大并發寫的場景下,還會增加大量的磁盤碎片,這樣将會大大的增加了讀資料的成本,hdfs更适合檔案順序讀取。
對于上述問題,可以嘗試使用下面的解決方案:
tasktracker現在的線程模型是:one thread per client,即每個client連接配接都是由一個線程處理的(包括接受請求、處理請求,傳回結果)。那麼這一塊一個拆分成兩個部分來做,一組線程來處理和client的通信(Client Threads),一組用于資料的讀寫(Disk Threads)。
想要解決上述兩個問題,暫時沒有十全十美的辦法,隻能盡可能的權衡保證排程延遲相對較低+可移植性相對較高。
4.3. 優化政策:Prefetching與preshuffling
a) Prefetching包括Block-intra prefetching和Block-inter prefetching:
Block-intra prefetching:對block内部資料處理方式進行了優化,即一邊進行計算,一邊預讀将要用到的資料。這種方式需要解決兩個難題:一個是計算和預取同步,另一個是确定合适的預取率。前者可以使用進度條(processing bar)的概念,進度條主要是記錄計算資料和預讀資料的進度,當同步被打破時發出同步失效的通知。後者是要根據實際情況來設定,可采用重複試驗的方法來确定。
Block-inter prefetching:在block層面上預讀資料,在某個Task正在處理資料塊A1的時候,預測器能預測接下來将要讀取的資料塊A2、A3、A4,然後把資料塊A2、A3、A4預讀到Task所在的rack上。
b) preshuffling
資料被map task處理之前,由預測器判斷每條記錄将要被哪個reduce task處理,将這些資料交給靠近reduce task的map task來處理。
參考資料:
cloudera官方文檔
http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/
AMD白皮書(較為實用)
http://www.admin-magazine.com/HPC/content/download/9408/73372/file/Hadoop_Tuning_Guide-Version5.pdf
國内部落格(大部分内容都是AMD白皮書上的翻譯):
http://dongxicheng.org/mapreduce/hadoop-optimization-0/
http://dongxicheng.org/mapreduce/hadoop-optimization-1/