天天看點

Hadoop優化 第一篇 : HDFS/MapReduce - leocook

Hadoop優化 第一篇 : HDFS/MapReduce

比較慚愧,部落格很久(半年)沒更新了。最近也自己搭了個部落格,wordpress玩的還不是很熟,感興趣的朋友可以多多交流哈!位址是:http://www.leocook.org/

另外,我建了個QQ群:305994766,希望對大資料、算法研發、系統架構感興趣的朋友能夠加入進來,大家一起學習,共同進步(進群請說明自己的公司-職業-昵稱)。

1.應用程式角度進行優化

1.1.減少不必要的reduce任務

若對于同一份資料需要多次處理,可以嘗試先排序、分區,然後自定義InputSplit将某一個分區作為一個Map的輸入,在Map中處理資料,将Reduce的個數設定為空。

1.2.外部檔案引用

如字典、配置檔案等需要在Task之間共享的資料,可使用分布式緩存DistributedCache或者使用-files

1.3.使用Combiner

combiner是發生在map端的,作用是歸并Map端輸出的檔案,這樣Map端輸出的資料量就小了,減少了Map端和reduce端間的資料傳輸。需要注意的是,Combiner不能影響作業的結果;不是每個MR都可以使用Combiner的,需要根據具體業務來定;Combiner是發生在Map端的,不能垮Map來執行(隻有Reduce可以接收多個Map任務的輸出資料)

1.4.使用合适的Writable類型

盡可能使用二進制的Writable類型,例如:IntWritable, FloatWritable等,而不是Text。因為在一個批處理系統中将數值轉換為文本時低效率的。使用二進制的Writable類型可以降低cpu資源的消耗,也可以減少Map端中間資料、結果資料占用的空間。

1.5.盡可能的少建立新的Java對象

a)需要注意的Writable對象,例如下面的寫法:

public void map(...) {
    …
    for (String word : words) {
        output.collect(new Text(word), new IntWritable(1));
    }
}      

這樣會沖去建立對象new Text(word)和new IntWritable(1)),這樣可能會産生海量的短周期對象。更高效的寫法見下:

class MyMapper … {
    Text wordText = new Text();
    IntWritable one = new IntWritable(1);
    public void map(...) {
        for (String word: words) {
        wordText.set(word);
            output.collect(wordText, one);
        }
    }
}      

b)對于可變字元串,使用StringBuffer而不是String

String類是經過final修飾的,那麼每次對它的修改都會産生臨時對象,而SB則不會。

2. Linux系統層面上的配置調優

2.1. 檔案系統的配置

a) 關閉檔案在被操作時會記下時間戳:noatime和nodiratime

b) 選擇I/O性能較好的檔案系統(Hadoop比較依賴本地的檔案系統)

2.2. Linux檔案系統預讀緩沖區大小

指令blockdev

2.3. 去除RAID和LVM

2.4. 增大同時打開的檔案數和網絡連接配接數

ulimit

net.core.somaxconn

2.5. 關閉swap分區

在Hadoop中,對于每個作業處理的資料量和每個Task中用到的各種緩沖,使用者都是完全可控的。

/etc/sysctl.conf

2.6. I/O排程器選擇

詳情見AMD的白皮書

3. Hadoop平台内參數調優

Hadoop相關可配置參數共有幾百個,但是其中隻有三十個左右會對其性能産生顯著影響。

3.1. 計算資源優化

a) 設定合理的slot(資源槽位)

mapred.tasktracker.map.tasks.maximum / mapred.tasktracker.reduce.tasks.maximum

參數說明:每個TaskTracker上可并發執行的Map Task和Reduce Task數目

預設值:都是2

推薦值:根據具體的節點資源來看,推薦值是(core_per_node)/2~2*(cores_per_node)

機關:無

3.2. 節點間的通信優化

a) TaskTracker和JobTracker之間的心跳間隔

這個值太小的話,在一個大叢集中會造成JobTracker需要處理高并發心跳,可能會有很大的壓力。

建議叢集規模小于300時,使用預設值3秒,在此基礎上,叢集規模每增加100台,會加1秒。

b) 啟用帶外心跳(out-of-band heartbeat)

mapreduce.tasktracker.outofband.heartbeat

參數說明:主要是為了減少任務配置設定延遲。它與正常心跳不同,一般的心跳是一定時間間隔發送的,而帶外心跳是在任務運作結束或是失敗時發送,這樣就能在TaskTracker節點出現空閑資源的時候能第一時間通知JobTracker。

3.3. 磁盤塊的配置優化

a) 作業相關的磁盤配置:mapred.local.dir

參數說明:map本地計算時所用到的目錄,建議配置在多塊硬碟上

b) 存儲相關的磁盤配置(HDFS資料存儲):dfs.data.dir

參數說明:HDFS的資料存儲目錄,建議配置在多塊硬碟上,可提高整體IO性能

例如:

<property>
  <name>dfs.name.dir</name>
  <value>/data1/hadoopdata/mapred/jt/,/data2/hadoopdata/mapred/jt/</value>
</property>      

c) 存儲相關的磁盤配置(HDFS中繼資料存儲):dfs.name.dir

參數說明:HDFS的中繼資料存儲目錄,建議設定多目錄,每個多目錄都可儲存中繼資料的一個備份

注:要想提升hadoop整體IO性能,對于hadoop中用到的所有檔案目錄,都需要評估它磁盤IO的負載,對于IO負載可能會高的目錄,最好都配置到多個磁盤上,以提示IO性能

3.4. RPC Handler個數和Http線程數優化

a) RPC Handler個數(mapred.job.tracker.handler.count)

參數說明:JobTracker需要并發的處理來自各個TaskTracker的RPC請求,可根據叢集規模和并發數來調整RPC Handler的個數。

預設值:10

推薦值:60-70,最少要是TaskTracker個數的4%

機關:無

b) Http線程數(tasktracker.http.threads)

在Shuffle階段,Reduce Task會通過Http請求從各個TaskTracker上讀取Map Task的結果,TaskTracker是使用Jetty Server來提供服務的,這裡可适量調整Jetty Server的工作線程以提高它的并發處理能力。

預設值:40

推薦值:50-80+

3.5. 選擇合适的壓縮算法

mapred.compress.map.output / Mapred.output.compress

map輸出的中間結果時需要進行壓縮的,指定壓縮方式(Mapred.compress.map.output.codec/ Mapred.output.compress.codec)。推薦使用LZO壓縮。

3.6. 啟用批量任務排程(現在新版本都預設支援了)

a) Fair Scheduler

mapred.fairscheduler.assignmultiple

b) Capacity Scheduler

3.7. 啟用預讀機制(Apache暫時沒有)

Hadoop是順序讀,是以預讀機制可以很明顯的提高HDFS的讀性能。

HDFS預讀:

dfs.datanode.readahead :true

dfs.datanode.readahead.bytes :4MB

shuffle預讀:

mapred.tasktracker.shuffle.fadvise : true

mapred.tasktracker.shuffle.readahead.bytes : 4MB

3.8.HDFS相關參數優化

1) dfs.replication

參數說明:hdfs檔案副本數

預設值:3

推薦值:3-5(對于IO較為密集的場景可适量增大)

機關:無

2) dfs.blocksize

參數說明:

預設值:67108864(64MB)

推薦值:稍大型叢集建議設為128MB(134217728)或256MB(268435456)

機關:無

3) dfs.datanode.handler.count

參數說明:DateNode上的服務線程數

預設值:10

推薦值:

機關:無

4) fs.trash.interval

參數說明:HDFS檔案删除後會移動到垃圾箱,該參數時清理垃圾箱的時間

預設值:0

推薦值:1440(1day)

機關:無

5) io.sort.factor

參數說明:當一個map task執行完之後,本地磁盤上(mapred.local.dir)有若幹個spill檔案,map task最後做的一件事就是執行merge sort,把這些spill檔案合成一個檔案(partition)。執行merge sort的時候,每次同時打開多少個spill檔案由該參數決定。打開的檔案越多,不一定merge sort就越快,是以要根據資料情況适當的調整。

預設值:10

推薦值:

機關:無

6) mapred.child.java.opts

參數說明:JVM堆的最大可用記憶體

預設值:-Xmx200m

推薦值:-Xmx1G | -Xmx4G | -Xmx8G

機關:-Xmx8589934592也行,機關不固定

7) io.sort.mb

參數說明:Map Task的輸出結果和中繼資料在記憶體中占的buffer總大小,當buffer達到一定閥值時,會啟動一個背景程序來對buffer裡的内容進行排序,然後寫入本地磁盤,形成一個split小檔案

預設值:100

推薦值:200 | 800

機關:兆

8) io.sort.spill.percent

參數說明:即io.sort.mb中所說的閥值

預設值:0.8

推薦值:0.8

機關:無

9) io.sort.record

參數說明:io.sort.mb中分類給中繼資料的空間占比

預設值:0.05

推薦值:0.05

機關:無

10) Mapred.reduce.parallel

參數說明:Reduce shuffle階段copier線程數。預設是5,對于較大叢集,可調整為16~25

預設值:5

推薦值:16~25

機關:無

4.系統實作角度調優

https://www.xiaohui.org/archives/944.html

主要針對HDFS進行優化,HDFS性能低下的兩個原因:排程延遲和可移植性

4.1. 排程延遲

關于排程延遲主要是發生在兩個階段:

a) tasktracker上出現空餘的slot到該tasktracker接收到新的task;

b) tasktracker擷取到了新的Task後,到連接配接上了datanode,并且可以讀寫資料。

之是以說這兩個階段不夠高效,因為一個分布式計算系統需要解決的是計算問題,如果把過多的時間花費在其它上,就顯得很不合适,例如線程等待、高負荷的資料傳輸。

下面解釋下會經曆上邊兩個階段發生的過程:

a) 當tasktracker上出現slot時,他會調用heartbeat方法向jobtracker發送心跳包(預設時間間隔是3秒,叢集很大時可适量調整)來告知它,假設此時有準備需要執行的task,那麼jobtracker會采用某種排程機制(排程機制很重要,是一個可以深度研究的東東)選擇一個Task,然後通過調用heartbeat方法發送心跳包告知tasktracker。在該過程中,HDFS一直處于等待狀态,這就使得資源使用率不高。

b) 這個過程中所發生的操作都是串行化的:tasktracker會連接配接到namenode上擷取到自己需要的資料在datanode上的存儲情況,然後再從datanode上讀資料,在該過程中,HDFS一直處于等待狀态,這就使得資源使用率不高。

若能減短hdfs的等待時間;在執行task之前就開始把資料讀到将要執行該task的tasktracker上,減少資料傳輸時間,那麼将會顯得高效很多。未解決此類問題,有這樣幾種解決方案:重疊I/O和CPU階段(pipelining),task預取(task prefetching),資料預取(data prefetching)等。

4.2. 可移植性

Hadoop是Java寫的,是以可移植性相對較高。由于它屏蔽了底層檔案系統,是以無法使用底層api來優化資料的讀寫。在活躍度較高的叢集裡(例如共享叢集),大量并發讀寫會增加磁盤的随機尋道時間,這會降低讀寫效率;在大并發寫的場景下,還會增加大量的磁盤碎片,這樣将會大大的增加了讀資料的成本,hdfs更适合檔案順序讀取。

對于上述問題,可以嘗試使用下面的解決方案:

tasktracker現在的線程模型是:one thread per client,即每個client連接配接都是由一個線程處理的(包括接受請求、處理請求,傳回結果)。那麼這一塊一個拆分成兩個部分來做,一組線程來處理和client的通信(Client Threads),一組用于資料的讀寫(Disk Threads)。

想要解決上述兩個問題,暫時沒有十全十美的辦法,隻能盡可能的權衡保證排程延遲相對較低+可移植性相對較高。

4.3. 優化政策:Prefetching與preshuffling

a) Prefetching包括Block-intra prefetching和Block-inter prefetching:

Block-intra prefetching:對block内部資料處理方式進行了優化,即一邊進行計算,一邊預讀将要用到的資料。這種方式需要解決兩個難題:一個是計算和預取同步,另一個是确定合适的預取率。前者可以使用進度條(processing bar)的概念,進度條主要是記錄計算資料和預讀資料的進度,當同步被打破時發出同步失效的通知。後者是要根據實際情況來設定,可采用重複試驗的方法來确定。

Block-inter prefetching:在block層面上預讀資料,在某個Task正在處理資料塊A1的時候,預測器能預測接下來将要讀取的資料塊A2、A3、A4,然後把資料塊A2、A3、A4預讀到Task所在的rack上。

b) preshuffling

資料被map task處理之前,由預測器判斷每條記錄将要被哪個reduce task處理,将這些資料交給靠近reduce task的map task來處理。

參考資料:

cloudera官方文檔

http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/

AMD白皮書(較為實用)

http://www.admin-magazine.com/HPC/content/download/9408/73372/file/Hadoop_Tuning_Guide-Version5.pdf

國内部落格(大部分内容都是AMD白皮書上的翻譯):

http://dongxicheng.org/mapreduce/hadoop-optimization-0/

http://dongxicheng.org/mapreduce/hadoop-optimization-1/