前言
由于最近开始涉及MR程序的编写,之前会一点HIVE,对MR接触不多,不论从原理还是实际操作上,都有些障碍,终于在今天柳暗花明,将这一过程记录下,与大家分享~
环境准备
在VM上搭建好LINUX虚拟机,并安装配置好HADOOP2.2.0,我这里是单节点的伪分布式
在eclipse中安装hadoop插件
对我们这种MR的新手而言,最好在本地有一个HADOOP运行环境,这样有许多好处:
如果我们每次写完MR程序,都打成JAR包上传至线上服务器上运行,那么每次MR运行的时间非常长,也许等待了许久,运行结果和我们预期不一致,又得改程序重新来一边,这会有一点痛苦!
在我们本地的HADOOP上运行MR程序非常快,就那么几秒,更加重要的是,我们可以再
本地准备输入文件去测试MR的逻辑,这对调试/开发程序非常方便!
实例及原理分析
假设,我们有这样的输入文件:
cate-a spu-1 1
cate-a spu-1 2
cate-a spu-2 3
cate-a spu-2 4
cate-a spu-3 5
cate-a spu-3 6
cate-a spu-1 7
cate-a spu-4 8
cate-a spu-4 9
cate-a spu-1 8
...
我们希望得到分cate,分spu的总和,并且取分cate分spu的TOP3
<a href="http://s3.51cto.com/wyfs02/M01/6B/F5/wKiom1U7XbWQTAlVAAJl7mLEZ4s882.jpg" target="_blank"></a>
如上图示,大致描述了MAP/REDUCE的运行流程:
输入文件+InputFormat 提供给MAP
需要清楚提供给MAP的KEY1/VALUE1是什么?MAP准备输出的KEY2/VALUE2是什么?
MAP输出后,会进行分区操作,也就是决定KEY2/VALUE2发到哪些reduce上
分区由job.setPartitionerClass决定
在同一个分区内,会对KEY2进行排序,依据是job.setSortComparatorClass,
如果没有设置则根据KEY的compareTo方法
接下来进入分组阶段,会构造KEY3和VALUE迭代器
分组的依据是job.setGroupingComparatorClass,只要比较器比较的相同就在同一组
KEY3/VALUE迭代器交给reduce方法处理
步骤:
自定义KEY
KEY应该是可序列化,可比较的,只需要注意实现WritableComparable即可。
重点关注compareTo方法。
1
2
3
4
5
6
7
8
<code>@Override</code>
<code>public</code> <code>int</code> <code>compareTo(Cate2SpuKey that) {</code>
<code>System.out.println(</code><code>"开始对KEY进行排序..."</code><code>);</code>
<code>if</code><code>(cate2.equals(that.getCate2())){</code>
<code>return</code> <code>spu.compareTo(that.getSpu());</code>
<code>}</code>
<code>return</code> <code>cate2.compareTo(that.getCate2());</code>
分区
分区,是KEY的第一次比较,extends Partitioner 并提供getPartition即可。
这里根据cate分区。
分组
需要注意的是,分组类必须提供构造方法,并且重载
public int compare(WritableComparable w1, WritableComparable w2) 。这里根据cate,spu分组。
通过上述的,就可以取得分cate分spu的SUM(counts)值了。
通过eclipse hadoop插件,可以方便我们上传测试文件到HDFS,可以浏览,删除HDFS文件,更加方便的是,就像运行普通JAVA程序一样的运行/调试MR程序(不在需要打成JAR包),让我们可以追踪MR的每一步,非常方便进行逻辑性测试~
<a href="http://s3.51cto.com/wyfs02/M00/6B/F2/wKioL1U7gm6x1rEXAAGiT4OdcUI287.jpg" target="_blank"></a>
那么怎么取分cate分spu的TOP3呢?
我们只需要把上一个MR的输出文件,作为另一个MR的输入,并且以cate+counts 为KEY ,以spu为VALUE,根据cate分区,分组,排序的话:cate相同情况下,根据counts倒序;
最后在reduce阶段取TOP3即可。
9
10
11
12
13
14
15
16
17
18
19
20
<code>protected</code> <code>void</code> <code>reduce(Cate2CountsKey key, Iterable<Text> values,</code>
<code>Reducer<Cate2CountsKey, Text, Text, Text>.Context context)</code>
<code>throws</code> <code>IOException, InterruptedException {</code>
<code>System.out.println(</code><code>"reduce..."</code><code>);</code>
<code>System.out.println(</code><code>"VALUES迭代前... key:"</code> <code>+ key.toString());</code>
<code>System.out.println(</code><code>"VALUES迭代前... key:"</code> <code>+ key.getCounts());</code>
<code>int</code> <code>top = </code><code>3</code><code>;</code>
<code>for</code><code>(Text t : values){</code>
<code>if</code><code>(top > </code><code>0</code><code>){</code>
<code>System.out.println(</code><code>"VALUES迭代中... key:"</code> <code>+ key.toString());</code>
<code>System.out.println(</code><code>"VALUES迭代中... key:"</code> <code>+ key.getCounts());</code>
<code>context.write(</code><code>new</code> <code>Text(key.getCate2() + </code><code>"\t"</code> <code>+ t.toString()), </code>
<code>new</code> <code>Text(key.getCounts() </code>
<code>+ </code><code>""</code><code>));</code>
<code>top--;</code>
<code>System.out.println(</code><code>"reduce over..."</code><code>);</code>
<a href="http://s3.51cto.com/wyfs02/M01/6B/F2/wKioL1U7hUDxjEDHAAFWfetzxTI513.jpg" target="_blank"></a>
那么到现在,分组取TOP就完成了。
一个疑问:reduce阶段中的KEY到底是什么?
在上面例子中的取TOP3的MR中,我们是以cate+counts为KEY,spu为VALUE。
cate作为分区,分组的依据,排序根据同一个cate下counts倒序。如下图所示:
<a href="http://s3.51cto.com/wyfs02/M01/6B/F6/wKiom1U7iSuivGaRAAB6fM_hS0g123.jpg" target="_blank"></a>
那么reduce方法中的KEY是什么?
spu1,spu4,spu3...是VALUES中的,那么这个迭代器的对应KEY是什么呢?
是cate+42吗?还是其他?
在VALUES迭代过程中,这个KEY会变化吗?
我们可以看下ECLIPSE中的控制台打印输出的内容:
<a href="http://s3.51cto.com/wyfs02/M00/6B/F3/wKioL1U7i8uDWJtGAAHA5SM7Zeo775.jpg" target="_blank"></a>
从打印上来看,可以分析出如下结论:
分组后,交给reduce方法处理的KEY是同一组的所有KEY的第一个KEY,并且在VALUES迭代过程中,KEY并不会重新NEW,而是利用SETTER反射的方式重新设置属性值,这样在VALUES迭代过程中取得的KEY都是与之对应的KEY了。
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1638361,如需转载请自行联系原作者