天天看點

MapReduce原理及執行個體分析

前言

由于最近開始涉及MR程式的編寫,之前會一點HIVE,對MR接觸不多,不論從原理還是實際操作上,都有些障礙,終于在今天柳暗花明,将這一過程記錄下,與大家分享~

環境準備

在VM上搭建好LINUX虛拟機,并安裝配置好HADOOP2.2.0,我這裡是單節點的僞分布式

在eclipse中安裝hadoop插件

對我們這種MR的新手而言,最好在本地有一個HADOOP運作環境,這樣有許多好處:

如果我們每次寫完MR程式,都打成JAR包上傳至線上伺服器上運作,那麼每次MR運作的時間非常長,也許等待了許久,運作結果和我們預期不一緻,又得改程式重新來一邊,這會有一點痛苦!

在我們本地的HADOOP上運作MR程式非常快,就那麼幾秒,更加重要的是,我們可以再

本地準備輸入檔案去測試MR的邏輯,這對調試/開發程式非常友善!

執行個體及原理分析

假設,我們有這樣的輸入檔案:

cate-a spu-1 1

cate-a spu-1 2

cate-a spu-2 3

cate-a spu-2 4

cate-a spu-3 5

cate-a spu-3 6

cate-a spu-1 7

cate-a spu-4 8

cate-a spu-4 9

cate-a spu-1 8

...

我們希望得到分cate,分spu的總和,并且取分cate分spu的TOP3

<a href="http://s3.51cto.com/wyfs02/M01/6B/F5/wKiom1U7XbWQTAlVAAJl7mLEZ4s882.jpg" target="_blank"></a>

如上圖示,大緻描述了MAP/REDUCE的運作流程:

輸入檔案+InputFormat  提供給MAP

需要清楚提供給MAP的KEY1/VALUE1是什麼?MAP準備輸出的KEY2/VALUE2是什麼?

MAP輸出後,會進行分區操作,也就是決定KEY2/VALUE2發到哪些reduce上

分區由job.setPartitionerClass決定

在同一個分區内,會對KEY2進行排序,依據是job.setSortComparatorClass,

如果沒有設定則根據KEY的compareTo方法

接下來進入分組階段,會構造KEY3和VALUE疊代器

分組的依據是job.setGroupingComparatorClass,隻要比較器比較的相同就在同一組

KEY3/VALUE疊代器交給reduce方法處理

步驟:

自定義KEY

KEY應該是可序列化,可比較的,隻需要注意實作WritableComparable即可。

重點關注compareTo方法。

1

2

3

4

5

6

7

8

<code>@Override</code>

<code>public</code> <code>int</code> <code>compareTo(Cate2SpuKey that) {</code>

<code>System.out.println(</code><code>"開始對KEY進行排序..."</code><code>);</code>

<code>if</code><code>(cate2.equals(that.getCate2())){</code>

<code>return</code> <code>spu.compareTo(that.getSpu());</code>

<code>}</code>

<code>return</code> <code>cate2.compareTo(that.getCate2());</code>

分區

分區,是KEY的第一次比較,extends Partitioner 并提供getPartition即可。

這裡根據cate分區。

分組

需要注意的是,分組類必須提供構造方法,并且重載 

public int compare(WritableComparable w1, WritableComparable w2) 。這裡根據cate,spu分組。

通過上述的,就可以取得分cate分spu的SUM(counts)值了。

通過eclipse hadoop插件,可以友善我們上傳測試檔案到HDFS,可以浏覽,删除HDFS檔案,更加友善的是,就像運作普通JAVA程式一樣的運作/調試MR程式(不在需要打成JAR包),讓我們可以追蹤MR的每一步,非常友善進行邏輯性測試~

<a href="http://s3.51cto.com/wyfs02/M00/6B/F2/wKioL1U7gm6x1rEXAAGiT4OdcUI287.jpg" target="_blank"></a>

那麼怎麼取分cate分spu的TOP3呢?

我們隻需要把上一個MR的輸出檔案,作為另一個MR的輸入,并且以cate+counts 為KEY ,以spu為VALUE,根據cate分區,分組,排序的話:cate相同情況下,根據counts倒序;

最後在reduce階段取TOP3即可。

9

10

11

12

13

14

15

16

17

18

19

20

<code>protected</code> <code>void</code> <code>reduce(Cate2CountsKey key, Iterable&lt;Text&gt; values,</code>

<code>Reducer&lt;Cate2CountsKey, Text, Text, Text&gt;.Context context)</code>

<code>throws</code> <code>IOException, InterruptedException {</code>

<code>System.out.println(</code><code>"reduce..."</code><code>);</code>

<code>System.out.println(</code><code>"VALUES疊代前... key:"</code> <code>+ key.toString());</code>

<code>System.out.println(</code><code>"VALUES疊代前... key:"</code> <code>+ key.getCounts());</code>

<code>int</code> <code>top = </code><code>3</code><code>;</code>

<code>for</code><code>(Text t : values){</code>

<code>if</code><code>(top &gt; </code><code>0</code><code>){</code>

<code>System.out.println(</code><code>"VALUES疊代中... key:"</code> <code>+ key.toString());</code>

<code>System.out.println(</code><code>"VALUES疊代中... key:"</code> <code>+ key.getCounts());</code>

<code>context.write(</code><code>new</code> <code>Text(key.getCate2() + </code><code>"\t"</code> <code>+ t.toString()), </code>

<code>new</code> <code>Text(key.getCounts() </code>

<code>+ </code><code>""</code><code>));</code>

<code>top--;</code>

<code>System.out.println(</code><code>"reduce over..."</code><code>);</code>

<a href="http://s3.51cto.com/wyfs02/M01/6B/F2/wKioL1U7hUDxjEDHAAFWfetzxTI513.jpg" target="_blank"></a>

那麼到現在,分組取TOP就完成了。

一個疑問:reduce階段中的KEY到底是什麼?

在上面例子中的取TOP3的MR中,我們是以cate+counts為KEY,spu為VALUE。

cate作為分區,分組的依據,排序根據同一個cate下counts倒序。如下圖所示:

<a href="http://s3.51cto.com/wyfs02/M01/6B/F6/wKiom1U7iSuivGaRAAB6fM_hS0g123.jpg" target="_blank"></a>

那麼reduce方法中的KEY是什麼?

spu1,spu4,spu3...是VALUES中的,那麼這個疊代器的對應KEY是什麼呢?

是cate+42嗎?還是其他?

在VALUES疊代過程中,這個KEY會變化嗎?

我們可以看下ECLIPSE中的控制台列印輸出的内容:

<a href="http://s3.51cto.com/wyfs02/M00/6B/F3/wKioL1U7i8uDWJtGAAHA5SM7Zeo775.jpg" target="_blank"></a>

從列印上來看,可以分析出如下結論:

分組後,交給reduce方法處理的KEY是同一組的所有KEY的第一個KEY,并且在VALUES疊代過程中,KEY并不會重新NEW,而是利用SETTER反射的方式重新設定屬性值,這樣在VALUES疊代過程中取得的KEY都是與之對應的KEY了。

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1638361,如需轉載請自行聯系原作者

繼續閱讀