天天看点

MapReduce 之shuffle过程

我们知道在MapReduce程序在map阶段和reduce阶段之间,会进行shuffle操作。那么我们来详细分析一下shuffle的过程或者原理

MapReduce 之shuffle过程

在MapTask调用Mapper#map方法之前,会构造一个RecordWriter对象,如果Job没有reduce操作,那么new一个NewDirectOutputCollector

如果包含Reduce操作,就new一个NewOutputCollector操作

然后把这个Writer封装在MapContext中,当map阶段某一个map任务完成,就会调用MapContext#write方法。

Write方法又会调用MapOutputCollector#collect方法:

publicvoidwrite(Kkey, Vvalue)throwsIOException, InterruptedException {

collector.collect(key,value,partitioner.getPartition(key,value,

partitions));

}

具体的实现是由MapOutputBuffer来实现的。

这时候Map阶段Shuffle开始:

一 Map阶段的shuffle分析

1分区

一个Map任务完成之后,会进行分区。

1.1我们使用的是什么类来分区呢?首先会判断我们是否在配置文件配置了mapreduce.job.reduces参数,如果没有设置或者设置为1,那么我们就创建一个分区数为0的Partitioner。如果这个参数> 1,如果我们自己在代码设置Partitioner,那么就构造一个我们自己的Partitioner,如果没有,那么就构造一个默认的HashPartitioner。

1.2怎么分区呢?

Map任务结束后,每一个任务都会构造一个内存缓冲区kvbuffer。这个内存缓冲区是一个环形的数据结构,本质是一个字节数组。在初始化的时候就会构造这个字节数组:

kvbuffer= new byte[maxMemUsage];

bufvoid= kvbuffer.length;

kvmeta= ByteBuffer.wrap(kvbuffer)

 .order(ByteOrder.nativeOrder())

 .asIntBuffer();

setEquator(0);

bufstart= bufend = bufindex = equator;

kvstart= kvend = kvindex;

这儿有几个概念:

Kvbuffer:默认大小是100M,我们也可以自己调整这个缓冲区大小,参数是:mapreduce.task.io.sort.mb

kvmeta:存放索引的元数据信息

kvstart:开始下标

kvend:在spill开始的时候,它会被赋值为kvindex,spill结束,又被赋值为kvstartz,这时候kvstart=kvend。即只要kvstart!= kvend就表示正在spill,否则表示普通状态

kvindex:下一个可以记录的索引位置

最开始这三个值默认都是一样的

bufvoid:实际使用的缓冲区

bufmark:用于标记记录的结尾

bufIndex:初始值为0,指缓冲区增长到哪儿了

在kvindex和bufIndex之间的那部分就是还未溢写的数据,如果只要这部分数据超过80%,就会启动spill操作

流程:

#首先判断剩余的内存缓冲区是否大于0,而且现在是否处于spill阶段,如果剩余的内存缓冲区>0且不处于spill阶段,那么我们就把结果往内存缓冲区写。

#如果写入缓冲区的数据超过了阀值,默认80%就会启用spill程序

#SpillThread是一个线程类,专门负责溢写数据到磁盘,如果没有溢写发生,就一直处于等到状态,否则进行排序和溢写,调用sort

AndSpill方法

#根据分区数目创建SpillRecord

#然后调用getSpillFileForWrite获取HDFS文件路径,会生成一个带有编号的文件,比如output/spill{spillNumber }.out

lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT+ "/spill" + spillNumber + ".out", size, getConf());

#指定一个排序策略HeapSorter/QuickSorter进行排序,默认是QuickSorter

#构造一个IFile.Writer对象,然后输出流输出到指定文件

writer= new Writer<K, V>(job, partitionOut, keyClass,valClass,

codec,spilledRecordsCounter);

这里的codec支持压缩,有助于性能提升。它会根据指定的压缩策略:

mapreduce.map.output.compress.codec。如果没有指定,默认就是

DefaultCodec。

#如果用户设置了Combiner,在溢写到文件之前,还会进行一次combine操作,它继承了Reducer类,本质就是一个Reducer类。然后调用Combiner#combine操作,而后就会调用Reducer#run方法。

#将元数据信息写入内存索引数据结构:SpillRecord.如果内存中索引大于1MB,则写到文件名类似output/spillN.out.index文件,N就是当前spill的次数

#最后RecordWriter在close的时候会去merge当前map 任务产生的那些临时文件,最后会创建file.out和file.out.index文件用来存储最终的输出和索引

二 Reduce的shuffle操作

在Map 任务结束以后,Reduce就要开始从运行Map任务那些节点上复制内存中或者磁盘上的数据。然后再进行合并排序操作。

2.1copy阶段

首先会组装一个ShuffleConsumerPlugin插件,并对他进行初始化

然后调用其run方法,这个插件开始运行

#构造一个EventFetcher线程对象:它主要就是获取已经完成的Map任务事件,然后遍历事件,然后把主机名和URL放进一个集合

#获取进行复制的Fetcher数目,如果是本地就1个,如果是远程取决于mapreduce.reduce.shuffle.parallelcopies参数,默认是5

#然后每一个Fetcher开始工作,他们从之前保存的Host的map集合里获取主机名和URL,然后进行拷贝

#拷贝完毕关闭线程资源

继续阅读