maptask的并行度決定map階段的任務處理并發度,進而影響到整個job的處理速度
那麼,maptask并行執行個體是否越多越好呢?其并行度又是如何決定呢?
一個job的map階段并行度由用戶端在送出job時決定
而用戶端對map階段并行度的規劃的基本邏輯為:
将待處理資料執行邏輯切片(即按照一個特定切片大小,将待處理資料劃分成邏輯上的多個split),然後每一個split配置設定一個maptask并行執行個體處理
這段邏輯及形成的切片規劃描述檔案,由fileinputformat實作類的getsplits()方法完成,其過程如下圖:
a) 簡單地按照檔案的内容長度進行切片
b) 切片大小,預設等于block大小
c) 切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
比如待處理資料有兩個檔案:
file1.txt 320m
file2.txt 10m
經過fileinputformat的切片機制運算後,形成的切片資訊如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10m
通過分析源碼,在fileinputformat中,計算切片大小的邏輯:
math.max(minsize,math.min(maxsize, blocksize)); 切片主要由這幾個值來運算決定
minsize:預設值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:預設值:long.maxvalue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize
是以,預設情況下,切片大小=blocksize
maxsize(切片最大值):
參數如果調得比blocksize小,則會讓切片變小,而且就等于配置的這個參數的值
minsize (切片最小值):
參數調的比blocksize大,則可以讓切片變得比blocksize還大
但是,不論怎麼調參數,都不能讓多個小檔案“劃入”一個split
選擇并發數的影響因素:
1、運算節點的硬體配置
2、運算任務的類型:cpu密集型還是io密集型
3、運算任務的資料量
如果硬體配置為2*12core + 64g,恰當的map并行度是大約每個節點20-100個map,最好每個map的執行時間至少一分鐘。
l 如果job的每個map或者 reduce task的運作時間都隻有30-40秒鐘,那麼就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到排程器中進行排程,這個中間的過程可能都要花費幾秒鐘,是以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
(mapred.job.reuse.jvm.num.tasks,預設是1,表示一個jvm上最多可以順序執行的task
數目(屬于同一個job)是1。也就是說一個task啟一個jvm)
l 如果input的檔案非常的大,比如1tb,可以考慮将hdfs上的每個block size設大,比如設成256mb或者512mb
reducetask的并行度同樣影響整個job的執行并發度和執行效率,但與maptask的并發數由切片數決定不同,reducetask數量的決定是可以直接手動設定:
//預設值是1,手動設定為4
job.setnumreducetasks(4);
如果資料分布不均勻,就有可能在reduce階段産生資料傾斜
注意: reducetask數量并不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全局彙總結果,就隻能有1個reducetask
盡量不要運作太多的reduce task。對大多數job來說,最好rduce的個數最多和叢集中的reduce持平,或者比叢集的reduce slots小。這個對于小叢集而言,尤其重要。
hadoop的釋出包中内置了一個hadoop-mapreduce-example-2.4.1.jar,這個jar包中有各種mr示例程式,可以通過以下步驟運作:
啟動hdfs,yarn
然後在叢集中的任意一台伺服器上啟動執行程式(比如運作wordcount):
hadoop jarhadoop-mapreduce-example-2.4.1.jar wordcount /wordcount/data /wordcount/out
<a target="_blank"></a>
jvm重用技術不是指同一job的兩個或兩個以上的task可以同時運作于同一jvm上,而是排隊按順序執行。