MapReduce的定義
Hadoop 中的 MapReduce 是一個使用簡單的軟體架構,基于它寫出來的應用程式能夠運作在由上千個商用機器組成的大型叢集上,并以一種可靠容錯式并行處理TB級别的資料集
mapreduce的優點
1、MapReduce 易于程式設計 。它簡單的實作一些接口,就可以完成一個分布式程式,這個分布式程式可以分布到大量廉價的 PC 機器運作。也就是說你寫一個分布式程式,跟寫一個簡單的串行程式是一模一樣的。就是因為這個特點使得 MapReduce 程式設計變得非常流行。
2、良好的 擴充性 。當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴充它的計算能力。
3、 高容錯性 。MapReduce 設計的初衷就是使程式能夠部署在廉價的 PC 機器上,這就要求它具有很高的容錯性。比如其中一台機器挂了,它可以把上面的計算任務轉移到另外一個節點上面上運作,不至于這個任務運作失敗,而且這個過程不需要人工參與,而完全是由Hadoop 内部完成的。
4、适合 PB 級以上海量資料的 離線處理 。這裡加紅字型離線處理,說明它适合離線處理而不适合線上處理。比如像毫秒級别的傳回一個結果,MapReduce 很難做到。
mapreduce的不能實作的
1 、實時計算。MapReduce 無法像 Mysql 一樣,在毫秒或者秒級内傳回結果。
2、流式計算。流式計算的輸入資料時動态的,而 MapReduce 的輸入資料集是靜态的,不能動态變化。這是因為 MapReduce 自身的設計特點決定了資料源必須是靜态的。
3、DAG(有向圖)計算。多個應用程式存在依賴關系,後一個應用程式的輸入為前一個的輸出。在這種情況下,MapReduce 并不是不能做,而是使用後,每個MapReduce 作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導緻性能非常的低下。
MapReduce運作過程
如上圖所示是MR的運作詳細過程:圖中用紅色粗線描述的元件都是可以重寫的。
maptask階段
1、首先mapTask讀檔案是通過InputFormat(内部是調RecordReader()–>read())來一次讀一行,傳回K,V值。(預設是TextInputFormat,還可以輸入其他的類型DBInputFormat讀取資料庫資料,squeceinputFormat讀取squece檔案)。程式會根據InputFormat将輸入檔案分割成splits,每個split會作為一個map task的輸入。
2、mapper(map(k,v))–>context.write()
shuffle階段
1、輸出資料到OutputCollector收集器(不會輸出一組就傳到下一步進行處理,而是需要一個收集的過程,減少IO)
2、将收集到的資料寫到環形緩沖區(資料結構其實就是個位元組數組byte[]),通過Spiller來将溢出的資料溢出到檔案中去(在這裡會通過hashPartitioner執行分區、通過key.comPareTo來實作排序(分為系統預設的快排和外部排序)即實作了shuffle的核心機制:分區和排序)。
3、将多個溢出檔案進行Merge(采用歸并排序),合并成一個大檔案(一個大檔案對應一個maptask)
4、将檔案下載下傳到ReduceTask的本地磁盤工作目錄–>将多個MapTask的輸出大檔案再進行歸并排序(也可以說是ReduceTask去各個mapTask對應的分區去取對應的資料)。
reducetask階段
1、Reducer(reduce(k,v))–>context.write(k,v)–>OutputFormat(RecordWriter.write(k,v))即reduceTask階段。
2、将資料寫到part-r-00000這裡
MapReduce程式設計模式
1、輸入資料接口:InputFormat —> FileInputFormat(檔案類型資料讀取的通用抽象類) DBInputFormat (資料庫資料讀取的通用抽象類,預設使用的實作類是 :TextInputFormat。job.setInputFormatClass(TextInputFormat.class)TextInputFormat的功能邏輯是:一次讀一行文本,然後将該行的起始偏移量作為key,行内容作為value傳回
2、邏輯處理接口: Mapper,完全需要使用者自己去實作其中 map() setup() clean()
3、map輸出的結果在shuffle階段會被partition以及sort,此處有兩個接口可自定義:Partitioner有預設實作 HashPartitioner,邏輯是 根據key和numReduces來傳回一個分區号; key.hashCode()&Integer.MAXVALUE % numReduces,通常情況下,用預設的這HashPartitioner就可以,如果業務上有特别的需求,可以自定義分區實作Partitioner接口,重寫getpartition()方法。當我們用自定義的對象作為key來輸出時,就必須要實作WritableComparable接口,override其中的compareTo()方法。
4、reduce端的資料分組比較接口 : Groupingcomparator,reduceTask拿到輸入資料(一個partition的所有資料)後,首先需要對資料進行分組,其分組的預設原則是key相同,然後對每一組kv資料調用一次reduce()方法,并且将這一組kv中的第一個kv的key作為參數傳給reduce的key,将這一組資料的value的疊代器傳給reduce()的values參數
5、邏輯處理接口:Reducer
完全需要使用者自己去實作其中 reduce() setup() clean()
6、輸出資料接口: OutputFormat —> 有一系列子類 FileOutputformat DBoutputFormat …..
預設實作類是TextOutputFormat,功能邏輯是: 将每一個KV對向目标文本檔案中輸出為一行
MapReduce注意事項
環形緩存區:(資料從outputCollector中傳入環形緩存區,直到達到80%的緩存時,緩存才會啟用清理機制,将已經溢出的資料溢出到檔案中去(通過spiller來将資料溢出到檔案中去))會溢出多次,每次溢出都會對資料進行分區排序,形成多個分區排序後的資料,最終進行合并。
combiner的作用:對spiller階段的溢出資料進行一個reduce處理,直接讓相同k的value值相加,減少資料量以及傳輸過程中的開銷,大大提高效率。(根據業務需求使用,并不是每個業務都要用。可自定義一個Combiner類,内部邏輯和Reduce類似)
shuffle:洗牌、發牌——(核心機制:資料分區,排序,緩存)具體來說:就是将maptask輸出的處理結果資料,分發給reducetask,并在分發的過程中,對資料按key進行了分區和排序;
資料傾斜:指的是任務在shuffle階段時會進行一個分區操作(預設的是hashcode取模),如果有大部分資料被分到一個ReduceTask端進行處理,一小部分任務被分到其他的ReduceTask端進行處理,就會造成其他ReduceTask處理完成後,仍有一個ReduceTask還在處理資料。最終造成整個工程延遲的情況。(為了解決這個問題,引入了Partition)
大量小檔案優化:預設的情況下TextInputFormat是按照檔案規劃切片,不管檔案多小,都會是一個單獨的切片,都會交給一個maptask。如果有大量的小檔案,就會産生大量的maptask,處理效率低下。最好先将小檔案合成大檔案後上傳的hdfs。如果已經存在hdfs中了,可以使用另種inputFormat來做切片(ConbineFileInputFormat的子類ConbineTextInputFormat),它的切片邏輯和TextInputFormat不同,它可以将多個小檔案規劃到一個切片中,這樣多個小檔案就可以交給一個maptask去處理。
wordcount的代碼實作
需要jar包
Hadoop-2.4.1\share\hadoop\hdfs\hadoop-hdfs-2.4.1.jar
hadoop-2.4.1\share\hadoop\hdfs\lib\所有jar包
hadoop-2.4.1\share\hadoop\common\hadoop-common-2.4.1.jar
hadoop-2.4.1\share\hadoop\common\lib\所有jar包
hadoop-2.4.1\share\hadoop\mapreduce\除hadoop-mapreduce-examples-2.4.1.jar之外的jar包
hadoop-2.4.1\share\hadoop\mapreduce\lib\所有jar包
mapper類實作
/*
* KEYIN:輸入kv資料對中key的資料類型
* VALUEIN:輸入kv資料對中value的資料類型
* KEYOUT:輸出kv資料對中key的資料類型
* VALUEOUT:輸出kv資料對中value的資料類型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/*
* map方法是提供給map task程序來調用的,map task程序是每讀取一行文本來調用一次我們自定義的map方法
* map task在調用map方法時,傳遞的參數:
* 一行的起始偏移量LongWritable作為key
* 一行的文本内容Text作為value
*/
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
//拿到一行文本内容,轉換成String 類型
String line = value.toString();
//将這行文本切分成單詞
String[] words=line.split(" ");
//輸出<單詞,1>
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
}
}
reducer類實作
/*
* KEYIN:對應mapper階段輸出的key類型
* VALUEIN:對應mapper階段輸出的value類型
* KEYOUT:reduce處理完之後輸出的結果kv對中key的類型
* VALUEOUT:reduce處理完之後輸出的結果kv對中value的類型
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
/*
* reduce方法提供給reduce task程序來調用
*
* reduce task會将shuffle階段分發過來的大量kv資料對進行聚合,聚合的機制是相同key的kv對聚合為一組
* 然後reduce task對每一組聚合kv調用一次我們自定義的reduce方法
* 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>
* hello組會調用一次reduce方法進行處理,tom組也會調用一次reduce方法進行處理
* 調用時傳遞的參數:
* key:一組kv中的key
* values:一組kv中所有value的疊代器
*/
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
//定義一個計數器
int count = 0;
//通過value這個疊代器,周遊這一組kv中所有的value,進行累加
for(IntWritable value:values){
count+=value.get();
}
//輸出這個單詞的統計結果
context.write(key, new IntWritable(count));
}
}
job送出用戶端實作:
public class WordCountJobSubmitter {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job wordCountJob = Job.getInstance(conf);
//重要:指定本job所在的jar包
wordCountJob.setJarByClass(WordCountJobSubmitter.class);
//指定自定義的分區器
//wordCountJob.setPartitionerClass(ProvincePartitioner.class);
//指定reduceTask的個數
//wordCountJob.setNumReduceTask(5)
//設定wordCountJob所用的mapper邏輯類為哪個類
wordCountJob.setMapperClass(WordCountMapper.class);
//設定wordCountJob所用的reducer邏輯類為哪個類
wordCountJob.setReducerClass(WordCountReducer.class);
//設定map階段輸出的kv資料類型
wordCountJob.setMapOutputKeyClass(Text.class);
wordCountJob.setMapOutputValueClass(IntWritable.class);
//設定最終輸出的kv資料類型
wordCountJob.setOutputKeyClass(Text.class);
wordCountJob.setOutputValueClass(IntWritable.class);
//指定需要Conbiner,以及使用哪個類作為Conbine的邏輯
//wordCountJob.setCombinerClass(WordCountReducer.class);
//如果不設定InputFormat,它預設是TextInputFormat.class
//wordCountJob.setInputFormatClass(CombineTextInputFormat.class);
//CombineTextInputFormat.setMaxInputSplitSize(wordCountJob,4194304);
//CombineTextInputFormat.setMinInputSplitSize(wordCountJob,2097152);
//設定要處理的文本資料所存放的路徑
FileInputFormat.setInputPaths(wordCountJob, new Path(args[0]));
//設定輸出結果所存放的路徑
FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1]));
//送出job給hadoop叢集
wordCountJob.waitForCompletion(true);
}
}
原文:
https://blog.csdn.net/qq_16633405/article/details/78924165
https://blog.csdn.net/litianxiang_kaola/article/details/71154302