天天看點

MapReduce程式設計基礎

1. WordCount示例及MapReduce程式架構 2.  MapReduce程式執行流程 3.  深入學習MapReduce程式設計(1) 4. 參考資料及代碼下載下傳 

首先通過一個簡單的程式來實際運作一個MapReduce程式,然後通過這個程式我們來哦那個結一下MapReduce程式設計模型。

下載下傳源程式:/Files/xuqiang/WordCount.rar,将該程式打包成wordcount.jar下面的指令,随便寫一個文本檔案,這裡是WordCountMrtrial,并上傳到hdfs上,這裡的路徑是/tmp/WordCountMrtrial,運作下面的指令:

 xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

 如果該任務運作完成之後,将在hdfs的/tmp/result目錄下生成類似于這樣的結果:

 gentleman 11

get 12

give 8

go 6

good 9

government 16

運作一個程式的基本上就是這樣一個過程,我們來看看具體程式:

main函數中首先生成一個Job對象, Job job = new Job(conf, "word count");然後設定job的MapperClass,ReducerClass,設定輸入檔案路徑 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));設定輸出檔案路徑:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程式運作完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中僅僅是啟動了一個job,然後設定該job相關的參數,具體實作MapReduce是mapper類和reducer類。

 TokenizerMapper類中map函數将一行分割成<K2, V2>,然後IntSumReducer的reduce将<K2, list<V2>>轉換成最終結果<K3, V3>。

MapReduce程式設計基礎

通過這個示例基本上也能總結出簡單的MapReduce程式設計的模型:一個Mapper類,一個Reducer類,一個Driver類。

MapReduce程式設計基礎

 這裡所描述的執行流程更加注重是從程式的角度去了解,更加全面的流程可參考[這裡]。

MapReduce程式設計基礎

首先使用者指定待處理的檔案,在WordCount就是檔案WordCountMrtrial,這是hadoop根據設定的 InputDataFormat來将輸入檔案分割成一個record(key/value對),然後将這些record傳遞給map函數,在 WordCount示例中,對應的record就是<line_number行号, line_content該行内容>;

然後map函數根據輸入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

如果map過程完成之後,hadoop将這些生成的<K2, V2>按照K2進行分組,形成<K2,list(V2) >,之後傳遞給reduce函數,在該函數中最終得到程式的輸出結果<K3, V3>。

由于在hadoop需要将key/value對序列化,然後通過網絡network發送到叢集中的其他機器上,是以說hadoop中的類型需要能夠序列化。

具體而言,自定義的類型,如果一個類class實作了Writable interface的話,那麼這個可以作為value類型,如果一個class實作了WritableComparable<T> interface的話,那麼這個class可以作為value類型或者是key類型。 

hadoop本身已經實作了一些預定義的類型predefined classes,并且這些類型實作了WritableComparable<T>接口。

MapReduce程式設計基礎

如果一個類想要成為一個mapper,那麼該類需要實作Mapper接口,同時繼承自MapReduceBase。在MapReduceBase類中,兩個方法是特别需要注意的:

void configure( JobConf job):這個方法是在任務被運作之前調用 

void close():在任務運作完成之後調用

剩下的工作就是編寫map方法,原型如下:

void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException;

 這個方法根據<K1, V1>生成<K2, V2>,然後通過context輸出。

同樣的在hadoop中預先定義了如下的Mapper:

MapReduce程式設計基礎

如果一個類想要成為Reducer的話,需要首先實作Reducer接口,然後需要繼承自MapReduceBase。

當reducer接收從mapper傳遞而來的key/value對,然後根據key來排序,分組,最終生成<K2, list<V2>> ,然後reducer根據<K2, list<V2>>生成<K3, V3>.

同樣在hadoop中預定義了一些Reducer:

MapReduce程式設計基礎

 Partitioner的作用主要是将mapper運作的結果“導向directing”到reducer。如果一個類想要成為Partitioner,那麼需要實作Partitioner接口,該接口繼承自JobConfigurable,定義如下:

MapReduce程式設計基礎

public interface Partitioner<K2, V2> extends JobConfigurable {

  /** 

   * Get the paritition number for a given key (hence record) given the total 

   * number of partitions i.e. number of reduce-tasks for the job.

   *   

   * <p>Typically a hash function on a all or a subset of the key.</p>

   *

   * @param key the key to be paritioned.

   * @param value the entry value.

   * @param numPartitions the total number of partitions.

   * @return the partition number for the <code>key</code>.

   */

  int getPartition(K2 key, V2 value, int numPartitions);

MapReduce程式設計基礎

hadoop将根據方法getPartition的傳回值确定将mapper的值發送到那個reducer上。傳回值相同的key/value對将被“導向“至同一個reducer。

上面我們的假設是MapReduce程式的輸入是key/value對,也就是<K1,

V1>,但是實際上一般情況下MapReduce程式的輸入是big file的形式,那麼如何将這個檔案轉換成<K1,

V1>,即file -> <K1, V1>。這就需要使用InputFormat接口了。 

下面是幾個常用InputFormat的實作類:

MapReduce程式設計基礎

當然除了使用hadoop預先定義的InputDataFormat之外,還可以自定義,這是需要實作InputFormat接口。該接口僅僅包含兩個方法:

 InputSplit[] getSplits(JobConf job, int numSplits) throws  IOException;該接口實作将大檔案分割成小塊split。

  RecordReader<K, V> getRecordReader(InputSplit split,

                                     JobConf job, 

                                     Reporter reporter) throws IOException; 

該方法輸入分割成的split,然後傳回RecordReader,通過RecordReader來周遊該split内的record。 

每個reducer将自己的輸出寫入到結果檔案中,這是使用output data format來配置輸出的檔案的格式。hadoop預先實作了:

MapReduce程式設計基礎

我們知道在linux中存在所謂的“流”的概念,也就是說我們可以使用下面的指令:

cat input.txt | RandomSample.py 10 >sampled_output.txt 

同樣在hadoop中我們也可以使用類似的指令,顯然這樣能夠在很大程度上加快程式的開發程序。下面來看看hadoop中流執行的過程:

MapReduce程式設計基礎

hadoop streaming從标磚輸入STDIN讀取資料,預設的情況下使用\t來分割每行,如果不存在\t的話,那麼這時正行的内容将被看作是key,而此時的value内容為空;

然後調用mapper程式,輸出<K2, V2>;

之後,調用Partitioner來将<K2, V2>輸出到對應的reducer上;

reducer根據輸入的<K2, list(V2)> 得到最終結果<K3, V3>并輸出到STDOUT上。 

下面我們假設需要做這樣一個工作,輸入一個檔案,檔案中每行是一個數字,然後得到該檔案中的數字的最大值(當然這裡可以使用streaming中自帶的Aggregate)。 首先我們編寫一個python檔案(如果對python不是很熟悉,看看[這裡]):

3.6.2.1 準備資料

 xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

 xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

上傳到hdfs上:

 xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

3.6.2.2 編寫mapper multifetch.py

MapReduce程式設計基礎

#!/usr/bin/env python

import sys, urllib, re

title_re = re.compile("<title>(.*?)</title>",

        re.MULTILINE | re.DOTALL | re.IGNORECASE)

for line in sys.stdin:

    # We assume that we are fed a series of URLs, one per line

    url = line.strip()

    # Fetch the content and output the title (pairs are tab-delimited)

    match = title_re.search(urllib.urlopen(url).read())

    if match:

        print url, "\t", match.group(1).strip()

MapReduce程式設計基礎

該檔案的主要作用是給定一個url,然後輸出該url代表的html頁面的title部分。

在本地測試一下該程式:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py 

 xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 将輸出:

 http://www.cs.brandeis.edu Computer Science Department | Brandeis University

http://www.nytimes.com The New York Times - Breaking News, World News & Multimedia

3.6.2.3 編寫reducer reducer.py

編寫reducer.py檔案 

MapReduce程式設計基礎

 #!/usr/bin/env python

from operator import itemgetter

import sys

    line = line.strip()

    print line

MapReduce程式設計基礎

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py  

現在我們的mapper和reducer已經準備好了,那麼首先在本地上運作測試一下程式的功能,下面的指令模拟在hadoop上運作的過程:

首先mapper從stdin讀取資料,這裡是一行;

然後讀取該行的内容作為一個url,然後得到該url代表的html的title的内容,輸出<url, url-title-content>;

調用sort指令将mapper輸出排序;

将排序完成的結果交給reducer,這裡的reducer僅僅是将結果輸出。 

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py | sort | ./reducer.py 

http://www.cs.brandeis.edu     Computer Science Department | Brandeis University

http://www.nytimes.com     The New York Times - Breaking News, World News & Multimedia  

顯然程式能夠正确 

3.6.2.4 在hadoop streaming上運作

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop jar ./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \

> -mapper /home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py \

> -reducer /home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py \

> -input urls/* \

> -output titles 

 程式運作完成之後,檢視運作結果:

 xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

http://pages.cs.brandeis.edu/~cs147a/lab/hadoop-example/ 

http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Hadoop+Streaming 

<Hadoop In Action> 

出處:http://www.cnblogs.com/xuqiang/archive/2011/06/05/2071935.html

繼續閱讀