通常我們在學習一門語言的時候,寫的第一個程式就是Hello World。而在學習Hadoop時,我們要寫的第一個程式就是詞頻統計
WordCount
程式。
一、MapReduce簡介
1.1 MapReduce程式設計模型
MapReduce采用”分而治之”的思想,把對大規模資料集的操作,分發給一個主節點管理下的各個分節點共同完成,然後通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是”任務的分解與結果的彙總”。
在Hadoop中,用于執行MapReduce任務的機器角色有兩個:
- JobTracker用于排程工作的,一個Hadoop叢集中隻有一個JobTracker,位于master。
- TaskTracker用于執行工作,位于各slave上。
在分布式計算中,MapReduce架構負責處理了并行程式設計中分布式存儲、工作排程、負載均衡、容錯均衡、容錯處理以及網絡通信等複雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解後多任務處理的結果彙總起來。
需要注意的是,用MapReduce來處理的資料集(或任務)必須具備這樣的特點:待處理的資料集可以分解成許多小的資料集,而且每一個小資料集都可以完全并行地進行處理。
1.2 MapReduce工作過程
對于一個MR任務,它的輸入、輸出以及中間結果都是
<key, value>
鍵值對:
- Map:
——><k1, v1>
list(<k2, v2>)
- Reduce:
——><k2, list(v2)>
list(<k3, v3>)
MR程式的執行過程主要分為三步:Map階段、Shuffle階段、Reduce階段,如下圖:
- Map階段
- 分片(Split):map階段的輸入通常是HDFS上檔案,在運作Mapper前,FileInputFormat會将輸入檔案分割成多個split ——1個split至少包含1個HDFS的Block(預設為64M);然後每一個分片運作一個map進行處理。
- 執行(Map):對輸入分片中的每個鍵值對調用
函數進行運算,然後輸出一個結果鍵值對。map()
- Partitioner:對
的輸出進行partition,即根據key或value及reduce的數量來決定目前的這對鍵值對最終應該交由哪個reduce處理。預設是對key哈希後再以reduce task數量取模,預設的取模方式隻是為了避免資料傾斜。然後該key/value對以及partitionIdx的結果都會被寫入環形緩沖區。map()
- Partitioner:對
- 溢寫(Spill):map輸出寫在記憶體中的環形緩沖區,預設當緩沖區滿80%,啟動溢寫線程,将緩沖的資料寫出到磁盤。
- Sort:在溢寫到磁盤之前,使用快排對緩沖區資料按照partitionIdx, key排序。(每個partitionIdx表示一個分區,一個分區對應一個reduce)
- Combiner:如果設定了Combiner,那麼在Sort之後,還會對具有相同key的鍵值對進行合并,減少溢寫到磁盤的資料量。
- 合并(Merge):溢寫可能會生成多個檔案,這時需要将多個檔案合并成一個檔案。合并的過程中會不斷地進行 sort & combine 操作,最後合并成了一個已分區且已排序的檔案。
- Shuffle階段:廣義上Shuffle階段橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和merge/sort過程。通常認為Shuffle階段就是将map的輸出作為reduce的輸入的過程
- Copy過程:Reduce端啟動一些copy線程,通過HTTP方式将map端輸出檔案中屬于自己的部分拉取到本地。Reduce會從多個map端拉取資料,并且每個map的資料都是有序的。
- Merge過程:Copy過來的資料會先放入記憶體緩沖區中,這裡的緩沖區比較大;當緩沖區資料量達到一定門檻值時,将資料溢寫到磁盤(與map端類似,溢寫過程會執行 sort & combine)。如果生成了多個溢寫檔案,它們會被merge成一個有序的最終檔案。這個過程也會不停地執行 sort & combine 操作。
- Reduce階段:Shuffle階段最終生成了一個有序的檔案作為Reduce的輸入,對于該檔案中的每一個鍵值對調用
方法,并将結果寫到HDFS。reduce()
二、運作WordCount程式
在運作程式之前,需要先搭建好Hadoop叢集環境,參考《Hadoop+HBase+ZooKeeper分布式叢集環境搭建》。
2.1 源代碼
WordCount可以說是最簡單的MapReduce程式了,隻包含三個檔案:一個 Map 的 Java 檔案,一個 Reduce 的 Java 檔案,一個負責調用的主程式 Java 檔案。
我們在目前使用者的主檔案夾下建立
wordcount_01/
目錄,在該目錄下再建立
src/
和
classes/
。 src 目錄存放 Java 的源代碼,classes 目錄存放編譯結果。
TokenizerMapper.java
package com.lisong.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable();
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
IntSumReducer.java
package com.lisong.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = ;
for(IntWritable val:values) {
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
WordCount.java
package com.lisong.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != ) {
System.err.println("Usage: wordcount <in> <out>");
System.exit();
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[]));
System.exit(job.waitForCompletion(true)?:);
}
}
以上三個.java源檔案均置于 src 目錄下。
2.2 編譯
Hadoop 2.x 版本中jar不再集中在一個 hadoop-core-*.jar 中,而是分成多個 jar。編譯WordCount程式需要如下三個 jar:
$HADOOP_HOME/share/hadoop/common/hadoop-common-.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-.jar
使用
javac
指令進行編譯:
$ cd wordcount_01
$ javac -classpath /home/hadoop/hadoop/share/hadoop/common/hadoop-common-.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-.jar -d classes/ src/*.java
- -classpath,設定源代碼裡使用的各種類庫所在的路徑,多個路徑用
隔開。":"
- -d,設定編譯後的 class 檔案儲存的路徑。
- src/*.java,待編譯的源檔案。
2.3 打包
将編譯好的 class 檔案打包成 Jar 包,jar 指令是 JDK 的打包指令行工具。
$ jar -cvf wordcount.jar classes
打包結果是 wordcount.jar 檔案,放在目前目錄下。
2.4 執行
執行hadoop程式的時候,輸入檔案必須先放入hdfs檔案系統中,不能是本地檔案。
1 . 先檢視hdfs檔案系統的根目錄:
$ hadoop/bin/hadoop fs -ls /
Found items
drwxr-xr-x - hadoop supergroup -- : /hbase
可以看出,hdfs的根目錄是一個叫
/hbase
的目錄。
2 . 然後利用
put
将輸入檔案(多個輸入檔案位于
input
檔案夾下)複制到hdfs檔案系統中:
$ hadoop/bin/hadoop fs -put input /hbase
3 . 運作wordcount程式
$ hadoop/bin/hadoop jar wordcount_01/wordcount.jar WordCount /hbase/input /hbase/output
提示找不到 WordCount 類:
Exception in thread "main" java.lang.NoClassDefFoundError: WordCount
…
因為程式中聲明了 package ,是以在指令中也要 com.lisong.hadoop.WordCount 寫完整:
$ hadoop/bin/hadoop jar wordcount_01/wordcount.jar com.lisong.hadoop.WordCount /hbase/input /hbase/output
其中 “jar” 參數是指定 jar 包的位置,com.lisong.hadoop.WordCount 是主類。運作程式處理 input 目錄下的多個檔案,将結果寫入 /hbase/output 目錄。
4 . 檢視運作結果
$ hadoop/bin/hadoop fs -ls /hbase/output
Found items
-rw-r--r-- hadoop supergroup -- : /hbase/output/_SUCCESS
-rw-r--r-- hadoop supergroup -- : /hbase/output/part-r-
可以看到
/hbase/output/
目錄下有兩個檔案,結果就存在
part-r-00000
中:
$ hadoop/bin/hadoop fs -cat /hbase/output/part-r-
Google
Java
baidu
hadoop
三、WordCount程式分析
3.1 Hadoop資料類型
Hadoop MapReduce操作的是鍵值對,但這些鍵值對并不是Integer、String等标準的Java類型。為了讓鍵值對可以在叢集上移動,Hadoop提供了一些實作了
WritableComparable
接口的基本資料類型,以便用這些類型定義的資料可以被序列化進行網絡傳輸、檔案存儲與大小比較。
- 值:僅會被簡單的傳遞,必須實作
或Writable
接口。WritableComparable
- 鍵:在Reduce階段排序時需要進行比較,故隻能實作
接口。WritableComparable
下面是8個預定義的Hadoop基本資料類型,它們均實作了
WritableComparable
接口:
類 | 描述 |
---|---|
BooleanWritable | 标準布爾型數值 |
ByteWritable | 單位元組數值 |
DoubleWritable | 雙位元組數 |
FloatWritable | 浮點數 |
IntWritable | 整型數 |
LongWritable | 長整型數 |
Text | 使用UTF8格式存儲的文本 |
NullWritable | 當 中的key或value為空時使用 |
3.2 源代碼分析
3.2.1 Map過程
package com.lisong.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable();
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Map過程需要繼承
org.apache.hadoop.mapreduce
包中 Mapper 類,并重寫其map方法。
其中的模闆參數:第一個Object表示輸入key的類型;第二個Text表示輸入value的類型;第三個Text表示表示輸出鍵的類型;第四個IntWritable表示輸出值的類型。
作為map方法輸入的鍵值對,其value值存儲的是文本檔案中的一行(以回車符為行結束标記),而key值為該行的首字母相對于文本檔案的首位址的偏移量。然後StringTokenizer類将每一行拆分成為一個個的單詞,并将
<word,1>
作為map方法的結果輸出,其餘的工作都交有 MapReduce架構 處理。
注:
StringTokenizer
是Java工具包中的一個類,用于将字元串進行拆分——預設情況下使用空格作為分隔符進行分割。
3.2.2 Reduce過程
package com.lisong.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = ;
for(IntWritable val:values) {
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
Reduce過程需要繼承
org.apache.hadoop.mapreduce
包中 Reducer 類,并 重寫 reduce方法。
其中模闆參數同Map一樣,依次表示是輸入鍵類型,輸入值類型,輸出鍵類型,輸出值類型。
public void reduce(Text key, Iterable<IntWritable> values, Context context)
reduce 方法的輸入參數 key 為單個單詞,而 values 是由各Mapper上對應單詞的計數值所組成的清單(一個實作了 Iterable 接口的變量,可以了解成 values 裡包含若幹個 IntWritable 整數,可以通過疊代的方式周遊所有的值),是以隻要周遊 values 并求和,即可得到某個單詞出現的總次數。
3.2.3 執行作業
package com.lisong.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != ) {
System.err.println("Usage: wordcount <in> <out>");
System.exit();
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[]));
System.exit(job.waitForCompletion(true)?:);
}
}
在MapReduce中,由Job對象負責管理和運作一個計算任務,并通過Job的一些方法對任務的參數進行相關的設定,此處:
- 設定了使用
完成Map過程中的處理,使用TokenizerMapper.class
完成Combine和Reduce過程中的處理。IntSumReducer.class
- 還設定了Map過程和Reduce過程的輸出類型:key的類型為Text,value的類型為IntWritable。
- 任務的輸出和輸入路徑則由指令行參數指定,并由FileInputFormat和FileOutputFormat分别設定。
- FileInputFormat類的很重要的作用就是将檔案進行切分 split,并将 split 進一步拆分成key/value對
- FileOutputFormat類的作用是将處理結果寫入輸出檔案。
- 完成相應任務的參數設定後,即可調用
方法執行任務。job.waitForCompletion()
3.2.4 WordCount流程
1)将檔案拆分成splits,由于測試用的檔案較小,是以每個檔案為一個split,并将檔案按行分割形成
<key,value>
對,key為偏移量(包括了回車符),value為文本行。這一步由MapReduce架構自動完成,如下圖:
2)将分割好的
<key,value>
對交給使用者定義的map方法進行處理,生成新的
<key,value>
對,如下圖所示:
3)得到map方法輸出的
<key,value>
對後,Mapper會将它們按照key值進行排序,并執行Combine過程,将key值相同的value值累加,得到Mapper的最終輸出結果。如下圖:
4)Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的
<key,value>
對,并作為WordCount的輸出結果,如下圖:
個人站點:http://songlee24.github.com
參考:
[1] 實戰Hadoop:開啟通向雲計算的捷徑
[2]《了解MapReduce核心Shuffle》