天天看点

MapReduce系列之MapReduce任务处理流程

MapReduce处理数据的流程一般是这样的:

MapReduce系列之MapReduce任务处理流程

1、从HDFS上读取数据,因为是分布式与并行计算,需要将数据划分给多个MapReduce任务。HDFS存储文件也是分块的,每个MapReduce的输入一般是和HDFS的数据块是对应的。也就是说一个HDFS数据块作为一个MapReduce任务的输入。这是Hadoop默认的情况,我们也可以实现InputFormat自定义输入格式。

2、Map进行计算:这一步和Reduce都是由用户根据需要实现的。在WordCount例子中,对每个单词做映射,word-->(word,1)

3、shuffle and sort:这一步是MapReduce的核心,但用户基本不用管,可能会根据具体的需要自定义比较器和分区器。具体详细过程如下:

  •         maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
  •         从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
  •         多个溢出文件会被合并成大的溢出文件
  •         在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
  •         reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
  •         reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
  •         合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

缓冲区的大小可以通过参数调整,  参数:io.sort.mb  默认100M。

4、Reduce:和Map一样需要用户根据具体需求实现。在WordCount例子中,对每个单词的一系列值做加法。

5、将计算结果输出到HDFS,可以实现OutputFormat接口自定义输出格式。

以上就是MapReduce编程的一个大体流程。

附  WordCount例子代码:https://github.com/taowenjun/MapReduce/tree/master/cn/tao/wordcount

声明:图片来自网络

继续阅读